1use std::collections::HashMap;
7
8#[derive(Debug)]
10pub struct KafkaProtocolHandler {
11 api_versions: HashMap<i16, ApiVersion>,
12}
13
14impl KafkaProtocolHandler {
15 pub fn new() -> Self {
17 let mut api_versions = HashMap::new();
18 api_versions.insert(
20 0,
21 ApiVersion {
22 min_version: 0,
23 max_version: 12,
24 },
25 ); api_versions.insert(
27 1,
28 ApiVersion {
29 min_version: 0,
30 max_version: 16,
31 },
32 ); api_versions.insert(
34 3,
35 ApiVersion {
36 min_version: 0,
37 max_version: 12,
38 },
39 ); api_versions.insert(
41 9,
42 ApiVersion {
43 min_version: 0,
44 max_version: 5,
45 },
46 ); api_versions.insert(
48 15,
49 ApiVersion {
50 min_version: 0,
51 max_version: 9,
52 },
53 ); api_versions.insert(
55 16,
56 ApiVersion {
57 min_version: 0,
58 max_version: 9,
59 },
60 ); api_versions.insert(
62 18,
63 ApiVersion {
64 min_version: 0,
65 max_version: 4,
66 },
67 ); api_versions.insert(
69 19,
70 ApiVersion {
71 min_version: 0,
72 max_version: 7,
73 },
74 ); api_versions.insert(
76 20,
77 ApiVersion {
78 min_version: 0,
79 max_version: 6,
80 },
81 ); api_versions.insert(
83 32,
84 ApiVersion {
85 min_version: 0,
86 max_version: 4,
87 },
88 ); api_versions.insert(
90 49,
91 ApiVersion {
92 min_version: 0,
93 max_version: 4,
94 },
95 ); Self { api_versions }
98 }
99}
100
101impl Default for KafkaProtocolHandler {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl KafkaProtocolHandler {
108 pub fn parse_request(&self, data: &[u8]) -> Result<KafkaRequest> {
110 if data.len() < 12 {
112 return Err(anyhow::anyhow!("Message too short for header"));
113 }
114
115 let api_key = ((data[4] as i16) << 8) | (data[5] as i16);
117
118 let api_version = ((data[6] as i16) << 8) | (data[7] as i16);
120
121 let correlation_id = ((data[8] as i32) << 24)
123 | ((data[9] as i32) << 16)
124 | ((data[10] as i32) << 8)
125 | (data[11] as i32);
126
127 if data.len() < 14 {
129 return Err(anyhow::anyhow!("Message too short for client ID length"));
130 }
131 let client_id_len = ((data[12] as i16) << 8) | (data[13] as i16);
132
133 let client_id_start = 14;
135 let client_id_end = client_id_start + (client_id_len as usize);
136 if data.len() < client_id_end {
137 return Err(anyhow::anyhow!("Message too short for client ID"));
138 }
139 let client_id = if client_id_len > 0 {
140 String::from_utf8(data[client_id_start..client_id_end].to_vec())
141 .map_err(|e| anyhow::anyhow!("Invalid client ID encoding: {}", e))?
142 } else {
143 String::new()
144 };
145
146 let request_type = match api_key {
147 0 => KafkaRequestType::Produce,
148 1 => KafkaRequestType::Fetch,
149 3 => KafkaRequestType::Metadata,
150 9 => KafkaRequestType::ListGroups,
151 15 => KafkaRequestType::DescribeGroups,
152 18 => KafkaRequestType::ApiVersions,
153 19 => KafkaRequestType::CreateTopics,
154 20 => KafkaRequestType::DeleteTopics,
155 32 => KafkaRequestType::DescribeConfigs,
156 _ => KafkaRequestType::ApiVersions, };
158
159 Ok(KafkaRequest {
160 api_key,
161 api_version,
162 correlation_id,
163 client_id,
164 request_type,
165 })
166 }
167
168 pub fn serialize_response(
170 &self,
171 response: &KafkaResponse,
172 correlation_id: i32,
173 ) -> Result<Vec<u8>> {
174 match response {
176 KafkaResponse::ApiVersions => {
177 let mut data = Vec::new();
179 data.extend_from_slice(&correlation_id.to_be_bytes());
181 data.extend_from_slice(&0i16.to_be_bytes());
183 data.extend_from_slice(&0i32.to_be_bytes());
185 Ok(data)
186 }
187 KafkaResponse::CreateTopics => {
188 let mut data = Vec::new();
190 data.extend_from_slice(&correlation_id.to_be_bytes());
191 data.extend_from_slice(&0i16.to_be_bytes()); data.extend_from_slice(&1i32.to_be_bytes()); let topic_name = b"default-topic";
195 data.extend_from_slice(&(topic_name.len() as i16).to_be_bytes());
196 data.extend_from_slice(topic_name);
197 data.extend_from_slice(&0i16.to_be_bytes()); Ok(data)
199 }
200 _ => {
201 let mut data = Vec::new();
203 data.extend_from_slice(&correlation_id.to_be_bytes());
204 data.extend_from_slice(&0i16.to_be_bytes()); Ok(data)
206 }
207 }
208 }
209
210 pub fn is_api_version_supported(&self, api_key: i16, version: i16) -> bool {
212 if let Some(api_version) = self.api_versions.get(&api_key) {
213 version >= api_version.min_version && version <= api_version.max_version
214 } else {
215 false
216 }
217 }
218}
219
220#[derive(Debug)]
222pub struct KafkaRequest {
223 pub api_key: i16,
224 pub api_version: i16,
225 pub correlation_id: i32,
226 pub client_id: String,
227 pub request_type: KafkaRequestType,
228}
229
230#[derive(Debug)]
232pub enum KafkaRequestType {
233 Metadata,
234 Produce,
235 Fetch,
236 ListGroups,
237 DescribeGroups,
238 ApiVersions,
239 CreateTopics,
240 DeleteTopics,
241 DescribeConfigs,
242}
243
244#[derive(Debug)]
246pub enum KafkaResponse {
247 Metadata,
248 Produce,
249 Fetch,
250 ListGroups,
251 DescribeGroups,
252 ApiVersions,
253 CreateTopics,
254 DeleteTopics,
255 DescribeConfigs,
256}
257
258#[derive(Debug)]
259struct ApiVersion {
260 min_version: i16,
261 max_version: i16,
262}
263
264type Result<T> = std::result::Result<T, anyhow::Error>;
265
266#[cfg(test)]
267mod tests {
268 use super::*;
269
270 #[test]
273 fn test_protocol_handler_new() {
274 let handler = KafkaProtocolHandler::new();
275 assert!(handler.api_versions.len() > 0);
276 assert!(handler.api_versions.contains_key(&0)); assert!(handler.api_versions.contains_key(&1)); assert!(handler.api_versions.contains_key(&18)); }
280
281 #[test]
282 fn test_protocol_handler_default() {
283 let handler = KafkaProtocolHandler::default();
284 assert!(handler.api_versions.len() > 0);
285 }
286
287 #[test]
288 fn test_is_api_version_supported_produce() {
289 let handler = KafkaProtocolHandler::new();
290 assert!(handler.is_api_version_supported(0, 0));
292 assert!(handler.is_api_version_supported(0, 12));
293 assert!(!handler.is_api_version_supported(0, 13));
294 assert!(!handler.is_api_version_supported(0, -1));
295 }
296
297 #[test]
298 fn test_is_api_version_supported_fetch() {
299 let handler = KafkaProtocolHandler::new();
300 assert!(handler.is_api_version_supported(1, 0));
302 assert!(handler.is_api_version_supported(1, 16));
303 assert!(!handler.is_api_version_supported(1, 17));
304 }
305
306 #[test]
307 fn test_is_api_version_supported_metadata() {
308 let handler = KafkaProtocolHandler::new();
309 assert!(handler.is_api_version_supported(3, 0));
311 assert!(handler.is_api_version_supported(3, 12));
312 assert!(!handler.is_api_version_supported(3, 13));
313 }
314
315 #[test]
316 fn test_is_api_version_supported_api_versions() {
317 let handler = KafkaProtocolHandler::new();
318 assert!(handler.is_api_version_supported(18, 0));
320 assert!(handler.is_api_version_supported(18, 4));
321 assert!(!handler.is_api_version_supported(18, 5));
322 }
323
324 #[test]
325 fn test_is_api_version_unsupported_api_key() {
326 let handler = KafkaProtocolHandler::new();
327 assert!(!handler.is_api_version_supported(999, 0));
329 assert!(!handler.is_api_version_supported(-1, 0));
330 }
331
332 #[test]
335 fn test_parse_request_too_short() {
336 let handler = KafkaProtocolHandler::new();
337 let data = vec![0u8; 5]; let result = handler.parse_request(&data);
339 assert!(result.is_err());
340 }
341
342 #[test]
343 fn test_parse_request_minimal_header() {
344 let handler = KafkaProtocolHandler::new();
345 let mut data = vec![0u8; 14];
347 data[4] = 0;
349 data[5] = 18;
350 data[6] = 0;
352 data[7] = 0;
353 data[8] = 0;
355 data[9] = 0;
356 data[10] = 0;
357 data[11] = 1;
358 data[12] = 0;
360 data[13] = 0;
361
362 let result = handler.parse_request(&data);
363 assert!(result.is_ok());
364 let request = result.unwrap();
365 assert_eq!(request.api_key, 18);
366 assert_eq!(request.api_version, 0);
367 assert_eq!(request.correlation_id, 1);
368 assert_eq!(request.client_id, "");
369 }
370
371 #[test]
372 fn test_parse_request_with_client_id() {
373 let handler = KafkaProtocolHandler::new();
374 let client_id = b"test-client";
375 let client_id_len = client_id.len() as i16;
376
377 let mut data = vec![0u8; 14 + client_id.len()];
378 data[4] = 0;
380 data[5] = 0;
381 data[6] = 0;
383 data[7] = 7;
384 data[8] = 0;
386 data[9] = 0;
387 data[10] = 0;
388 data[11] = 42;
389 data[12] = (client_id_len >> 8) as u8;
391 data[13] = (client_id_len & 0xFF) as u8;
392 data[14..].copy_from_slice(client_id);
394
395 let result = handler.parse_request(&data);
396 assert!(result.is_ok());
397 let request = result.unwrap();
398 assert_eq!(request.api_key, 0);
399 assert_eq!(request.api_version, 7);
400 assert_eq!(request.correlation_id, 42);
401 assert_eq!(request.client_id, "test-client");
402 assert!(matches!(request.request_type, KafkaRequestType::Produce));
403 }
404
405 #[test]
406 fn test_parse_request_produce() {
407 let handler = KafkaProtocolHandler::new();
408 let mut data = vec![0u8; 14];
409 data[4] = 0; data[5] = 0;
411 data[12] = 0; data[13] = 0;
413
414 let result = handler.parse_request(&data).unwrap();
415 assert!(matches!(result.request_type, KafkaRequestType::Produce));
416 }
417
418 #[test]
419 fn test_parse_request_fetch() {
420 let handler = KafkaProtocolHandler::new();
421 let mut data = vec![0u8; 14];
422 data[4] = 0; data[5] = 1;
424 data[12] = 0;
425 data[13] = 0;
426
427 let result = handler.parse_request(&data).unwrap();
428 assert!(matches!(result.request_type, KafkaRequestType::Fetch));
429 }
430
431 #[test]
432 fn test_parse_request_metadata() {
433 let handler = KafkaProtocolHandler::new();
434 let mut data = vec![0u8; 14];
435 data[4] = 0; data[5] = 3;
437 data[12] = 0;
438 data[13] = 0;
439
440 let result = handler.parse_request(&data).unwrap();
441 assert!(matches!(result.request_type, KafkaRequestType::Metadata));
442 }
443
444 #[test]
445 fn test_parse_request_list_groups() {
446 let handler = KafkaProtocolHandler::new();
447 let mut data = vec![0u8; 14];
448 data[4] = 0; data[5] = 9;
450 data[12] = 0;
451 data[13] = 0;
452
453 let result = handler.parse_request(&data).unwrap();
454 assert!(matches!(result.request_type, KafkaRequestType::ListGroups));
455 }
456
457 #[test]
458 fn test_parse_request_describe_groups() {
459 let handler = KafkaProtocolHandler::new();
460 let mut data = vec![0u8; 14];
461 data[4] = 0; data[5] = 15;
463 data[12] = 0;
464 data[13] = 0;
465
466 let result = handler.parse_request(&data).unwrap();
467 assert!(matches!(result.request_type, KafkaRequestType::DescribeGroups));
468 }
469
470 #[test]
471 fn test_parse_request_api_versions() {
472 let handler = KafkaProtocolHandler::new();
473 let mut data = vec![0u8; 14];
474 data[4] = 0; data[5] = 18;
476 data[12] = 0;
477 data[13] = 0;
478
479 let result = handler.parse_request(&data).unwrap();
480 assert!(matches!(result.request_type, KafkaRequestType::ApiVersions));
481 }
482
483 #[test]
484 fn test_parse_request_create_topics() {
485 let handler = KafkaProtocolHandler::new();
486 let mut data = vec![0u8; 14];
487 data[4] = 0; data[5] = 19;
489 data[12] = 0;
490 data[13] = 0;
491
492 let result = handler.parse_request(&data).unwrap();
493 assert!(matches!(result.request_type, KafkaRequestType::CreateTopics));
494 }
495
496 #[test]
497 fn test_parse_request_delete_topics() {
498 let handler = KafkaProtocolHandler::new();
499 let mut data = vec![0u8; 14];
500 data[4] = 0; data[5] = 20;
502 data[12] = 0;
503 data[13] = 0;
504
505 let result = handler.parse_request(&data).unwrap();
506 assert!(matches!(result.request_type, KafkaRequestType::DeleteTopics));
507 }
508
509 #[test]
510 fn test_parse_request_describe_configs() {
511 let handler = KafkaProtocolHandler::new();
512 let mut data = vec![0u8; 14];
513 data[4] = 0; data[5] = 32;
515 data[12] = 0;
516 data[13] = 0;
517
518 let result = handler.parse_request(&data).unwrap();
519 assert!(matches!(result.request_type, KafkaRequestType::DescribeConfigs));
520 }
521
522 #[test]
523 fn test_parse_request_unsupported_api_defaults_to_api_versions() {
524 let handler = KafkaProtocolHandler::new();
525 let mut data = vec![0u8; 14];
526 data[4] = 0; data[5] = 99;
528 data[12] = 0;
529 data[13] = 0;
530
531 let result = handler.parse_request(&data).unwrap();
532 assert!(matches!(result.request_type, KafkaRequestType::ApiVersions));
534 }
535
536 #[test]
537 fn test_parse_request_invalid_client_id_length() {
538 let handler = KafkaProtocolHandler::new();
539 let mut data = vec![0u8; 14];
540 data[4] = 0;
541 data[5] = 18;
542 data[12] = 0;
544 data[13] = 100;
545
546 let result = handler.parse_request(&data);
547 assert!(result.is_err());
548 }
549
550 #[test]
551 fn test_parse_request_missing_client_id_length() {
552 let handler = KafkaProtocolHandler::new();
553 let data = vec![0u8; 12]; let result = handler.parse_request(&data);
556 assert!(result.is_err());
557 }
558
559 #[test]
560 fn test_parse_request_max_values() {
561 let handler = KafkaProtocolHandler::new();
562 let mut data = vec![0u8; 14];
563 data[4] = 0x7F;
565 data[5] = 0xFF;
566 data[6] = 0x7F;
568 data[7] = 0xFF;
569 data[8] = 0x7F;
571 data[9] = 0xFF;
572 data[10] = 0xFF;
573 data[11] = 0xFF;
574 data[12] = 0;
576 data[13] = 0;
577
578 let result = handler.parse_request(&data);
579 assert!(result.is_ok());
580 let request = result.unwrap();
581 assert_eq!(request.api_key, 0x7FFF);
582 assert_eq!(request.api_version, 0x7FFF);
583 assert_eq!(request.correlation_id, 0x7FFFFFFF);
584 }
585
586 #[test]
589 fn test_serialize_response_api_versions() {
590 let handler = KafkaProtocolHandler::new();
591 let response = KafkaResponse::ApiVersions;
592 let correlation_id = 12345;
593
594 let result = handler.serialize_response(&response, correlation_id);
595 assert!(result.is_ok());
596
597 let data = result.unwrap();
598 assert!(data.len() > 0);
599
600 let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
602 assert_eq!(corr_id, correlation_id);
603
604 let error_code = i16::from_be_bytes([data[4], data[5]]);
606 assert_eq!(error_code, 0); }
608
609 #[test]
610 fn test_serialize_response_create_topics() {
611 let handler = KafkaProtocolHandler::new();
612 let response = KafkaResponse::CreateTopics;
613 let correlation_id = 999;
614
615 let result = handler.serialize_response(&response, correlation_id);
616 assert!(result.is_ok());
617
618 let data = result.unwrap();
619 assert!(data.len() > 0);
620
621 let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
623 assert_eq!(corr_id, correlation_id);
624 }
625
626 #[test]
627 fn test_serialize_response_metadata() {
628 let handler = KafkaProtocolHandler::new();
629 let response = KafkaResponse::Metadata;
630 let correlation_id = 1;
631
632 let result = handler.serialize_response(&response, correlation_id);
633 assert!(result.is_ok());
634
635 let data = result.unwrap();
636 assert!(data.len() >= 6); }
638
639 #[test]
640 fn test_serialize_response_produce() {
641 let handler = KafkaProtocolHandler::new();
642 let response = KafkaResponse::Produce;
643 let correlation_id = 42;
644
645 let result = handler.serialize_response(&response, correlation_id);
646 assert!(result.is_ok());
647 }
648
649 #[test]
650 fn test_serialize_response_fetch() {
651 let handler = KafkaProtocolHandler::new();
652 let response = KafkaResponse::Fetch;
653 let correlation_id = 100;
654
655 let result = handler.serialize_response(&response, correlation_id);
656 assert!(result.is_ok());
657 }
658
659 #[test]
660 fn test_serialize_response_list_groups() {
661 let handler = KafkaProtocolHandler::new();
662 let response = KafkaResponse::ListGroups;
663 let correlation_id = 200;
664
665 let result = handler.serialize_response(&response, correlation_id);
666 assert!(result.is_ok());
667 }
668
669 #[test]
670 fn test_serialize_response_describe_groups() {
671 let handler = KafkaProtocolHandler::new();
672 let response = KafkaResponse::DescribeGroups;
673 let correlation_id = 300;
674
675 let result = handler.serialize_response(&response, correlation_id);
676 assert!(result.is_ok());
677 }
678
679 #[test]
680 fn test_serialize_response_delete_topics() {
681 let handler = KafkaProtocolHandler::new();
682 let response = KafkaResponse::DeleteTopics;
683 let correlation_id = 400;
684
685 let result = handler.serialize_response(&response, correlation_id);
686 assert!(result.is_ok());
687 }
688
689 #[test]
690 fn test_serialize_response_describe_configs() {
691 let handler = KafkaProtocolHandler::new();
692 let response = KafkaResponse::DescribeConfigs;
693 let correlation_id = 500;
694
695 let result = handler.serialize_response(&response, correlation_id);
696 assert!(result.is_ok());
697 }
698
699 #[test]
700 fn test_serialize_response_negative_correlation_id() {
701 let handler = KafkaProtocolHandler::new();
702 let response = KafkaResponse::ApiVersions;
703 let correlation_id = -1;
704
705 let result = handler.serialize_response(&response, correlation_id);
706 assert!(result.is_ok());
707
708 let data = result.unwrap();
709 let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
710 assert_eq!(corr_id, -1);
711 }
712
713 #[test]
714 fn test_serialize_response_zero_correlation_id() {
715 let handler = KafkaProtocolHandler::new();
716 let response = KafkaResponse::ApiVersions;
717 let correlation_id = 0;
718
719 let result = handler.serialize_response(&response, correlation_id);
720 assert!(result.is_ok());
721
722 let data = result.unwrap();
723 let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
724 assert_eq!(corr_id, 0);
725 }
726
727 #[test]
730 fn test_kafka_request_debug() {
731 let request = KafkaRequest {
732 api_key: 0,
733 api_version: 7,
734 correlation_id: 123,
735 client_id: "test".to_string(),
736 request_type: KafkaRequestType::Produce,
737 };
738
739 let debug_str = format!("{:?}", request);
740 assert!(debug_str.contains("KafkaRequest"));
741 assert!(debug_str.contains("api_key"));
742 }
743
744 #[test]
745 fn test_kafka_request_type_debug() {
746 let metadata = KafkaRequestType::Metadata;
747 let debug_str = format!("{:?}", metadata);
748 assert!(debug_str.contains("Metadata"));
749 }
750
751 #[test]
752 fn test_kafka_response_debug() {
753 let response = KafkaResponse::Produce;
754 let debug_str = format!("{:?}", response);
755 assert!(debug_str.contains("Produce"));
756 }
757
758 #[test]
761 fn test_api_version_ranges_complete() {
762 let handler = KafkaProtocolHandler::new();
763
764 let api_configs = vec![
766 (0, 0, 12), (1, 0, 16), (3, 0, 12), (9, 0, 5), (15, 0, 9), (16, 0, 9), (18, 0, 4), (19, 0, 7), (20, 0, 6), (32, 0, 4), (49, 0, 4), ];
778
779 for (api_key, min_ver, max_ver) in api_configs {
780 assert!(handler.is_api_version_supported(api_key, min_ver));
781 assert!(handler.is_api_version_supported(api_key, max_ver));
782 assert!(!handler.is_api_version_supported(api_key, max_ver + 1));
783 if min_ver > 0 {
784 assert!(!handler.is_api_version_supported(api_key, min_ver - 1));
785 }
786 }
787 }
788
789 #[test]
792 fn test_parse_request_large_client_id() {
793 let handler = KafkaProtocolHandler::new();
794 let client_id = "a".repeat(1000); let client_id_len = client_id.len() as i16;
796
797 let mut data = vec![0u8; 14 + client_id.len()];
798 data[4] = 0;
799 data[5] = 18;
800 data[12] = (client_id_len >> 8) as u8;
801 data[13] = (client_id_len & 0xFF) as u8;
802 data[14..].copy_from_slice(client_id.as_bytes());
803
804 let result = handler.parse_request(&data);
805 assert!(result.is_ok());
806 assert_eq!(result.unwrap().client_id, client_id);
807 }
808
809 #[test]
810 fn test_parse_request_invalid_utf8_client_id() {
811 let handler = KafkaProtocolHandler::new();
812 let mut data = vec![0u8; 17];
813 data[4] = 0;
814 data[5] = 18;
815 data[12] = 0;
816 data[13] = 3; data[14] = 0xFF;
819 data[15] = 0xFF;
820 data[16] = 0xFF;
821
822 let result = handler.parse_request(&data);
823 assert!(result.is_err());
824 }
825
826 #[test]
827 fn test_api_version_struct() {
828 let api_version = ApiVersion {
829 min_version: 0,
830 max_version: 10,
831 };
832
833 assert_eq!(api_version.min_version, 0);
834 assert_eq!(api_version.max_version, 10);
835 }
836}