1use super::statistics::*;
40use super::vint::parse_vuint;
41use crate::error::{Error, Result};
42use crate::storage::sstable::version_gate::VersionGates;
43use nom::{bytes::complete::take, number::complete::be_u32, IResult};
44
// Component type tags found in the Statistics.db table of contents (TOC).
// Only `METADATA_TYPE_HEADER` is matched against TOC entries in the visible
// part of this file; the other tags are kept for reference.
#[allow(dead_code)]
const METADATA_TYPE_VALIDATION: u32 = 0;
#[allow(dead_code)]
const METADATA_TYPE_COMPACTION: u32 = 1;
#[allow(dead_code)]
const METADATA_TYPE_STATS: u32 = 2;
/// TOC tag of the HEADER component.
const METADATA_TYPE_HEADER: u32 = 3;

/// Timestamp baseline: `DELETION_TIME_EPOCH` scaled by 1_000_000
/// (presumably microseconds since the Unix epoch; TODO confirm against the decoder).
const TIMESTAMP_EPOCH: i64 = 1_442_880_000_000_000;
/// Deletion-time baseline: 1_442_880_000 seconds after the Unix epoch,
/// i.e. 2015-09-22T00:00:00Z if the unit is seconds (TODO confirm).
const DELETION_TIME_EPOCH: i64 = 1_442_880_000;
/// TTL baseline.
const TTL_EPOCH: i64 = 0;
63
64type EncodingStatsResult = (
66 i64,
67 i64,
68 Option<i64>,
69 Vec<super::header::ColumnInfo>,
70 Vec<super::header::ColumnInfo>,
71 Vec<super::header::ColumnInfo>,
72);
73
74type SerializationHeaderResult = (Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>);
76
77pub fn parse_nb_format_header(input: &[u8]) -> IResult<&[u8], StatisticsHeader> {
107 let (input, version_type) = be_u32(input)?;
108 let (input, statistics_kind) = be_u32(input)?;
109 let (input, _reserved1) = be_u32(input)?;
110 let (input, data_length) = be_u32(input)?;
111 let (input, metadata1) = be_u32(input)?;
112 let (input, metadata2) = be_u32(input)?;
113 let (input, metadata3) = be_u32(input)?;
114 let (input, checksum_or_more) = be_u32(input)?;
115
116 Ok((
117 input,
118 StatisticsHeader {
119 version: version_type,
120 statistics_kind,
121 data_length,
122 metadata1,
123 metadata2,
124 metadata3,
125 checksum: checksum_or_more,
126 table_id: None,
127 },
128 ))
129}
130
131fn parse_statistics_toc_for_header_offset(input: &[u8]) -> Option<usize> {
147 if input.len() < 8 {
148 log::debug!("Statistics.db too small for TOC: {} bytes", input.len());
149 return None;
150 }
151
152 let num_components = u32::from_be_bytes([input[0], input[1], input[2], input[3]]);
154 log::debug!("Statistics.db TOC: {} components", num_components);
155
156 if num_components > 100 {
160 log::warn!(
161 "Suspicious num_components={} in Statistics.db TOC (expected <=4)",
162 num_components
163 );
164 return None;
165 }
166
167 let toc_start: usize = 8;
171 let toc_entry_size: usize = 8; let toc_size = (num_components as usize)
175 .checked_mul(toc_entry_size)
176 .and_then(|size| size.checked_add(toc_start))?;
177
178 if input.len() < toc_size {
179 log::debug!(
180 "Statistics.db too small for {} TOC entries: {} bytes (need {})",
181 num_components,
182 input.len(),
183 toc_size
184 );
185 return None;
186 }
187
188 for i in 0..num_components as usize {
190 let entry_offset = i
192 .checked_mul(toc_entry_size)
193 .and_then(|offset| offset.checked_add(toc_start))?;
194 let component_type = u32::from_be_bytes([
195 input[entry_offset],
196 input[entry_offset + 1],
197 input[entry_offset + 2],
198 input[entry_offset + 3],
199 ]);
200 let component_offset = u32::from_be_bytes([
201 input[entry_offset + 4],
202 input[entry_offset + 5],
203 input[entry_offset + 6],
204 input[entry_offset + 7],
205 ]) as usize;
206
207 log::debug!(
208 "TOC entry {}: type={} offset=0x{:x}",
209 i,
210 component_type,
211 component_offset
212 );
213
214 if component_type == METADATA_TYPE_HEADER {
215 log::debug!(
216 "Found HEADER component at offset 0x{:x} ({})",
217 component_offset,
218 component_offset
219 );
220 return Some(component_offset);
221 }
222 }
223
224 log::debug!("HEADER component not found in Statistics.db TOC");
225 None
226}
227
228#[allow(clippy::type_complexity)]
256pub fn parse_nb_format_statistics_data(
257 input: &[u8],
258 header: &StatisticsHeader,
259 full_input: &[u8],
260 gates: Option<&VersionGates>,
265) -> Result<(
266 RowStatistics,
267 TimestampStatistics,
268 TableStatistics,
269 PartitionStatistics,
270 CompressionStatistics,
271 Vec<super::header::ColumnInfo>,
272 Vec<super::header::ColumnInfo>,
273 Vec<super::header::ColumnInfo>,
274)> {
275 let header_offset = parse_statistics_toc_for_header_offset(full_input);
277
278 let result = parse_minimal_encoding_stats(input, full_input, header_offset, gates);
280
281 match result {
282 Ok((
283 _,
284 (
285 min_timestamp,
286 min_deletion_time,
287 min_ttl,
288 partition_columns,
289 clustering_columns,
290 regular_columns,
291 ),
292 )) => {
293 let row_stats = RowStatistics {
295 total_rows: 0,
296 live_rows: 0,
297 tombstone_count: 0,
298 partition_count: 0,
299 avg_rows_per_partition: 0.0,
300 row_size_histogram: vec![],
301 };
302
303 let timestamp_stats = TimestampStatistics {
304 min_timestamp,
305 max_timestamp: min_timestamp, min_deletion_time,
307 max_deletion_time: min_deletion_time,
308 min_ttl,
309 max_ttl: min_ttl,
310 rows_with_ttl: 0,
311 };
312
313 let table_stats = TableStatistics {
314 disk_size: 0,
315 uncompressed_size: 0,
316 compressed_size: 0,
317 compression_ratio: 1.0,
318 block_count: 0,
319 avg_block_size: 0.0,
320 index_size: 0,
321 bloom_filter_size: 0,
322 level_count: 0,
323 };
324
325 let partition_stats = PartitionStatistics {
326 avg_partition_size: 0.0,
327 min_partition_size: 0,
328 max_partition_size: 0,
329 large_partition_percentage: 0.0,
330 size_histogram: vec![],
331 };
332
333 let compression_stats = CompressionStatistics {
334 algorithm: "unknown".to_string(),
335 original_size: 0,
336 compressed_size: 0,
337 ratio: 1.0,
338 compression_speed: 0.0,
339 decompression_speed: 0.0,
340 compressed_blocks: 0,
341 };
342
343 Ok((
344 row_stats,
345 timestamp_stats,
346 table_stats,
347 partition_stats,
348 compression_stats,
349 partition_columns,
350 clustering_columns,
351 regular_columns,
352 ))
353 }
354 Err(e) => {
355 log::debug!(
356 "Failed to parse minimal EncodingStats from Statistics.db: {:?}",
357 e
358 );
359 Err(Error::UnsupportedFormat(format!(
360 "Failed to parse minimal nb-format Statistics.db EncodingStats: {:?}. \
361 This is required for delta-coded timestamp decoding. \
362 Header checksum: 0x{:08x}, data_length: {}",
363 e, header.checksum, header.data_length
364 )))
365 }
366 }
367}
368
369fn parse_serialization_header(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
378 log::debug!(
379 "Searching for SerializationHeader in {} bytes (max search: 8KB)",
380 input.len()
381 );
382
383 let preview_len = std::cmp::min(64, input.len());
385 let preview_hex: String = input[..preview_len]
386 .iter()
387 .map(|b| format!("{:02x}", b))
388 .collect::<Vec<_>>()
389 .join(" ");
390 log::debug!(
391 "Input buffer size: {} bytes, first 64 bytes: {}",
392 input.len(),
393 preview_hex
394 );
395
396 let mut search_offset = 0;
399
400 let marshal_pattern = b"org.apache.cassandra.db.marshal";
404
405 while search_offset + marshal_pattern.len() < input.len() && search_offset < 8192 {
406 if &input[search_offset..search_offset + marshal_pattern.len()] == marshal_pattern {
407 let context_start = search_offset.saturating_sub(10);
408 let context_end = (search_offset + 50).min(input.len());
409 log::debug!(
410 "Found 'org.apache.cassandra.db.marshal' at offset {}, context (offset-10 to offset+50): {:02x?}",
411 search_offset,
412 &input[context_start..context_end]
413 );
414
415 for lookback in 1..=15 {
424 if search_offset < lookback {
425 break;
426 }
427 let type_len_offset = search_offset - lookback;
428
429 let first_byte = input[type_len_offset];
433
434 let is_valid_single_byte_len = (0x20..=0x7F).contains(&first_byte);
443
444 let is_multi_byte_vint = first_byte >= 0x80;
446
447 if is_valid_single_byte_len || is_multi_byte_vint {
448 let result = parse_serialization_header_sequential(&input[type_len_offset..]);
450 if let Ok((remaining, (pk_types, ck_types, cols))) = result {
451 if !pk_types.is_empty()
453 && pk_types[0].contains("org.apache.cassandra.db.marshal")
454 {
455 log::debug!(
456 "Successfully parsed SerializationHeader at offset {} (lookback: {}): pk_type={}",
457 type_len_offset,
458 lookback,
459 pk_types[0]
460 );
461 return Ok((remaining, (pk_types, ck_types, cols)));
462 }
463 }
464 }
465
466 if type_len_offset > 0 {
468 let prev_offset = type_len_offset - 1;
469 if input[prev_offset] == 0x00 && input[type_len_offset] == 0x00 {
470 let result = parse_serialization_header_at_offset(&input[prev_offset..]);
471 if result.is_ok() {
472 log::debug!(
473 "Successfully parsed SerializationHeader at legacy marker offset {}",
474 prev_offset
475 );
476 return result;
477 }
478 }
479 }
480 }
481 }
482 search_offset += 1;
483 }
484
485 log::debug!(
486 "Search completed: searched {} bytes, no partition key type found",
487 search_offset
488 );
489
490 log::debug!("Attempting to parse regular columns without partition key metadata");
493 let (remaining, (partition_keys, columns)) = parse_regular_columns(input)?;
494
495 if !columns.is_empty() {
496 log::debug!(
497 "Successfully parsed {} regular columns, {} partition keys via backtracking",
498 columns.len(),
499 partition_keys.len()
500 );
501 return Ok((remaining, (partition_keys, Vec::new(), columns)));
502 }
503
504 log::warn!(
506 "Failed to locate SerializationHeader or regular columns: searched {} bytes",
507 search_offset
508 );
509
510 if let Some((pk_types, ck_types, cols)) = fallback_parse_serialization_header_ascii(input) {
511 log::debug!(
512 "ASCII fallback extracted SerializationHeader: {} partition keys, {} clustering keys, {} regular columns",
513 pk_types.len(),
514 ck_types.len(),
515 cols.len()
516 );
517 return Ok((input, (pk_types, ck_types, cols)));
518 }
519
520 Ok((input, (Vec::new(), Vec::new(), Vec::new())))
521}
522
523fn parse_serialization_header_at_offset(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
525 use nom::bytes::complete::tag;
526 use nom::number::complete::u8 as parse_u8;
527
528 let _original_input = input;
529
530 let (input, _) = tag(b"\x00\x00")(input)?;
532 log::debug!("Found 0x00 0x00 marker");
533
534 let (input, partition_type_len) = parse_u8(input)?;
536 log::debug!("Partition key type length: {} bytes", partition_type_len);
537
538 let (input, partition_type_bytes) =
539 nom::bytes::complete::take(partition_type_len as usize)(input)?;
540 let partition_key_type = std::str::from_utf8(partition_type_bytes)
541 .map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
542 .to_string();
543
544 log::debug!("Partition key type: {}", partition_key_type);
545
546 let (input, clustering_count) = parse_u8(input)?;
548 log::debug!("Clustering key count: {}", clustering_count);
549
550 let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
552 let mut input = input;
553
554 for idx in 0..clustering_count {
555 let (remaining, type_len) = parse_u8(input)?;
557 log::debug!("Clustering key {} type length: {} bytes", idx, type_len);
558
559 let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
560 let clustering_type = std::str::from_utf8(type_bytes)
561 .map_err(|_| {
562 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
563 })?
564 .to_string();
565
566 log::debug!("Clustering key {} type: {}", idx, clustering_type);
567
568 clustering_key_types.push(clustering_type);
569 input = remaining;
570 }
571
572 let (input, static_count) = parse_u8(input)?;
576 log::debug!("Static column count: {}", static_count);
577
578 let mut static_columns = Vec::with_capacity(static_count as usize);
580 let mut input = input;
581
582 for static_idx in 0..static_count {
583 let (remaining, name_len) = parse_u8(input)?;
585 log::debug!(
586 "Static column {} name length: {} bytes",
587 static_idx,
588 name_len
589 );
590
591 if name_len == 0 || name_len > 200 {
593 log::debug!(
594 "Static column {} name_len sanity check failed: {}",
595 static_idx,
596 name_len
597 );
598 return Err(nom::Err::Error(nom::error::Error::new(
599 input,
600 nom::error::ErrorKind::Verify,
601 )));
602 }
603
604 let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
606 let column_name = std::str::from_utf8(name_bytes)
607 .map_err(|_| {
608 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
609 })?
610 .to_string();
611
612 let (remaining, type_len_u64) = parse_vuint(remaining)?;
614 log::debug!(
615 "Static column {} ('{}') type length: {} bytes",
616 static_idx,
617 column_name,
618 type_len_u64
619 );
620
621 if type_len_u64 == 0 || type_len_u64 > 5000 {
623 log::debug!(
624 "Static column {} ('{}') type_len sanity check failed: {}",
625 static_idx,
626 column_name,
627 type_len_u64
628 );
629 return Err(nom::Err::Error(nom::error::Error::new(
630 input,
631 nom::error::ErrorKind::Verify,
632 )));
633 }
634 if type_len_u64 > 1000 {
635 log::warn!(
636 "Unusually long static column type string: {} bytes (typical <1000)",
637 type_len_u64
638 );
639 }
640
641 let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
643 let internal_type = std::str::from_utf8(type_bytes)
644 .map_err(|_| {
645 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
646 })?
647 .to_string();
648
649 let cql_type = convert_marshal_type_to_cql(&internal_type);
650
651 log::debug!(
652 "Static column {}: name='{}', type='{}' (CQL: '{}')",
653 static_idx,
654 column_name,
655 internal_type,
656 cql_type
657 );
658
659 static_columns.push(super::header::ColumnInfo {
660 name: column_name,
661 column_type: cql_type,
662 is_primary_key: false,
663 key_position: None,
664 is_static: true, is_clustering: false,
666 });
667
668 input = remaining;
669 }
670
671 log::debug!("Parsed {} static columns", static_columns.len());
672
673 let (mut input, column_count) = parse_u8(input)?;
675 log::debug!("Regular column count: {}", column_count);
676
677 let mut columns = Vec::with_capacity(column_count as usize + static_columns.len());
679
680 for col_idx in 0..column_count {
681 let (remaining, name_len) = parse_u8(input)?;
683 log::debug!("Column {} name length: {} bytes", col_idx, name_len);
684
685 let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
687 let column_name = std::str::from_utf8(name_bytes)
688 .map_err(|_| {
689 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
690 })?
691 .to_string();
692
693 let (remaining, type_len_u64) = parse_vuint(remaining)?;
695 log::debug!(
696 "Column {} ('{}') type length: {} bytes",
697 col_idx,
698 column_name,
699 type_len_u64
700 );
701
702 if type_len_u64 == 0 || type_len_u64 > 5000 {
704 log::debug!(
705 "Column {} ('{}') type_len validation failed: {}",
706 col_idx,
707 column_name,
708 type_len_u64
709 );
710 return Err(nom::Err::Error(nom::error::Error::new(
711 input,
712 nom::error::ErrorKind::Verify,
713 )));
714 }
715 if type_len_u64 > 1000 {
716 log::warn!(
717 "Unusually long column type string: {} bytes (typical <1000)",
718 type_len_u64
719 );
720 }
721
722 let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
724 let internal_type = std::str::from_utf8(type_bytes)
725 .map_err(|_| {
726 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
727 })?
728 .to_string();
729
730 input = remaining;
731
732 let cql_type = convert_marshal_type_to_cql(&internal_type);
734
735 log::debug!(
736 "Column {}: name='{}', type='{}' (CQL: '{}')",
737 col_idx,
738 column_name,
739 internal_type,
740 cql_type
741 );
742
743 columns.push(super::header::ColumnInfo {
744 name: column_name,
745 column_type: cql_type,
746 is_primary_key: false,
747 key_position: None,
748 is_static: false,
749 is_clustering: false,
750 });
751 }
752
753 let mut all_columns = static_columns;
756 all_columns.append(&mut columns);
757
758 log::debug!(
759 "Successfully parsed SerializationHeader: {} partition keys, {} clustering keys, {} static columns, {} regular columns ({} total)",
760 1, clustering_key_types.len(),
762 all_columns.iter().filter(|c| c.is_static).count(),
763 all_columns.iter().filter(|c| !c.is_static).count(),
764 all_columns.len()
765 );
766
767 Ok((
768 input,
769 (vec![partition_key_type], clustering_key_types, all_columns),
770 ))
771}
772
773fn extract_partition_key_before_marker(input: &[u8], marker_offset: usize) -> Option<String> {
779 if marker_offset < 3 {
780 return None;
781 }
782
783 log::debug!(
784 "Backtracking from marker at offset {} (input len: {})",
785 marker_offset,
786 input.len()
787 );
788
789 let max_lookback = 210;
793 let search_start = marker_offset.saturating_sub(max_lookback);
794 log::debug!(
795 "Searching for VInt from offset {} to {} ({} positions)",
796 search_start,
797 marker_offset,
798 marker_offset - search_start
799 );
800
801 for vint_start in (search_start..marker_offset).rev() {
802 match parse_vuint(&input[vint_start..marker_offset]) {
804 Ok((remaining, type_len)) => {
805 if !(10..200).contains(&type_len) {
807 continue;
808 }
809
810 let vint_len = marker_offset - vint_start - remaining.len();
812 let type_start = vint_start + vint_len;
813
814 let type_len_usize = type_len as usize;
816 if type_start > input.len() || type_len_usize > input.len() - type_start {
817 continue;
818 }
819
820 let type_end = type_start + type_len_usize;
821
822 if type_end == marker_offset {
827 if let Ok(type_str) = std::str::from_utf8(&input[type_start..type_end]) {
828 log::debug!(
829 "Candidate at vint_start={}: type_len={}, type_start={}, type_end={}, str={}",
830 vint_start, type_len, type_start, type_end, type_str
831 );
832 if type_str.contains("org.apache.cassandra") {
836 log::debug!(
837 "Found partition key type at offset {}: length={}, type={}",
838 vint_start,
839 type_len,
840 type_str
841 );
842 return Some(type_str.to_string());
843 } else {
844 log::debug!(
845 "Rejected candidate (starts_with='(': {}, contains 'org.apache.cassandra': {})",
846 type_str.starts_with('('),
847 type_str.contains("org.apache.cassandra")
848 );
849 }
850 } else {
851 log::debug!(
852 "Rejected candidate at vint_start={}: not valid UTF-8",
853 vint_start
854 );
855 }
856 }
857 }
858 Err(_) => continue, }
860 }
861
862 None
863}
864
865fn parse_regular_columns(
870 input: &[u8],
871) -> IResult<&[u8], (Vec<String>, Vec<super::header::ColumnInfo>)> {
872 use super::header::ColumnInfo;
873
874 let mut search_offset = 0;
875 let mut partition_key_types = Vec::new();
876
877 while search_offset + 2 < input.len() && search_offset < 8192 {
878 if input[search_offset] == 0x00 {
879 let (marker_offset, count_offset) =
880 if search_offset + 1 < input.len() && input[search_offset + 1] == 0x00 {
881 (search_offset, search_offset + 2)
882 } else {
883 (search_offset, search_offset + 1)
884 };
885
886 if count_offset >= input.len() {
887 break;
888 }
889
890 let column_count = input[count_offset] as usize;
891 if column_count == 0 || column_count > 50 {
892 search_offset += 1;
893 continue;
894 }
895
896 log::debug!(
897 "Attempting to extract partition key by backtracking from marker at offset {}",
898 marker_offset
899 );
900 if let Some(pk_type) = extract_partition_key_before_marker(input, marker_offset) {
901 log::debug!("Found partition key type before marker: {}", pk_type);
902 partition_key_types.push(pk_type);
903 } else {
904 log::debug!(
905 "No partition key type found via backtracking at offset {}",
906 marker_offset
907 );
908 }
909
910 let mut pos = count_offset + 1;
911
912 let context_len = std::cmp::min(128, input.len() - marker_offset);
913 let context_hex: String = input[marker_offset..marker_offset + context_len]
914 .iter()
915 .map(|b| format!("{:02x}", b))
916 .collect::<Vec<_>>()
917 .join(" ");
918 log::debug!(
919 "Pattern found at offset {}: count={}, next 128 bytes: {}",
920 marker_offset,
921 column_count,
922 context_hex
923 );
924
925 let mut parsed_columns = Vec::with_capacity(column_count);
927 let mut parse_success = true;
928
929 for col_idx in 0..column_count {
930 if pos >= input.len() {
931 log::debug!(
932 "Column {} parsing failed at offset {}: position {} exceeds buffer length {}",
933 col_idx,
934 marker_offset,
935 pos,
936 input.len()
937 );
938 parse_success = false;
939 break;
940 }
941
942 if pos >= input.len() {
943 log::debug!(
944 "Column {} parsing failed at offset {}: no data available for name length byte (pos={}, len={})",
945 col_idx,
946 marker_offset,
947 pos,
948 input.len()
949 );
950 parse_success = false;
951 break;
952 }
953
954 let name_len = input[pos] as usize;
955 pos += 1;
956
957 if name_len == 0 || name_len > 200 || pos + name_len > input.len() {
958 log::debug!(
959 "Column {} parsing failed at offset {}: name_len sanity check failed (name_len={}, pos={}, buffer_len={})",
960 col_idx,
961 marker_offset,
962 name_len,
963 pos,
964 input.len()
965 );
966 parse_success = false;
967 break;
968 }
969
970 let name_bytes = &input[pos..pos + name_len];
972 let column_name = match std::str::from_utf8(name_bytes) {
973 Ok(s) => s.to_string(),
974 Err(e) => {
975 let name_hex: String = name_bytes
976 .iter()
977 .map(|b| format!("{:02x}", b))
978 .collect::<Vec<_>>()
979 .join(" ");
980 log::debug!(
981 "Column {} parsing failed at offset {}: UTF-8 decode error for column name at pos {} (len={}): {:?}, bytes: {}",
982 col_idx,
983 marker_offset,
984 pos,
985 name_len,
986 e,
987 name_hex
988 );
989 parse_success = false;
990 break;
991 }
992 };
993 pos += name_len;
994
995 if pos >= input.len() {
996 log::debug!(
997 "Column {} ('{}') parsing failed at offset {}: no data available for type length byte (pos={}, len={})",
998 col_idx,
999 column_name,
1000 marker_offset,
1001 pos,
1002 input.len()
1003 );
1004 parse_success = false;
1005 break;
1006 }
1007
1008 let type_len_result = parse_vuint(&input[pos..]);
1010 let (type_remaining, type_len_u64) = match type_len_result {
1011 Ok(r) => r,
1012 Err(_) => {
1013 log::debug!(
1014 "Column {} ('{}') parsing failed at offset {}: VInt parse error at pos {}",
1015 col_idx,
1016 column_name,
1017 marker_offset,
1018 pos
1019 );
1020 parse_success = false;
1021 break;
1022 }
1023 };
1024 let type_len = type_len_u64 as usize;
1025 pos = input.len() - type_remaining.len();
1026
1027 if type_len == 0 || type_len > 5000 || pos + type_len > input.len() {
1028 log::debug!(
1029 "Column {} ('{}') parsing failed at offset {}: type_len sanity check failed (type_len={}, pos={}, buffer_len={})",
1030 col_idx,
1031 column_name,
1032 marker_offset,
1033 type_len,
1034 pos,
1035 input.len()
1036 );
1037 parse_success = false;
1038 break;
1039 }
1040
1041 let type_bytes = &input[pos..pos + type_len];
1043 let internal_type = match std::str::from_utf8(type_bytes) {
1044 Ok(s) => s.to_string(),
1045 Err(e) => {
1046 let type_hex: String = type_bytes
1047 .iter()
1048 .map(|b| format!("{:02x}", b))
1049 .collect::<Vec<_>>()
1050 .join(" ");
1051 log::debug!(
1052 "Column {} ('{}') parsing failed at offset {}: UTF-8 decode error for column type at pos {} (len={}): {:?}, bytes: {}",
1053 col_idx,
1054 column_name,
1055 marker_offset,
1056 pos,
1057 type_len,
1058 e,
1059 type_hex
1060 );
1061 parse_success = false;
1062 break;
1063 }
1064 };
1065 pos += type_len;
1066
1067 let cql_type = convert_marshal_type_to_cql(&internal_type);
1069
1070 parsed_columns.push(ColumnInfo {
1071 name: column_name,
1072 column_type: cql_type,
1073 is_primary_key: false, key_position: None,
1075 is_static: false,
1076 is_clustering: false,
1077 });
1078 }
1079
1080 if parse_success && parsed_columns.len() == column_count {
1081 let column_names: Vec<&str> =
1083 parsed_columns.iter().map(|c| c.name.as_str()).collect();
1084 log::debug!(
1085 "Successfully parsed {} columns at offset {}: {:?}",
1086 parsed_columns.len(),
1087 marker_offset,
1088 column_names
1089 );
1090 if !partition_key_types.is_empty() {
1091 log::debug!(
1092 "Extracted {} partition key types via backtracking: {:?}",
1093 partition_key_types.len(),
1094 partition_key_types
1095 );
1096 }
1097
1098 let remaining = &input[pos..];
1099 return Ok((remaining, (partition_key_types, parsed_columns)));
1100 }
1101 }
1102
1103 search_offset += 1;
1104 }
1105
1106 log::debug!(
1108 "Regular column section not found: searched {} bytes",
1109 search_offset
1110 );
1111 Ok((input, (Vec::new(), Vec::new())))
1112}
1113
1114fn fallback_parse_serialization_header_ascii(
1116 input: &[u8],
1117) -> Option<(Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>)> {
1118 use super::header::ColumnInfo;
1119
1120 fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
1122 haystack
1123 .windows(needle.len())
1124 .position(|window| window == needle)
1125 }
1126
1127 let mut partition_types = Vec::new();
1128 let mut clustering_types = Vec::new();
1129 let mut columns = Vec::new();
1130
1131 if let Some(comp_idx) = find_subsequence(input, b"CompositeType(") {
1133 let start = comp_idx + "CompositeType(".len();
1134 let mut end = start;
1135 while end < input.len() && input[end] != b')' {
1136 end += 1;
1137 }
1138 if end <= input.len() {
1139 if let Ok(inner) = std::str::from_utf8(&input[start..end]) {
1140 partition_types = inner
1141 .split(',')
1142 .map(|s| s.trim().to_string())
1143 .filter(|s| !s.is_empty())
1144 .collect();
1145 }
1146
1147 let mut cursor = end + 1;
1149 while cursor < input.len() && input[cursor] < 0x20 {
1150 cursor += 1;
1151 }
1152 if cursor < input.len() && input[cursor] == b'(' {
1153 cursor += 1;
1154 let mut cluster_end = cursor;
1155 while cluster_end < input.len() && input[cluster_end] >= 0x20 {
1156 cluster_end += 1;
1157 }
1158 if cluster_end > cursor {
1159 if let Ok(cluster_str) = std::str::from_utf8(&input[cursor..cluster_end]) {
1160 if cluster_str.contains("org.apache.cassandra.db.marshal") {
1161 clustering_types = cluster_str
1162 .split(',')
1163 .map(|s| s.trim().to_string())
1164 .filter(|s| !s.is_empty())
1165 .collect();
1166 }
1167 }
1168 }
1169 let mut scan_start = cluster_end;
1171 while scan_start < input.len() && input[scan_start] < 0x20 {
1172 scan_start += 1;
1173 }
1174
1175 let mut idx = scan_start;
1177 while idx < input.len() {
1178 let name_len = input[idx] as usize;
1179 if name_len == 0 || name_len > 64 {
1180 idx += 1;
1181 continue;
1182 }
1183
1184 let name_start = idx + 1;
1185 let name_end = name_start + name_len;
1186 if name_end > input.len() {
1187 break;
1188 }
1189
1190 let name_bytes = &input[name_start..name_end];
1191 if !name_bytes
1192 .iter()
1193 .all(|b| b.is_ascii_alphanumeric() || *b == b'_')
1194 {
1195 idx += 1;
1196 continue;
1197 }
1198
1199 if name_end >= input.len() || input[name_end] != b'(' {
1200 idx += 1;
1201 continue;
1202 }
1203
1204 let type_start = name_end + 1;
1205 let mut type_end = type_start;
1206 while type_end < input.len() && input[type_end] >= 0x20 {
1207 type_end += 1;
1208 }
1209
1210 if type_end == type_start {
1211 idx += 1;
1212 continue;
1213 }
1214
1215 let type_bytes = &input[type_start..type_end];
1216 if !type_bytes.windows(10).any(|w| w == b"org.apach") {
1217 idx += 1;
1218 continue;
1219 }
1220
1221 let column_name = match std::str::from_utf8(name_bytes) {
1222 Ok(s) => s.to_string(),
1223 Err(_) => {
1224 idx += 1;
1225 continue;
1226 }
1227 };
1228
1229 let internal_type = match std::str::from_utf8(type_bytes) {
1230 Ok(s) => s.trim().to_string(),
1231 Err(_) => {
1232 idx += 1;
1233 continue;
1234 }
1235 };
1236
1237 let cql_type = convert_marshal_type_to_cql(&internal_type);
1238 columns.push(ColumnInfo {
1239 name: column_name,
1240 column_type: cql_type,
1241 is_primary_key: false,
1242 key_position: None,
1243 is_static: false,
1244 is_clustering: false,
1245 });
1246
1247 idx = type_end;
1249 while idx < input.len() && input[idx] < 0x20 {
1250 idx += 1;
1251 }
1252 }
1253 }
1254 }
1255 }
1256
1257 if partition_types.is_empty() && columns.is_empty() {
1258 return None;
1259 }
1260
1261 Some((partition_types, clustering_types, columns))
1262}
1263
/// Returns the text before the parenthesis that closes an already-opened
/// group.
///
/// The caller has consumed one `(`; `type_with_close_paren` is everything
/// after it. Nested parentheses are balanced, and the slice up to (not
/// including) the matching `)` is returned. Yields `None` when that `)` is
/// the very first character or when no matching `)` exists.
fn extract_inner_type(type_with_close_paren: &str) -> Option<&str> {
    // One group is already open on entry.
    let mut open_groups = 1usize;

    let parens = type_with_close_paren.match_indices(|c: char| c == '(' || c == ')');
    for (at, paren) in parens {
        if paren == "(" {
            open_groups += 1;
            continue;
        }

        open_groups -= 1;
        if open_groups == 0 {
            return if at == 0 {
                None
            } else {
                Some(&type_with_close_paren[..at])
            };
        }
    }

    None
}
1291
1292fn split_type_arguments(input: &str) -> Vec<&str> {
1294 let mut args = Vec::new();
1295 let mut depth = 0;
1296 let mut start = 0;
1297 for (idx, ch) in input.char_indices() {
1298 match ch {
1299 '(' => depth += 1,
1300 ')' => {
1301 if depth > 0 {
1302 depth -= 1;
1303 } else {
1304 log::warn!(
1305 "Unmatched closing parenthesis at position {} in type arguments: '{}'",
1306 idx,
1307 input
1308 );
1309 }
1310 }
1311 ',' if depth == 0 => {
1312 let part = input[start..idx].trim();
1313 if !part.is_empty() {
1314 args.push(part);
1315 }
1316 start = idx + ch.len_utf8();
1317 }
1318 _ => {}
1319 }
1320 }
1321
1322 let tail = input[start..].trim();
1323 if !tail.is_empty() {
1324 args.push(tail);
1325 }
1326
1327 args
1328}
1329
1330fn convert_marshal_type_to_cql(marshal_type: &str) -> String {
1332 fn strip_wrapping_parens(mut value: &str) -> &str {
1333 loop {
1334 let trimmed = value.trim();
1335 if trimmed.starts_with('(') && trimmed.ends_with(')') && trimmed.len() > 2 {
1336 value = &trimmed[1..trimmed.len() - 1];
1337 } else {
1338 return trimmed;
1339 }
1340 }
1341 }
1342
1343 fn strip_namespace(type_name: &str) -> &str {
1344 type_name.rsplit('.').next().unwrap_or(type_name)
1345 }
1346
1347 fn strip_type_suffix(name: &str) -> &str {
1348 name.trim_end_matches("Type")
1349 }
1350
1351 let mut cleaned = strip_wrapping_parens(marshal_type);
1352
1353 if cleaned.contains("org.apache.cassandra.db.marshal.UserType(") {
1357 return marshal_type.to_string();
1358 }
1359
1360 for prefix in [
1363 "org.apache.cassandra.db.marshal.ReversedType(",
1364 "ReversedType(",
1365 ] {
1366 if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1367 if let Some(inner) = extract_inner_type(params_with_close) {
1368 return convert_marshal_type_to_cql(inner);
1369 }
1370 }
1371 }
1372
1373 for prefix in ["org.apache.cassandra.db.marshal.FrozenType(", "FrozenType("] {
1374 if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1375 if let Some(inner) = extract_inner_type(params_with_close) {
1376 return format!("frozen<{}>", convert_marshal_type_to_cql(inner));
1377 }
1378 }
1379 }
1380
1381 for prefix in ["org.apache.cassandra.db.marshal.ListType(", "ListType("] {
1382 if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1383 if let Some(inner) = extract_inner_type(params_with_close) {
1384 return format!("list<{}>", convert_marshal_type_to_cql(inner));
1385 }
1386 }
1387 }
1388
1389 for prefix in ["org.apache.cassandra.db.marshal.SetType(", "SetType("] {
1390 if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1391 if let Some(inner) = extract_inner_type(params_with_close) {
1392 return format!("set<{}>", convert_marshal_type_to_cql(inner));
1393 }
1394 }
1395 }
1396
1397 for prefix in ["org.apache.cassandra.db.marshal.MapType(", "MapType("] {
1398 if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1399 if let Some(inner) = extract_inner_type(params_with_close) {
1400 let args = split_type_arguments(inner);
1401 if args.len() == 2 {
1402 let key = convert_marshal_type_to_cql(args[0]);
1403 let value = convert_marshal_type_to_cql(args[1]);
1404 return format!("map<{}, {}>", key, value);
1405 } else if args.len() == 1 {
1406 let value = convert_marshal_type_to_cql(args[0]);
1407 return format!("map<text, {}>", value);
1408 }
1409 }
1410 }
1411 }
1412
1413 cleaned = strip_wrapping_parens(cleaned);
1414 let base = strip_type_suffix(strip_namespace(cleaned)).trim_end_matches(')');
1415
1416 match base {
1418 "UTF8" => "text".to_string(),
1419 "Int32" => "int".to_string(),
1420 "Integer" => "int".to_string(),
1421 "Long" => "bigint".to_string(),
1422 "Short" => "smallint".to_string(),
1423 "Byte" => "tinyint".to_string(),
1424 "SimpleDate" => "date".to_string(),
1425 "Timestamp" => "timestamp".to_string(),
1426 "Boolean" => "boolean".to_string(),
1427 "Decimal" => "decimal".to_string(),
1428 "Float" => "float".to_string(),
1429 "Double" => "double".to_string(),
1430 "Bytes" => "blob".to_string(),
1431 "Ascii" => "ascii".to_string(),
1432 "InetAddress" => "inet".to_string(),
1433 "UUID" => "uuid".to_string(),
1434 "TimeUUID" => "timeuuid".to_string(),
1435 "Duration" => "duration".to_string(),
1436 "Time" => "time".to_string(),
1437 "Counter" | "CounterColumn" => "counter".to_string(),
1438 other => other.to_lowercase(),
1439 }
1440}
1441
1442fn build_partition_key_columns(partition_types: &[String]) -> Vec<super::header::ColumnInfo> {
1444 if partition_types.is_empty() {
1445 return Vec::new();
1446 }
1447
1448 let total = partition_types.len();
1449 partition_types
1450 .iter()
1451 .enumerate()
1452 .map(|(idx, marshal_type)| {
1453 let cql_type = convert_marshal_type_to_cql(marshal_type);
1454 let name = if total == 1 {
1455 match cql_type.as_str() {
1456 "uuid" | "timeuuid" => "id".to_string(),
1457 _ => "partition_key".to_string(),
1458 }
1459 } else {
1460 format!("partition_key_{}", idx)
1461 };
1462
1463 super::header::ColumnInfo {
1464 name,
1465 column_type: cql_type,
1466 is_primary_key: true,
1467 key_position: Some(idx as u16),
1468 is_static: false,
1469 is_clustering: false,
1470 }
1471 })
1472 .collect()
1473}
1474
1475fn build_clustering_key_columns(clustering_types: &[String]) -> Vec<super::header::ColumnInfo> {
1477 if clustering_types.is_empty() {
1478 return Vec::new();
1479 }
1480
1481 let total = clustering_types.len();
1482 clustering_types
1483 .iter()
1484 .enumerate()
1485 .map(|(idx, marshal_type)| {
1486 let cql_type = convert_marshal_type_to_cql(marshal_type);
1487 let name = if total == 1 {
1488 "clustering_key".to_string()
1489 } else {
1490 format!("clustering_key_{}", idx)
1491 };
1492
1493 super::header::ColumnInfo {
1494 name,
1495 column_type: cql_type,
1496 is_primary_key: true,
1497 key_position: Some(idx as u16),
1498 is_static: false,
1499 is_clustering: true,
1500 }
1501 })
1502 .collect()
1503}
1504
1505fn parse_serialization_header_sequential(
1516 input: &[u8],
1517) -> IResult<&[u8], SerializationHeaderResult> {
1518 let (input, pk_type_len) = parse_vuint(input)?;
1520
1521 if pk_type_len == 0 || pk_type_len > 5000 {
1523 log::debug!(
1524 "Invalid partition key type length: {} (expected 1-2000)",
1525 pk_type_len
1526 );
1527 return Err(nom::Err::Error(nom::error::Error::new(
1528 input,
1529 nom::error::ErrorKind::Verify,
1530 )));
1531 }
1532
1533 let (input, pk_type_bytes) = nom::bytes::complete::take(pk_type_len as usize)(input)?;
1534 let partition_key_type = std::str::from_utf8(pk_type_bytes)
1535 .map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
1536 .to_string();
1537
1538 log::debug!(
1539 "Sequential parser: partition key type (len={}): {}",
1540 pk_type_len,
1541 partition_key_type
1542 );
1543
1544 let (input, clustering_count) = parse_vuint(input)?;
1546 let clustering_count = clustering_count as usize;
1547
1548 if clustering_count > 100 {
1549 log::debug!(
1550 "Invalid clustering key count: {} (expected 0-100)",
1551 clustering_count
1552 );
1553 return Err(nom::Err::Error(nom::error::Error::new(
1554 input,
1555 nom::error::ErrorKind::Verify,
1556 )));
1557 }
1558
1559 log::debug!(
1560 "Sequential parser: clustering key count: {}",
1561 clustering_count
1562 );
1563
1564 let mut clustering_key_types = Vec::with_capacity(clustering_count);
1565 let mut input = input;
1566
1567 for idx in 0..clustering_count {
1568 let (remaining, type_len) = parse_vuint(input)?;
1569
1570 if type_len == 0 || type_len > 5000 {
1571 log::debug!("Invalid clustering key {} type length: {}", idx, type_len);
1572 return Err(nom::Err::Error(nom::error::Error::new(
1573 input,
1574 nom::error::ErrorKind::Verify,
1575 )));
1576 }
1577
1578 let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1579 let clustering_type = std::str::from_utf8(type_bytes)
1580 .map_err(|_| {
1581 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1582 })?
1583 .to_string();
1584
1585 log::debug!(
1586 "Sequential parser: clustering key {} type (len={}): {}",
1587 idx,
1588 type_len,
1589 clustering_type
1590 );
1591
1592 clustering_key_types.push(clustering_type);
1593 input = remaining;
1594 }
1595
1596 let (input, static_count) = parse_vuint(input)?;
1598 let static_count = static_count as usize;
1599
1600 if static_count > 200 {
1601 log::debug!(
1602 "Invalid static column count: {} (expected 0-200)",
1603 static_count
1604 );
1605 return Err(nom::Err::Error(nom::error::Error::new(
1606 input,
1607 nom::error::ErrorKind::Verify,
1608 )));
1609 }
1610
1611 log::debug!("Sequential parser: static column count: {}", static_count);
1612
1613 let mut static_columns = Vec::with_capacity(static_count);
1614 let mut input = input;
1615
1616 for idx in 0..static_count {
1617 let (remaining, name_len) = parse_vuint(input)?;
1619
1620 if name_len == 0 || name_len > 200 {
1621 log::debug!("Invalid static column {} name length: {}", idx, name_len);
1622 return Err(nom::Err::Error(nom::error::Error::new(
1623 input,
1624 nom::error::ErrorKind::Verify,
1625 )));
1626 }
1627
1628 let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
1629 let column_name = std::str::from_utf8(name_bytes)
1630 .map_err(|_| {
1631 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1632 })?
1633 .to_string();
1634
1635 let (remaining, type_len) = parse_vuint(remaining)?;
1637
1638 if type_len == 0 || type_len > 5000 {
1639 log::debug!(
1640 "Invalid static column '{}' type length: {}",
1641 column_name,
1642 type_len
1643 );
1644 return Err(nom::Err::Error(nom::error::Error::new(
1645 input,
1646 nom::error::ErrorKind::Verify,
1647 )));
1648 }
1649
1650 let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1651 let internal_type = std::str::from_utf8(type_bytes)
1652 .map_err(|_| {
1653 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1654 })?
1655 .to_string();
1656
1657 let cql_type = convert_marshal_type_to_cql(&internal_type);
1658
1659 log::debug!(
1660 "Sequential parser: static column {}: name='{}', type='{}'",
1661 idx,
1662 column_name,
1663 cql_type
1664 );
1665
1666 static_columns.push(super::header::ColumnInfo {
1667 name: column_name,
1668 column_type: cql_type,
1669 is_primary_key: false,
1670 key_position: None,
1671 is_static: true,
1672 is_clustering: false,
1673 });
1674
1675 input = remaining;
1676 }
1677
1678 let (input, regular_count) = parse_vuint(input)?;
1680 let regular_count = regular_count as usize;
1681
1682 if regular_count > 500 {
1683 log::debug!(
1684 "Invalid regular column count: {} (expected 0-500)",
1685 regular_count
1686 );
1687 return Err(nom::Err::Error(nom::error::Error::new(
1688 input,
1689 nom::error::ErrorKind::Verify,
1690 )));
1691 }
1692
1693 log::debug!("Sequential parser: regular column count: {}", regular_count);
1694
1695 let mut regular_columns = Vec::with_capacity(regular_count);
1696 let mut input = input;
1697
1698 for idx in 0..regular_count {
1699 let (remaining, name_len) = parse_vuint(input)?;
1701
1702 if name_len == 0 || name_len > 200 {
1703 log::debug!("Invalid regular column {} name length: {}", idx, name_len);
1704 return Err(nom::Err::Error(nom::error::Error::new(
1705 input,
1706 nom::error::ErrorKind::Verify,
1707 )));
1708 }
1709
1710 let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
1711 let column_name = std::str::from_utf8(name_bytes)
1712 .map_err(|_| {
1713 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1714 })?
1715 .to_string();
1716
1717 let (remaining, type_len) = parse_vuint(remaining)?;
1719
1720 if type_len == 0 || type_len > 5000 {
1721 log::debug!(
1722 "Invalid regular column '{}' type length: {}",
1723 column_name,
1724 type_len
1725 );
1726 return Err(nom::Err::Error(nom::error::Error::new(
1727 input,
1728 nom::error::ErrorKind::Verify,
1729 )));
1730 }
1731
1732 let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1733 let internal_type = std::str::from_utf8(type_bytes)
1734 .map_err(|_| {
1735 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1736 })?
1737 .to_string();
1738
1739 let cql_type = convert_marshal_type_to_cql(&internal_type);
1740
1741 log::debug!(
1742 "Sequential parser: regular column {}: name='{}', type='{}'",
1743 idx,
1744 column_name,
1745 cql_type
1746 );
1747
1748 regular_columns.push(super::header::ColumnInfo {
1749 name: column_name,
1750 column_type: cql_type,
1751 is_primary_key: false,
1752 key_position: None,
1753 is_static: false,
1754 is_clustering: false,
1755 });
1756
1757 input = remaining;
1758 }
1759
1760 let mut all_columns = static_columns;
1762 all_columns.extend(regular_columns);
1763
1764 log::debug!(
1765 "Sequential parser complete: partition_key='{}', {} clustering keys, {} total columns",
1766 partition_key_type,
1767 clustering_key_types.len(),
1768 all_columns.len()
1769 );
1770
1771 Ok((
1772 input,
1773 (vec![partition_key_type], clustering_key_types, all_columns),
1774 ))
1775}
1776
1777fn parse_serialization_header_schema(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
1785 let (input, pk_type_len) = parse_vuint(input)?;
1787 if pk_type_len == 0 || pk_type_len > 5000 {
1788 log::debug!("Invalid pk_type_len: {}", pk_type_len);
1789 return Err(nom::Err::Error(nom::error::Error::new(
1790 input,
1791 nom::error::ErrorKind::Verify,
1792 )));
1793 }
1794 if pk_type_len > 1000 {
1795 log::warn!(
1796 "Unusually long partition key type string: {} bytes (typical <1000)",
1797 pk_type_len
1798 );
1799 }
1800
1801 let (input, pk_type_bytes) = take(pk_type_len as usize)(input)?;
1802 let partition_key_type = match std::str::from_utf8(pk_type_bytes) {
1803 Ok(s) => convert_marshal_type_to_cql(s),
1804 Err(_) => {
1805 log::debug!("Invalid UTF-8 in partition key type");
1806 return Err(nom::Err::Error(nom::error::Error::new(
1807 input,
1808 nom::error::ErrorKind::Verify,
1809 )));
1810 }
1811 };
1812
1813 log::debug!(
1814 "HEADER: Partition key type: {} ({} bytes)",
1815 partition_key_type,
1816 pk_type_len
1817 );
1818
1819 let (input, clustering_count) = parse_vuint(input)?;
1821 if clustering_count > 1000 {
1823 log::warn!(
1824 "Suspicious clustering_count={} in SerializationHeader (expected <100)",
1825 clustering_count
1826 );
1827 return Err(nom::Err::Error(nom::error::Error::new(
1828 input,
1829 nom::error::ErrorKind::Verify,
1830 )));
1831 }
1832 log::debug!("HEADER: {} clustering key types", clustering_count);
1833
1834 let mut input = input;
1835 let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
1836
1837 for i in 0..clustering_count {
1838 let (remaining, ck_type_len) = parse_vuint(input)?;
1839 if ck_type_len == 0 || ck_type_len > 5000 {
1840 log::debug!("Invalid clustering key type length: {}", ck_type_len);
1841 return Err(nom::Err::Error(nom::error::Error::new(
1842 input,
1843 nom::error::ErrorKind::Verify,
1844 )));
1845 }
1846 if ck_type_len > 1000 {
1847 log::warn!(
1848 "Unusually long clustering key type string: {} bytes (typical <1000)",
1849 ck_type_len
1850 );
1851 }
1852
1853 let (remaining, ck_type_bytes) = take(ck_type_len as usize)(remaining)?;
1854 let ck_type = match std::str::from_utf8(ck_type_bytes) {
1855 Ok(s) => convert_marshal_type_to_cql(s),
1856 Err(_) => {
1857 log::debug!("Invalid UTF-8 in clustering key type {}", i);
1858 return Err(nom::Err::Error(nom::error::Error::new(
1859 input,
1860 nom::error::ErrorKind::Verify,
1861 )));
1862 }
1863 };
1864
1865 log::debug!(
1866 "HEADER: Clustering key {}: {} ({} bytes)",
1867 i,
1868 ck_type,
1869 ck_type_len
1870 );
1871 clustering_key_types.push(ck_type);
1872 input = remaining;
1873 }
1874
1875 let (input, static_count) = parse_vuint(input)?;
1877 if static_count > 10000 {
1879 log::warn!(
1880 "Suspicious static_count={} in SerializationHeader (expected <1000)",
1881 static_count
1882 );
1883 return Err(nom::Err::Error(nom::error::Error::new(
1884 input,
1885 nom::error::ErrorKind::Verify,
1886 )));
1887 }
1888 log::debug!("HEADER: {} static columns", static_count);
1889
1890 let mut input = input;
1891 let mut static_columns = Vec::with_capacity(static_count as usize);
1892
1893 for i in 0..static_count {
1894 let (remaining, name_len) = parse_vuint(input)?;
1896 if name_len == 0 || name_len > 200 {
1897 log::debug!("Invalid static column name length: {}", name_len);
1898 return Err(nom::Err::Error(nom::error::Error::new(
1899 input,
1900 nom::error::ErrorKind::Verify,
1901 )));
1902 }
1903
1904 let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
1905 let column_name = match std::str::from_utf8(name_bytes) {
1906 Ok(s) => s.to_string(),
1907 Err(_) => {
1908 log::debug!("Invalid UTF-8 in static column name {}", i);
1909 return Err(nom::Err::Error(nom::error::Error::new(
1910 input,
1911 nom::error::ErrorKind::Verify,
1912 )));
1913 }
1914 };
1915
1916 let (remaining, type_len) = parse_vuint(remaining)?;
1918 if type_len == 0 || type_len > 5000 {
1919 log::debug!("Invalid static column type length: {}", type_len);
1920 return Err(nom::Err::Error(nom::error::Error::new(
1921 input,
1922 nom::error::ErrorKind::Verify,
1923 )));
1924 }
1925 if type_len > 1000 {
1926 log::warn!(
1927 "Unusually long static column type string: {} bytes (typical <1000)",
1928 type_len
1929 );
1930 }
1931
1932 let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
1933 let cql_type = match std::str::from_utf8(type_bytes) {
1934 Ok(s) => convert_marshal_type_to_cql(s),
1935 Err(_) => {
1936 log::debug!("Invalid UTF-8 in static column type {}", i);
1937 return Err(nom::Err::Error(nom::error::Error::new(
1938 input,
1939 nom::error::ErrorKind::Verify,
1940 )));
1941 }
1942 };
1943
1944 log::debug!(
1945 "HEADER: Static column '{}': {} ({} bytes)",
1946 column_name,
1947 cql_type,
1948 type_len
1949 );
1950
1951 static_columns.push(super::header::ColumnInfo {
1952 name: column_name,
1953 column_type: cql_type,
1954 is_primary_key: false,
1955 key_position: None,
1956 is_static: true,
1957 is_clustering: false,
1958 });
1959
1960 input = remaining;
1961 }
1962
1963 let (input, regular_count) = parse_vuint(input)?;
1965 if regular_count > 10000 {
1967 log::warn!(
1968 "Suspicious regular_count={} in SerializationHeader (expected <1000)",
1969 regular_count
1970 );
1971 return Err(nom::Err::Error(nom::error::Error::new(
1972 input,
1973 nom::error::ErrorKind::Verify,
1974 )));
1975 }
1976 log::debug!("HEADER: {} regular columns", regular_count);
1977
1978 let mut input = input;
1979 let mut regular_columns = Vec::with_capacity(regular_count as usize);
1980
1981 for i in 0..regular_count {
1982 let (remaining, name_len) = parse_vuint(input)?;
1984 if name_len == 0 || name_len > 200 {
1985 log::debug!("Invalid regular column name length: {}", name_len);
1986 return Err(nom::Err::Error(nom::error::Error::new(
1987 input,
1988 nom::error::ErrorKind::Verify,
1989 )));
1990 }
1991
1992 let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
1993 let column_name = match std::str::from_utf8(name_bytes) {
1994 Ok(s) => s.to_string(),
1995 Err(_) => {
1996 log::debug!("Invalid UTF-8 in regular column name {}", i);
1997 return Err(nom::Err::Error(nom::error::Error::new(
1998 input,
1999 nom::error::ErrorKind::Verify,
2000 )));
2001 }
2002 };
2003
2004 let (remaining, type_len) = parse_vuint(remaining)?;
2006 if type_len == 0 || type_len > 5000 {
2007 log::debug!("Invalid regular column type length: {}", type_len);
2008 return Err(nom::Err::Error(nom::error::Error::new(
2009 input,
2010 nom::error::ErrorKind::Verify,
2011 )));
2012 }
2013 if type_len > 1000 {
2014 log::warn!(
2015 "Unusually long regular column type string: {} bytes (typical <1000)",
2016 type_len
2017 );
2018 }
2019
2020 let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
2021 let cql_type = match std::str::from_utf8(type_bytes) {
2022 Ok(s) => convert_marshal_type_to_cql(s),
2023 Err(_) => {
2024 log::debug!("Invalid UTF-8 in regular column type {}", i);
2025 return Err(nom::Err::Error(nom::error::Error::new(
2026 input,
2027 nom::error::ErrorKind::Verify,
2028 )));
2029 }
2030 };
2031
2032 log::debug!(
2033 "HEADER: Regular column '{}': {} ({} bytes)",
2034 column_name,
2035 cql_type,
2036 type_len
2037 );
2038
2039 regular_columns.push(super::header::ColumnInfo {
2040 name: column_name,
2041 column_type: cql_type,
2042 is_primary_key: false,
2043 key_position: None,
2044 is_static: false,
2045 is_clustering: false,
2046 });
2047
2048 input = remaining;
2049 }
2050
2051 let mut all_columns = static_columns;
2053 all_columns.extend(regular_columns);
2054
2055 log::debug!(
2056 "HEADER parsing complete: partition_key='{}', {} clustering keys, {} total columns",
2057 partition_key_type,
2058 clustering_key_types.len(),
2059 all_columns.len()
2060 );
2061
2062 Ok((
2063 input,
2064 (vec![partition_key_type], clustering_key_types, all_columns),
2065 ))
2066}
2067
2068fn parse_minimal_encoding_stats<'a>(
2079 input: &'a [u8],
2080 full_input: &'a [u8],
2081 header_offset: Option<usize>,
2082 gates: Option<&VersionGates>,
2083) -> IResult<&'a [u8], EncodingStatsResult> {
2084 let Some(offset) = header_offset else {
2090 log::debug!("No HEADER TOC offset, using fallback EncodingStats parsing");
2091 return parse_encoding_stats_fallback(input, gates);
2092 };
2093
2094 if offset >= full_input.len() {
2095 log::warn!(
2096 "TOC offset 0x{:x} exceeds input length {}, using fallback",
2097 offset,
2098 full_input.len()
2099 );
2100 return parse_encoding_stats_fallback(input, gates);
2101 }
2102
2103 let header_data = &full_input[offset..];
2104 log::debug!(
2105 "Parsing EncodingStats + SerializationHeader at TOC offset 0x{:x} ({} bytes available)",
2106 offset,
2107 header_data.len()
2108 );
2109
2110 let (rest, (min_timestamp, min_deletion_time, min_ttl)) =
2112 parse_encoding_stats_vuints(header_data, gates)?;
2113
2114 log::debug!(
2115 "EncodingStats from HEADER: min_timestamp={}, min_deletion_time={}, min_ttl={:?}",
2116 min_timestamp,
2117 min_deletion_time,
2118 min_ttl
2119 );
2120
2121 let (partition_types, clustering_types, columns) = match parse_serialization_header_schema(rest)
2123 {
2124 Ok((_, result)) => result,
2125 Err(e) => {
2126 log::warn!(
2127 "Schema parsing after EncodingStats failed: {:?}, falling back to marker search",
2128 e
2129 );
2130 parse_serialization_header(input)?.1
2131 }
2132 };
2133
2134 let (partition_key_columns, clustering_key_columns) =
2135 build_column_infos(&partition_types, &clustering_types);
2136
2137 Ok((
2138 input,
2139 (
2140 min_timestamp,
2141 min_deletion_time,
2142 min_ttl,
2143 partition_key_columns,
2144 clustering_key_columns,
2145 columns,
2146 ),
2147 ))
2148}
2149
2150fn parse_encoding_stats_vuints<'a>(
2169 input: &'a [u8],
2170 _gates: Option<&VersionGates>,
2181) -> IResult<&'a [u8], (i64, i64, Option<i64>)> {
2182 let (rest, min_ts_delta) = parse_vuint(input)?;
2183 let (rest, min_ldt_delta) = parse_vuint(rest)?;
2184 let (rest, min_ttl_delta) = parse_vuint(rest)?;
2185
2186 Ok((
2187 rest,
2188 (
2189 min_ts_delta as i64 + TIMESTAMP_EPOCH,
2190 min_ldt_delta as i64 + DELETION_TIME_EPOCH,
2193 Some(min_ttl_delta as i64 + TTL_EPOCH),
2194 ),
2195 ))
2196}
2197
2198fn build_column_infos(
2200 partition_types: &[String],
2201 clustering_types: &[String],
2202) -> (
2203 Vec<super::header::ColumnInfo>,
2204 Vec<super::header::ColumnInfo>,
2205) {
2206 let partition_key_columns = build_partition_key_columns(partition_types);
2207 let clustering_key_columns = build_clustering_key_columns(clustering_types);
2208
2209 log::debug!(
2210 "Constructed ColumnInfo entries from SerializationHeader: {} partition keys, {} clustering keys",
2211 partition_key_columns.len(),
2212 clustering_key_columns.len()
2213 );
2214
2215 (partition_key_columns, clustering_key_columns)
2216}
2217
2218fn parse_encoding_stats_fallback<'a>(
2221 input: &'a [u8],
2222 gates: Option<&VersionGates>,
2223) -> IResult<&'a [u8], EncodingStatsResult> {
2224 let (rest, _metadata_type) = be_u32(input)?;
2226
2227 let (rest, _data_length) = parse_vuint(rest)?;
2229
2230 let (rest, partitioner_len) = parse_vuint(rest)?;
2232
2233 let (rest, _) = take(partitioner_len as usize)(rest)?;
2235
2236 let (rest, _metadata1) = parse_vuint(rest)?;
2238 let (rest, _metadata2) = parse_vuint(rest)?;
2239
2240 let (rest, (min_timestamp, min_deletion_time, min_ttl)) =
2242 parse_encoding_stats_vuints(rest, gates)?;
2243
2244 let (_, (partition_types, clustering_types, columns)) = parse_serialization_header(rest)?;
2246
2247 let (partition_key_columns, clustering_key_columns) =
2248 build_column_infos(&partition_types, &clustering_types);
2249
2250 Ok((
2251 input,
2252 (
2253 min_timestamp,
2254 min_deletion_time,
2255 min_ttl,
2256 partition_key_columns,
2257 clustering_key_columns,
2258 columns,
2259 ),
2260 ))
2261}
2262
2263pub fn parse_enhanced_statistics_file<'a>(
2282 input: &'a [u8],
2283 gates: Option<&VersionGates>,
2284) -> IResult<&'a [u8], SSTableStatistics> {
2285 let (remaining, header) = parse_nb_format_header(input)?;
2287
2288 let result = parse_nb_format_statistics_data(remaining, &header, input, gates);
2291
2292 match result {
2293 Ok((
2294 row_stats,
2295 timestamp_stats,
2296 table_stats,
2297 partition_stats,
2298 compression_stats,
2299 partition_columns,
2300 clustering_columns,
2301 columns,
2302 )) => {
2303 log::debug!(
2304 "Successfully parsed Statistics.db serialization header: {} partition keys, {} clustering keys, {} regular columns",
2305 partition_columns.len(),
2306 clustering_columns.len(),
2307 columns.len()
2308 );
2309
2310 let statistics = SSTableStatistics {
2311 header,
2312 row_stats,
2313 timestamp_stats,
2314 column_stats: vec![],
2315 table_stats,
2316 partition_stats,
2317 compression_stats,
2318 metadata: std::collections::HashMap::new(),
2319 serialization_header_columns: columns,
2320 serialization_header_partition_keys: partition_columns,
2321 serialization_header_clustering_keys: clustering_columns,
2322 };
2323
2324 Ok((remaining, statistics))
2325 }
2326 Err(e) => {
2327 log::warn!("Failed to parse nb-format Statistics.db: {}", e);
2329 Err(nom::Err::Error(nom::error::Error::new(
2330 input,
2331 nom::error::ErrorKind::Verify,
2332 )))
2333 }
2334 }
2335}
2336
2337pub fn parse_statistics_with_fallback<'a>(
2352 input: &'a [u8],
2353 gates: Option<&VersionGates>,
2354) -> IResult<&'a [u8], SSTableStatistics> {
2355 parse_enhanced_statistics_file(input, gates)
2357}
2358
2359#[cfg(test)]
2360mod tests {
2361 use super::*;
2362
/// A header with one partition key, no clustering keys and two regular columns parses
/// into the expected key types and columns.
#[test]
fn test_serialization_header_with_no_clustering_keys() {
    /// Appends `bytes` preceded by its length encoded as a single byte.
    fn push_prefixed(buffer: &mut Vec<u8>, bytes: &[u8]) {
        buffer.push(bytes.len() as u8);
        buffer.extend_from_slice(bytes);
    }

    // 100 filler bytes of 0xFF precede the encoded header.
    let mut full_data: Vec<u8> = vec![0xFF; 100];

    // Two zero bytes, then the length-prefixed partition key type.
    full_data.extend_from_slice(&[0x00, 0x00]);
    push_prefixed(&mut full_data, b"(org.apache.cassandra.db.marshal.UUIDType");

    full_data.push(0x00); // clustering key count
    full_data.push(0x00); // static column count
    full_data.push(0x02); // regular column count

    for (name, marshal_type) in [
        ("id", "org.apache.cassandra.db.marshal.UUIDType"),
        ("name", "org.apache.cassandra.db.marshal.UTF8Type"),
    ] {
        push_prefixed(&mut full_data, name.as_bytes());
        push_prefixed(&mut full_data, marshal_type.as_bytes());
    }

    let result = parse_serialization_header(&full_data);
    assert!(
        result.is_ok(),
        "Failed to parse SerializationHeader: {:?}",
        result.as_ref().err()
    );

    let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();

    assert_eq!(partition_types.len(), 1, "Expected 1 partition key");
    assert!(partition_types[0].contains("UUIDType"));

    assert_eq!(clustering_types.len(), 0, "Expected 0 clustering keys");

    assert_eq!(columns.len(), 2, "Expected 2 columns");
    assert_eq!(columns[0].name, "id");
    assert_eq!(columns[0].column_type, "uuid");
    assert_eq!(columns[1].name, "name");
    assert_eq!(columns[1].column_type, "text");
}
2422
/// A header with one partition key, two clustering keys and two regular columns parses
/// into the expected key types and columns.
#[test]
fn test_serialization_header_with_clustering_keys() {
    /// Appends `bytes` preceded by its length encoded as a single byte.
    fn push_prefixed(buffer: &mut Vec<u8>, bytes: &[u8]) {
        buffer.push(bytes.len() as u8);
        buffer.extend_from_slice(bytes);
    }

    // 100 filler bytes of 0xFF precede the encoded header.
    let mut full_data: Vec<u8> = vec![0xFF; 100];

    // Two zero bytes, then the length-prefixed partition key type.
    full_data.extend_from_slice(&[0x00, 0x00]);
    push_prefixed(&mut full_data, b"(org.apache.cassandra.db.marshal.UUIDType");

    // Two clustering keys, each a length-prefixed type string.
    full_data.push(0x02);
    push_prefixed(
        &mut full_data,
        b"[org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType)",
    );
    push_prefixed(&mut full_data, b"(org.apache.cassandra.db.marshal.UTF8Type)");

    full_data.push(0x00); // static column count
    full_data.push(0x02); // regular column count

    for (name, marshal_type) in [
        ("data", "org.apache.cassandra.db.marshal.UTF8Type"),
        ("value", "org.apache.cassandra.db.marshal.Int32Type"),
    ] {
        push_prefixed(&mut full_data, name.as_bytes());
        push_prefixed(&mut full_data, marshal_type.as_bytes());
    }

    let result = parse_serialization_header(&full_data);
    assert!(
        result.is_ok(),
        "Failed to parse SerializationHeader with clustering keys: {:?}",
        result.err()
    );

    let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();

    assert_eq!(partition_types.len(), 1);
    assert!(partition_types[0].contains("UUIDType"));

    assert_eq!(clustering_types.len(), 2, "Expected 2 clustering keys");
    assert!(clustering_types[0].contains("ReversedType"));
    assert!(clustering_types[0].contains("TimestampType"));
    assert!(clustering_types[1].contains("UTF8Type"));

    assert_eq!(columns.len(), 2);
    assert_eq!(columns[0].name, "data");
    assert_eq!(columns[0].column_type, "text");
    assert_eq!(columns[1].name, "value");
    assert_eq!(columns[1].column_type, "int");
}
2495
/// A header with one static and two regular columns yields all three columns, static
/// first, with the `is_static` flag set only on the static one.
#[test]
fn test_serialization_header_with_static_columns() {
    /// Appends `bytes` preceded by its length encoded as a single byte.
    fn push_prefixed(buffer: &mut Vec<u8>, bytes: &[u8]) {
        buffer.push(bytes.len() as u8);
        buffer.extend_from_slice(bytes);
    }

    const UTF8_TYPE: &str = "org.apache.cassandra.db.marshal.UTF8Type";
    const INT32_TYPE: &str = "org.apache.cassandra.db.marshal.Int32Type";

    // 100 filler bytes of 0xFF precede the encoded header.
    let mut full_data: Vec<u8> = vec![0xFF; 100];

    // Two zero bytes, then the length-prefixed partition key type.
    full_data.extend_from_slice(&[0x00, 0x00]);
    push_prefixed(&mut full_data, b"org.apache.cassandra.db.marshal.UUIDType");

    // One clustering key.
    full_data.push(0x01);
    push_prefixed(
        &mut full_data,
        b"org.apache.cassandra.db.marshal.TimestampType",
    );

    // One static column.
    full_data.push(0x01);
    push_prefixed(&mut full_data, b"static_data");
    push_prefixed(&mut full_data, UTF8_TYPE.as_bytes());

    // Two regular columns.
    full_data.push(0x02);
    for (name, marshal_type) in [("row_data", UTF8_TYPE), ("row_value", INT32_TYPE)] {
        push_prefixed(&mut full_data, name.as_bytes());
        push_prefixed(&mut full_data, marshal_type.as_bytes());
    }

    let result = parse_serialization_header(&full_data);
    assert!(
        result.is_ok(),
        "Failed to parse SerializationHeader with static columns: {:?}",
        result.err()
    );

    let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();

    assert_eq!(partition_types.len(), 1);
    assert!(partition_types[0].contains("UUIDType"));

    assert_eq!(clustering_types.len(), 1);
    assert!(clustering_types[0].contains("TimestampType"));

    assert_eq!(
        columns.len(),
        3,
        "Expected 3 columns (1 static + 2 regular)"
    );

    assert_eq!(columns[0].name, "static_data");
    assert_eq!(columns[0].column_type, "text");
    assert!(
        columns[0].is_static,
        "static_data should be marked as static"
    );

    assert_eq!(columns[1].name, "row_data");
    assert_eq!(columns[1].column_type, "text");
    assert!(
        !columns[1].is_static,
        "row_data should NOT be marked as static"
    );

    assert_eq!(columns[2].name, "row_value");
    assert_eq!(columns[2].column_type, "int");
    assert!(
        !columns[2].is_static,
        "row_value should NOT be marked as static"
    );
}
2595
/// Marshal class names map to their CQL names, unknown names are lower-cased, and
/// `UserType` definitions survive conversion, including when nested in other types.
#[test]
fn test_marshal_type_conversion() {
    // (marshal class name, expected CQL type)
    let simple_cases = [
        ("org.apache.cassandra.db.marshal.Int32Type", "int"),
        ("org.apache.cassandra.db.marshal.UTF8Type", "text"),
        ("org.apache.cassandra.db.marshal.UUIDType", "uuid"),
        ("org.apache.cassandra.db.marshal.TimestampType", "timestamp"),
        ("org.apache.cassandra.db.marshal.DecimalType", "decimal"),
        // Not a known type name: exercises the lower-casing fallback.
        ("org.apache.cassandra.db.marshal.SimpleDataType", "simpledata"),
    ];
    for (marshal_type, expected) in simple_cases {
        assert_eq!(convert_marshal_type_to_cql(marshal_type), expected);
    }

    let udt = "org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type,63697479:org.apache.cassandra.db.marshal.UTF8Type)";
    assert_eq!(
        convert_marshal_type_to_cql(udt),
        udt,
        "UserType definitions must be preserved to retain keyspace, type name, and field metadata"
    );

    let frozen_udt = "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type))";
    let converted_frozen = convert_marshal_type_to_cql(frozen_udt);
    assert!(
        converted_frozen.contains("UserType("),
        "UserType inside FrozenType should be preserved"
    );

    let list_udt = "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type)))";
    let converted_list = convert_marshal_type_to_cql(list_udt);
    assert!(
        converted_list.contains("UserType("),
        "UserType inside List should be preserved"
    );
}
2646
/// The 32-byte header is decoded as eight big-endian `u32` words, seven of which are
/// exposed on `StatisticsHeader`.
#[test]
fn test_nb_format_header_parsing() {
    // Eight big-endian words, in wire order; the third one is not asserted below.
    let words: [u32; 8] = [4, 0x2629_1b05, 0, 44, 1, 101, 2, 0x14d4];
    let test_data: Vec<u8> = words.iter().flat_map(|word| word.to_be_bytes()).collect();

    let result = parse_nb_format_header(&test_data);
    assert!(result.is_ok());

    let (_, header) = result.unwrap();
    assert_eq!(header.version, 4);
    assert_eq!(header.statistics_kind, 0x2629_1b05);
    assert_eq!(header.data_length, 44);
    assert_eq!(header.metadata1, 1);
    assert_eq!(header.metadata2, 101);
    assert_eq!(header.metadata3, 2);
    assert_eq!(header.checksum, 0x14d4);
}
2673
/// Feeding ten 0xFF bytes as the statistics body must produce an error.
#[test]
fn test_statistics_data_extraction_with_invalid_data() {
    let filler = [0xFF_u8; 10];
    let dummy_data = &filler[..];

    let header = StatisticsHeader {
        version: 4,
        statistics_kind: 0x2629_1b05,
        data_length: 44,
        metadata1: 1,
        metadata2: 101,
        metadata3: 2,
        checksum: 0x14d4,
        table_id: None,
    };

    // The same buffer is passed as both the body and the full input.
    let result = parse_nb_format_statistics_data(dummy_data, &header, dummy_data, None);

    assert!(result.is_err());
}
2694
/// A buffer holding only the 32-byte header, with no body, must be rejected.
#[test]
fn test_enhanced_statistics_file_with_incomplete_data() {
    // Header only: eight big-endian words and nothing after them.
    let words: [u32; 8] = [4, 0x2629_1b05, 0, 44, 1, 101, 2, 0x14d4];
    let test_data: Vec<u8> = words.iter().flat_map(|word| word.to_be_bytes()).collect();

    let result = parse_enhanced_statistics_file(&test_data, None);

    assert!(result.is_err());
}
2716
/// The fallback entry point must also fail when given only 32 header-sized
/// bytes and no further content.
#[test]
fn test_parser_fallback_with_incomplete_data() {
    // Four bytes per row: each row is one big-endian word.
    const HEADER_ONLY: [u8; 32] = [
        0x00, 0x00, 0x00, 0x04, //
        0x26, 0x29, 0x1b, 0x05, //
        0x00, 0x00, 0x00, 0x00, //
        0x00, 0x00, 0x00, 0x2c, //
        0x00, 0x00, 0x00, 0x01, //
        0x00, 0x00, 0x00, 0x65, //
        0x00, 0x00, 0x00, 0x02, //
        0x00, 0x00, 0x14, 0xd4, //
    ];
    let truncated: Vec<u8> = HEADER_ONLY.to_vec();

    let outcome = parse_statistics_with_fallback(&truncated, None);

    assert!(outcome.is_err());
}
2736
/// Ten `0xFF` bytes are not a valid statistics file; parsing must fail.
#[test]
fn test_invalid_data_returns_error() {
    let garbage: Vec<u8> = std::iter::repeat(0xFF).take(10).collect();

    let outcome = parse_statistics_with_fallback(&garbage, None);

    assert!(outcome.is_err(), "Invalid data should fail to parse");
}
2744
/// `parse_regular_columns` must recover one partition key type and two
/// regular columns from a buffer that starts with 50 filler bytes.
#[test]
fn test_partition_key_extraction_via_backtracking() {
    let segments: [&[u8]; 12] = [
        // Filler placed ahead of the partition key type descriptor.
        &[0xFF; 50],
        // Precedes the 40-byte type name (0x28 == 40); presumably a two-byte
        // vint length — confirm against the vint decoder.
        &[0x80, 0x28],
        b"org.apache.cassandra.db.marshal.UUIDType",
        // 0x02 matches the two columns asserted below; the leading 0x00 is
        // presumably another count field — TODO confirm.
        &[0x00, 0x02],
        // Column 1: name length (14), name, type length (41), type.
        &[0x0E],
        b"expiring_value",
        &[0x29],
        b"org.apache.cassandra.db.marshal.Int32Type",
        // Column 2: name length (12), name, type length (40), type.
        &[0x0C],
        b"session_info",
        &[0x28],
        b"org.apache.cassandra.db.marshal.UTF8Type",
    ];
    let test_data: Vec<u8> = segments.concat();

    let result = parse_regular_columns(&test_data);
    assert!(
        result.is_ok(),
        "Failed to parse columns with backtracking: {:?}",
        result.err()
    );
    let (_remaining, (partition_keys, columns)) = result.unwrap();

    assert_eq!(
        partition_keys.len(),
        1,
        "Expected 1 partition key via backtracking"
    );
    assert_eq!(
        partition_keys[0],
        "org.apache.cassandra.db.marshal.UUIDType"
    );

    assert_eq!(columns.len(), 2, "Expected 2 regular columns");
    let expected = [("expiring_value", "int"), ("session_info", "text")];
    for (column, (name, cql_type)) in columns.iter().zip(expected.iter()) {
        assert_eq!(column.name, *name);
        assert_eq!(column.column_type, *cql_type);
        assert!(!column.is_primary_key);
    }
}
2809
/// A longer (75-byte) partition key type descriptor placed after 100 filler
/// bytes must be returned verbatim, followed by one regular column.
#[test]
fn test_partition_key_extraction_with_longer_type() {
    // NOTE(review): the descriptor starts with '(' (byte 0x28) and its
    // parentheses are unbalanced — confirm this is intentional.
    let composite_type =
        "(org.apache.cassandra.db.marshal.CompositeType(UTF8Type,Int32Type,UUIDType)";
    // The literal is 75 bytes long, so narrowing its length to `u8` is lossless.
    let type_len = composite_type.len() as u8;

    let mut payload: Vec<u8> = Vec::new();
    payload.resize(100, 0xFF);

    // Length byte followed by the descriptor itself.
    payload.push(type_len);
    payload.extend(composite_type.bytes());

    // 0x01 matches the single column asserted below; 0x04 is the length of "data".
    payload.extend_from_slice(&[0x00, 0x01, 0x04]);
    payload.extend_from_slice(b"data");
    payload.extend_from_slice(&[0x28]);
    payload.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");

    let result = parse_regular_columns(&payload);
    assert!(result.is_ok(), "Failed to parse: {:?}", result.err());
    let (_remaining, (partition_keys, columns)) = result.unwrap();

    assert_eq!(partition_keys.len(), 1);
    assert_eq!(partition_keys[0], composite_type);

    assert_eq!(columns.len(), 1);
    let only_column = &columns[0];
    assert_eq!(only_column.name, "data");
    assert!(!only_column.is_primary_key);
}
2847
/// With nothing in front of the column section, no partition key may be
/// reported and the single column must still be parsed.
#[test]
fn test_backtracking_with_no_partition_key() {
    // 0x00, 0x01, then the column: name length (4), name, type length (40), type.
    let input: Vec<u8> = [0x00u8, 0x01, 0x04]
        .iter()
        .chain(b"name")
        .chain(&[0x28u8])
        .chain(b"org.apache.cassandra.db.marshal.UTF8Type")
        .copied()
        .collect();

    let outcome = parse_regular_columns(&input);
    assert!(outcome.is_ok());

    let (_remaining, (partition_keys, columns)) = outcome.unwrap();

    assert_eq!(partition_keys.len(), 0, "Should have no partition keys");
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].name, "name");
}
2874
/// A length-prefixed string that is not a marshal type name must not be
/// reported as a partition key, even though parsing as a whole succeeds.
#[test]
fn test_backtracking_rejects_invalid_types() {
    let chunks: [&[u8]; 7] = [
        // Length byte (21) and a string that is not a marshal class name.
        &[0x15],
        b"InvalidTypeDescriptor",
        &[0x00, 0x00, 0x01],
        // One column: name length (4), name, type length (40), type.
        &[0x04],
        b"test",
        &[0x28],
        b"org.apache.cassandra.db.marshal.UTF8Type",
    ];

    // 50 filler bytes come first, then every chunk in order.
    let mut input: Vec<u8> = [0xFF_u8; 50].to_vec();
    for chunk in chunks.iter() {
        input.extend_from_slice(chunk);
    }

    let outcome = parse_regular_columns(&input);
    assert!(outcome.is_ok());

    let (_remaining, (partition_keys, _columns)) = outcome.unwrap();

    assert_eq!(
        partition_keys.len(),
        0,
        "Should reject invalid type pattern"
    );
}
2905}