1use super::vint::{parse_vint, parse_vint_length};
7use crate::error::Result;
8use nom::{
9 bytes::complete::take,
10 multi::count,
11 number::complete::{be_u16, be_u32, be_u64, be_u8},
12 IResult,
13};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum CassandraVersion {
20 Legacy,
22 V5_0Alpha,
24 V5_0Beta,
26 V5_0Release,
28 V5_0NewBig,
30 V5_0Bti,
32 V5_0DataFormat,
34 V5_0FormatC,
36 V5_0FormatD,
38 V5_0FormatE,
40 V5_0FormatF,
42 V5_0FormatG,
44 V5_0StaticColumns,
49 V5_0Uncompressed,
54 V5_0ComplexTypes,
61 V5_0TypedCollections,
67 V5_0WideRows,
73 V5_0NewBigFormat,
82}
83
84impl CassandraVersion {
85 pub fn magic_number(&self) -> u32 {
87 match self {
88 CassandraVersion::Legacy => 0x6F61_0000, CassandraVersion::V5_0Alpha => 0xAD01_0000, CassandraVersion::V5_0Beta => 0xA007_0000, CassandraVersion::V5_0Release => 0x4316_0000, CassandraVersion::V5_0NewBig => 0x0000_0000, CassandraVersion::V5_0Bti => 0x6461_0000, CassandraVersion::V5_0DataFormat => 0x8080_015c, CassandraVersion::V5_0FormatC => 0x8c33_0000, CassandraVersion::V5_0FormatD => 0x4325_0000, CassandraVersion::V5_0FormatE => 0x4225_0000, CassandraVersion::V5_0FormatF => 0xEA22_0000, CassandraVersion::V5_0FormatG => 0xAF03_0000, CassandraVersion::V5_0StaticColumns => 0xC051_5C00, CassandraVersion::V5_0Uncompressed => 0x0010_045E, CassandraVersion::V5_0ComplexTypes => 0x8236_5C00, CassandraVersion::V5_0TypedCollections => 0x0F3C_0000, CassandraVersion::V5_0WideRows => 0xF07C_5C00, CassandraVersion::V5_0NewBigFormat => 0xD464_5400, }
108 }
109
110 pub fn from_magic_number(magic: u32) -> Option<CassandraVersion> {
112 match magic {
113 0x6F61_0000..=0x6F61_FFFF => Some(CassandraVersion::Legacy),
115
116 0xAD01_0000..=0xAD01_FFFF => Some(CassandraVersion::V5_0Alpha),
118
119 0xA007_0000..=0xA007_FFFF => Some(CassandraVersion::V5_0Beta),
121
122 0x4316_0000..=0x4316_FFFF => Some(CassandraVersion::V5_0Release),
124
125 0x6461_0000..=0x6461_FFFF => Some(CassandraVersion::V5_0Bti),
130
131 0x8080_015c => Some(CassandraVersion::V5_0DataFormat),
133
134 0x8c33_0000 => Some(CassandraVersion::V5_0FormatC),
136
137 0x4325_0000 => Some(CassandraVersion::V5_0FormatD),
139
140 0x4225_0000 => Some(CassandraVersion::V5_0FormatE),
142
143 0xEA22_0000 => Some(CassandraVersion::V5_0FormatF),
145
146 0xAF03_0000 => Some(CassandraVersion::V5_0FormatG),
148
149 0xC051_5C00 => Some(CassandraVersion::V5_0StaticColumns),
151
152 0x0010_045E => Some(CassandraVersion::V5_0Uncompressed),
154
155 0x8236_5C00 => Some(CassandraVersion::V5_0ComplexTypes),
157
158 0x0F3C_0000 => Some(CassandraVersion::V5_0TypedCollections),
160
161 0xF07C_5C00 => Some(CassandraVersion::V5_0WideRows),
163
164 0xD464_5400 => Some(CassandraVersion::V5_0NewBigFormat),
166
167 _ => None,
168 }
169 }
170
171 pub fn version_string(&self) -> &'static str {
173 match self {
174 CassandraVersion::Legacy => "Legacy 'oa' format",
175 CassandraVersion::V5_0Alpha => "Cassandra 5.0 Alpha",
176 CassandraVersion::V5_0Beta => "Cassandra 5.0 Beta",
177 CassandraVersion::V5_0Release => "Cassandra 5.0 Release",
178 CassandraVersion::V5_0NewBig => "Cassandra 5.0 'nb' (new big) format",
179 CassandraVersion::V5_0Bti => "Cassandra 5.0 BTI (Big Trie-Indexed) format",
180 CassandraVersion::V5_0DataFormat => "Cassandra 5.0 Data.db format",
181 CassandraVersion::V5_0FormatC => "Cassandra 5.0 Format C",
182 CassandraVersion::V5_0FormatD => "Cassandra 5.0 Format D",
183 CassandraVersion::V5_0FormatE => "Cassandra 5.0 Format E (composite keys)",
184 CassandraVersion::V5_0FormatF => "Cassandra 5.0 Format F (TTL support)",
185 CassandraVersion::V5_0FormatG => "Cassandra 5.0 Format G (counters)",
186 CassandraVersion::V5_0StaticColumns => "Cassandra 5.0 Static Columns format",
187 CassandraVersion::V5_0Uncompressed => "Cassandra 5.0 Uncompressed format",
188 CassandraVersion::V5_0ComplexTypes => "Cassandra 5.0 Complex Types format",
189 CassandraVersion::V5_0TypedCollections => "Cassandra 5.0 Typed Collections format",
190 CassandraVersion::V5_0WideRows => "Cassandra 5.0 Wide Rows format",
191 CassandraVersion::V5_0NewBigFormat => {
192 "Cassandra 5.0 NewBig Format (byte-comparable keys)"
193 }
194 }
195 }
196
197 pub fn data_format(&self) -> DataFormat {
216 match self {
217 CassandraVersion::Legacy => DataFormat::LegacyOA,
219
220 CassandraVersion::V5_0DataFormat
223 | CassandraVersion::V5_0FormatC
224 | CassandraVersion::V5_0FormatD
225 | CassandraVersion::V5_0FormatE
226 | CassandraVersion::V5_0FormatF
227 | CassandraVersion::V5_0FormatG
228 | CassandraVersion::V5_0StaticColumns
229 | CassandraVersion::V5_0ComplexTypes
230 | CassandraVersion::V5_0TypedCollections
231 | CassandraVersion::V5_0WideRows => DataFormat::V5CompressedLegacy,
232
233 CassandraVersion::V5_0Uncompressed => DataFormat::V5CompressedLegacy,
236
237 CassandraVersion::V5_0NewBigFormat => {
241 log::warn!("V5_0NewBigFormat detected (magic 0xD4645400), using V5CompressedLegacy classification");
242 DataFormat::V5CompressedLegacy
243 }
244
245 CassandraVersion::V5_0NewBig => DataFormat::V5CompressedLegacy,
249
250 CassandraVersion::V5_0Bti => DataFormat::V5UncompressedOA,
252
253 CassandraVersion::V5_0Alpha
256 | CassandraVersion::V5_0Beta
257 | CassandraVersion::V5_0Release => DataFormat::LegacyOA,
258 }
259 }
260
261 pub fn is_nb_format(&self) -> bool {
270 matches!(
271 self,
272 CassandraVersion::V5_0NewBig
273 | CassandraVersion::V5_0NewBigFormat
274 | CassandraVersion::V5_0DataFormat
275 | CassandraVersion::V5_0FormatC
276 | CassandraVersion::V5_0FormatD
277 | CassandraVersion::V5_0FormatE
278 | CassandraVersion::V5_0FormatF
279 | CassandraVersion::V5_0FormatG
280 | CassandraVersion::V5_0StaticColumns
281 | CassandraVersion::V5_0ComplexTypes
282 | CassandraVersion::V5_0TypedCollections
283 | CassandraVersion::V5_0WideRows
284 )
285 }
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
293pub enum DataFormat {
294 LegacyOA,
299
300 V5CompressedLegacy,
311
312 V5UncompressedOA,
321}
322
323pub const SSTABLE_MAGIC: u32 = 0x6F61_0000; pub const SUPPORTED_MAGIC_NUMBERS: &[u32] = &[
331 0x6F61_0000, 0xAD01_0000, 0xA007_0000, 0x4316_0000, 0x6461_0000, 0x8080_015c, 0x8c33_0000, 0x4325_0000, 0x4225_0000, 0xEA22_0000, 0xAF03_0000, 0xC051_5C00, 0x0010_045E, 0x8236_5C00, 0x0F3C_0000, 0xF07C_5C00, 0xD464_5400, 0x2C00_0000, 0xC302_0000, 0xF81E_0000, ];
353
354pub const SUPPORTED_VERSION: u16 = 0x0001;
356
357#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct SSTableHeader {
360 pub cassandra_version: CassandraVersion,
362 pub version: u16,
364 pub table_id: [u8; 16],
366 pub keyspace: String,
368 pub table_name: String,
370 pub generation: u64,
372 pub compression: CompressionInfo,
374 pub stats: SSTableStats,
376 pub columns: Vec<ColumnInfo>,
378 pub properties: HashMap<String, String>,
380}
381
382#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct CompressionInfo {
385 pub algorithm: String,
387 pub chunk_size: u32,
389 pub parameters: HashMap<String, String>,
391}
392
393#[derive(Debug, Clone, Default, Serialize, Deserialize)]
395pub struct SSTableStats {
396 pub row_count: u64,
398 pub min_timestamp: i64,
400 pub max_timestamp: i64,
402 pub max_deletion_time: i64,
404 pub compression_ratio: f64,
406 pub row_size_histogram: Vec<u64>,
408}
409
410#[derive(Debug, Clone, Serialize, Deserialize)]
412pub struct ColumnInfo {
413 pub name: String,
415 pub column_type: String,
417 pub is_primary_key: bool,
419 pub key_position: Option<u16>,
421 pub is_static: bool,
423 pub is_clustering: bool,
425}
426
427pub fn parse_magic_and_version(input: &[u8]) -> IResult<&[u8], (CassandraVersion, u16)> {
429 if input.len() < 4 {
431 return Err(nom::Err::Error(nom::error::Error::new(
432 input,
433 nom::error::ErrorKind::Eof,
434 )));
435 }
436
437 let (input, magic) = be_u32(input)?;
438
439 log::debug!("Parsed magic number: 0x{:08X}", magic);
441
442 let cassandra_version = CassandraVersion::from_magic_number(magic).ok_or_else(|| {
444 log::error!("Unknown magic number: 0x{:08X}", magic);
445 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Tag))
446 })?;
447
448 log::debug!("Detected Cassandra version: {:?}", cassandra_version);
449
450 if input.len() < 2 {
452 return Err(nom::Err::Error(nom::error::Error::new(
453 input,
454 nom::error::ErrorKind::Eof,
455 )));
456 }
457
458 let (input, version) = be_u16(input)?;
461
462 log::debug!("Parsed version: 0x{:04X}", version);
463
464 match cassandra_version {
466 CassandraVersion::Legacy
467 | CassandraVersion::V5_0Alpha
468 | CassandraVersion::V5_0Beta
469 | CassandraVersion::V5_0Release => {
470 if version != SUPPORTED_VERSION {
472 log::warn!(
473 "Unsupported version 0x{:04X} for {:?}, expected 0x{:04X}",
474 version,
475 cassandra_version,
476 SUPPORTED_VERSION
477 );
478 return Err(nom::Err::Error(nom::error::Error::new(
479 input,
480 nom::error::ErrorKind::Verify,
481 )));
482 }
483 }
484 CassandraVersion::V5_0NewBig
485 | CassandraVersion::V5_0Bti
486 | CassandraVersion::V5_0DataFormat
487 | CassandraVersion::V5_0FormatC
488 | CassandraVersion::V5_0FormatD
489 | CassandraVersion::V5_0FormatE
490 | CassandraVersion::V5_0FormatF
491 | CassandraVersion::V5_0FormatG
492 | CassandraVersion::V5_0StaticColumns
493 | CassandraVersion::V5_0Uncompressed
494 | CassandraVersion::V5_0ComplexTypes
495 | CassandraVersion::V5_0TypedCollections
496 | CassandraVersion::V5_0WideRows
497 | CassandraVersion::V5_0NewBigFormat => {
498 if version == 0 {
502 log::warn!(
503 "Suspicious version 0x{:04X} for {:?}",
504 version,
505 cassandra_version
506 );
507 return Err(nom::Err::Error(nom::error::Error::new(
508 input,
509 nom::error::ErrorKind::Verify,
510 )));
511 }
512 }
513 }
514
515 Ok((input, (cassandra_version, version)))
516}
517
518pub fn parse_magic_and_version_legacy(input: &[u8]) -> IResult<&[u8], u16> {
520 let (input, (_, version)) = parse_magic_and_version(input)?;
521 Ok((input, version))
522}
523
524pub fn parse_vstring(input: &[u8]) -> IResult<&[u8], String> {
526 let (input, length) = parse_vint_length(input)?;
527 let (input, bytes) = take(length)(input)?;
528 let string = String::from_utf8(bytes.to_vec()).map_err(|_| {
529 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
530 })?;
531 Ok((input, string))
532}
533
534pub fn parse_compression_info(input: &[u8]) -> IResult<&[u8], CompressionInfo> {
536 let (input, algorithm) = parse_vstring(input)?;
537 let (input, chunk_size) = be_u32(input)?;
538 let (input, param_count) = parse_vint_length(input)?;
539
540 let mut parameters = HashMap::new();
541 let mut remaining = input;
542
543 for _ in 0..param_count {
544 let (new_remaining, key) = parse_vstring(remaining)?;
545 let (new_remaining, value) = parse_vstring(new_remaining)?;
546 parameters.insert(key, value);
547 remaining = new_remaining;
548 }
549
550 Ok((
551 remaining,
552 CompressionInfo {
553 algorithm,
554 chunk_size,
555 parameters,
556 },
557 ))
558}
559
560pub fn parse_sstable_stats(input: &[u8]) -> IResult<&[u8], SSTableStats> {
562 let (input, row_count) = be_u64(input)?;
563 let (input, min_timestamp) = parse_vint(input)?;
564 let (input, max_timestamp) = parse_vint(input)?;
565 let (input, max_deletion_time) = parse_vint(input)?;
566 let (input, compression_ratio_bits) = be_u64(input)?;
567 let compression_ratio = f64::from_bits(compression_ratio_bits);
568
569 let (input, histogram_size) = parse_vint_length(input)?;
570 let (input, row_size_histogram) = count(be_u64, histogram_size)(input)?;
571
572 Ok((
573 input,
574 SSTableStats {
575 row_count,
576 min_timestamp,
577 max_timestamp,
578 max_deletion_time,
579 compression_ratio,
580 row_size_histogram,
581 },
582 ))
583}
584
585pub fn parse_column_info(input: &[u8]) -> IResult<&[u8], ColumnInfo> {
587 let (input, name) = parse_vstring(input)?;
588 let (input, column_type) = parse_vstring(input)?;
589 let (input, flags) = be_u8(input)?;
590
591 let is_primary_key = (flags & 0x01) != 0;
592 let is_static = (flags & 0x02) != 0;
593 let is_clustering = (flags & 0x04) != 0;
594
595 let (input, key_position) = if is_primary_key {
596 let (input, pos) = be_u16(input)?;
597 (input, Some(pos))
598 } else {
599 (input, None)
600 };
601
602 Ok((
603 input,
604 ColumnInfo {
605 name,
606 column_type,
607 is_primary_key,
608 key_position,
609 is_static,
610 is_clustering,
611 },
612 ))
613}
614
615pub fn parse_sstable_header(input: &[u8]) -> IResult<&[u8], SSTableHeader> {
617 let (input, (cassandra_version, version)) = parse_magic_and_version(input)?;
618
619 match cassandra_version {
622 CassandraVersion::V5_0FormatC
623 | CassandraVersion::V5_0FormatD
624 | CassandraVersion::V5_0FormatE
625 | CassandraVersion::V5_0FormatF
626 | CassandraVersion::V5_0FormatG
627 | CassandraVersion::V5_0DataFormat
628 | CassandraVersion::V5_0NewBig
629 | CassandraVersion::V5_0StaticColumns
630 | CassandraVersion::V5_0Uncompressed
631 | CassandraVersion::V5_0ComplexTypes
632 | CassandraVersion::V5_0TypedCollections
633 | CassandraVersion::V5_0WideRows
634 | CassandraVersion::V5_0NewBigFormat => {
635 return parse_cassandra5_simplified_header(input, cassandra_version, version);
636 }
637 _ => {
638 }
640 }
641
642 let (input, table_id) = take(16usize)(input)?;
643 let table_id = {
644 let mut id = [0u8; 16];
645 id.copy_from_slice(table_id);
646 id
647 };
648
649 let (input, keyspace) = parse_vstring(input)?;
650 let (input, table_name) = parse_vstring(input)?;
651 let (input, generation) = be_u64(input)?;
652 let (input, compression) = parse_compression_info(input)?;
653 let (input, stats) = parse_sstable_stats(input)?;
654
655 let (input, column_count) = parse_vint_length(input)?;
656 let (input, columns) = count(parse_column_info, column_count)(input)?;
657
658 let (input, prop_count) = parse_vint_length(input)?;
659 let mut properties = HashMap::new();
660 let mut remaining = input;
661
662 for _ in 0..prop_count {
663 let (new_remaining, key) = parse_vstring(remaining)?;
664 let (new_remaining, value) = parse_vstring(new_remaining)?;
665 properties.insert(key, value);
666 remaining = new_remaining;
667 }
668
669 Ok((
670 remaining,
671 SSTableHeader {
672 cassandra_version,
673 version,
674 table_id,
675 keyspace,
676 table_name,
677 generation,
678 compression,
679 stats,
680 columns,
681 properties,
682 },
683 ))
684}
685
686fn parse_cassandra5_simplified_header(
689 input: &[u8],
690 cassandra_version: CassandraVersion,
691 version: u16,
692) -> IResult<&[u8], SSTableHeader> {
693 Ok((
700 &input[input.len()..], SSTableHeader {
702 cassandra_version,
703 version,
704 table_id: [0u8; 16], keyspace: "test_keyspace".to_string(), table_name: "test_table".to_string(), generation: 1,
708 compression: CompressionInfo {
709 algorithm: "none".to_string(),
710 chunk_size: 65536,
711 parameters: HashMap::new(),
712 },
713 stats: SSTableStats::default(),
714 columns: vec![],
715 properties: HashMap::new(),
716 },
717 ))
718}
719
720pub fn serialize_sstable_header(header: &SSTableHeader) -> Result<Vec<u8>> {
722 let mut result = Vec::new();
723
724 result.extend_from_slice(&header.cassandra_version.magic_number().to_be_bytes());
726
727 result.extend_from_slice(&header.version.to_be_bytes());
730
731 result.extend_from_slice(&header.table_id);
733
734 serialize_vstring(&mut result, &header.keyspace)?;
736 serialize_vstring(&mut result, &header.table_name)?;
737
738 result.extend_from_slice(&header.generation.to_be_bytes());
740
741 serialize_compression_info(&mut result, &header.compression)?;
743
744 serialize_sstable_stats(&mut result, &header.stats)?;
746
747 serialize_vint_length(&mut result, header.columns.len())?;
749 for column in &header.columns {
750 serialize_column_info(&mut result, column)?;
751 }
752
753 serialize_vint_length(&mut result, header.properties.len())?;
755 for (key, value) in &header.properties {
756 serialize_vstring(&mut result, key)?;
757 serialize_vstring(&mut result, value)?;
758 }
759
760 Ok(result)
761}
762
763fn serialize_vstring(output: &mut Vec<u8>, s: &str) -> Result<()> {
764 use super::vint::encode_vint;
765 output.extend_from_slice(&encode_vint(s.len() as i64));
766 output.extend_from_slice(s.as_bytes());
767 Ok(())
768}
769
770fn serialize_vint_length(output: &mut Vec<u8>, len: usize) -> Result<()> {
771 use super::vint::encode_vint;
772 output.extend_from_slice(&encode_vint(len as i64));
773 Ok(())
774}
775
776fn serialize_compression_info(output: &mut Vec<u8>, info: &CompressionInfo) -> Result<()> {
777 serialize_vstring(output, &info.algorithm)?;
778 output.extend_from_slice(&info.chunk_size.to_be_bytes());
779 serialize_vint_length(output, info.parameters.len())?;
780
781 for (key, value) in &info.parameters {
782 serialize_vstring(output, key)?;
783 serialize_vstring(output, value)?;
784 }
785
786 Ok(())
787}
788
789fn serialize_sstable_stats(output: &mut Vec<u8>, stats: &SSTableStats) -> Result<()> {
790 use super::vint::encode_vint;
791
792 output.extend_from_slice(&stats.row_count.to_be_bytes());
793 output.extend_from_slice(&encode_vint(stats.min_timestamp));
794 output.extend_from_slice(&encode_vint(stats.max_timestamp));
795 output.extend_from_slice(&encode_vint(stats.max_deletion_time));
796 output.extend_from_slice(&stats.compression_ratio.to_bits().to_be_bytes());
797
798 serialize_vint_length(output, stats.row_size_histogram.len())?;
799 for &size in &stats.row_size_histogram {
800 output.extend_from_slice(&size.to_be_bytes());
801 }
802
803 Ok(())
804}
805
806fn serialize_column_info(output: &mut Vec<u8>, column: &ColumnInfo) -> Result<()> {
807 serialize_vstring(output, &column.name)?;
808 serialize_vstring(output, &column.column_type)?;
809
810 let mut flags = 0u8;
811 if column.is_primary_key {
812 flags |= 0x01;
813 }
814 if column.is_static {
815 flags |= 0x02;
816 }
817 if column.is_clustering {
818 flags |= 0x04;
819 }
820 output.push(flags);
821
822 if let Some(position) = column.key_position {
823 output.extend_from_slice(&position.to_be_bytes());
824 }
825
826 Ok(())
827}
828
829#[cfg(test)]
830mod tests {
831 use super::*;
832
833 #[test]
834 fn test_magic_and_version_legacy() {
835 let mut data = Vec::new();
836 data.extend_from_slice(&SSTABLE_MAGIC.to_be_bytes());
837 data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
838
839 let (_, (cassandra_version, version)) = parse_magic_and_version(&data).unwrap();
840 assert_eq!(cassandra_version, CassandraVersion::Legacy);
841 assert_eq!(version, SUPPORTED_VERSION);
842 }
843
844 #[test]
845 fn test_magic_and_version_cassandra_5_alpha() {
846 let mut data = Vec::new();
847 data.extend_from_slice(&CassandraVersion::V5_0Alpha.magic_number().to_be_bytes());
848 data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
849
850 let (_, (cassandra_version, version)) = parse_magic_and_version(&data).unwrap();
851 assert_eq!(cassandra_version, CassandraVersion::V5_0Alpha);
852 assert_eq!(version, SUPPORTED_VERSION);
853 }
854
855 #[test]
856 fn test_magic_and_version_cassandra_5_beta() {
857 let mut data = Vec::new();
858 data.extend_from_slice(&CassandraVersion::V5_0Beta.magic_number().to_be_bytes());
859 data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
860
861 let (_, (cassandra_version, version)) = parse_magic_and_version(&data).unwrap();
862 assert_eq!(cassandra_version, CassandraVersion::V5_0Beta);
863 assert_eq!(version, SUPPORTED_VERSION);
864 }
865
866 #[test]
867 fn test_magic_and_version_cassandra_5_release() {
868 let mut data = Vec::new();
869 data.extend_from_slice(&CassandraVersion::V5_0Release.magic_number().to_be_bytes());
870 data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
871
872 let (_, (cassandra_version, version)) = parse_magic_and_version(&data).unwrap();
873 assert_eq!(cassandra_version, CassandraVersion::V5_0Release);
874 assert_eq!(version, SUPPORTED_VERSION);
875 }
876
877 #[test]
878 fn test_v5_newbig_is_headerless() {
879 assert_eq!(
884 CassandraVersion::V5_0NewBig.magic_number(),
885 0x0000_0000,
886 "V5_0NewBig should return sentinel 0x0000_0000 (headerless format)"
887 );
888
889 let data = [0x00, 0x00, 0x00, 0x00, 0x00, 0x01];
891 let result = parse_magic_and_version(&data);
892 assert!(
893 result.is_err(),
894 "0x0000_0000 should not be a valid magic number"
895 );
896 }
897
898 #[test]
899 fn test_magic_and_version_invalid() {
900 let mut data = Vec::new();
901 data.extend_from_slice(&0xDEADBEEFu32.to_be_bytes()); data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
903
904 let result = parse_magic_and_version(&data);
905 assert!(result.is_err());
906 }
907
908 #[test]
909 fn test_cassandra_version_from_magic() {
910 assert_eq!(
912 CassandraVersion::from_magic_number(0x6F61_0000),
913 Some(CassandraVersion::Legacy)
914 );
915 assert_eq!(
916 CassandraVersion::from_magic_number(0xAD01_0000),
917 Some(CassandraVersion::V5_0Alpha)
918 );
919 assert_eq!(
920 CassandraVersion::from_magic_number(0xA007_0000),
921 Some(CassandraVersion::V5_0Beta)
922 );
923 assert_eq!(
924 CassandraVersion::from_magic_number(0x4316_0000),
925 Some(CassandraVersion::V5_0Release)
926 );
927 assert_eq!(
932 CassandraVersion::from_magic_number(0x0040_0000),
933 None, "0x0040_0000 should NOT map to V5_0NewBig - it's LZ4 chunk length prefix"
935 );
936 assert_eq!(
937 CassandraVersion::from_magic_number(0x6461_0000),
938 Some(CassandraVersion::V5_0Bti)
939 );
940
941 assert_eq!(
943 CassandraVersion::from_magic_number(0x6F61_0001),
944 Some(CassandraVersion::Legacy)
945 );
946 assert_eq!(
947 CassandraVersion::from_magic_number(0xAD01_0001),
948 Some(CassandraVersion::V5_0Alpha)
949 );
950
951 assert_eq!(CassandraVersion::from_magic_number(0xDEADBEEF), None);
953 assert_eq!(CassandraVersion::from_magic_number(0x0000_0000), None);
954 }
955
956 #[test]
957 fn test_cassandra_version_strings() {
958 assert_eq!(
959 CassandraVersion::Legacy.version_string(),
960 "Legacy 'oa' format"
961 );
962 assert_eq!(
963 CassandraVersion::V5_0Alpha.version_string(),
964 "Cassandra 5.0 Alpha"
965 );
966 assert_eq!(
967 CassandraVersion::V5_0Beta.version_string(),
968 "Cassandra 5.0 Beta"
969 );
970 assert_eq!(
971 CassandraVersion::V5_0Release.version_string(),
972 "Cassandra 5.0 Release"
973 );
974 assert_eq!(
975 CassandraVersion::V5_0NewBig.version_string(),
976 "Cassandra 5.0 'nb' (new big) format"
977 );
978 }
979
980 #[test]
981 fn test_vstring_parsing() {
982 use super::super::vint::encode_vint;
983
984 let test_str = "test_string";
985 let mut data = Vec::new();
986 data.extend_from_slice(&encode_vint(test_str.len() as i64));
987 data.extend_from_slice(test_str.as_bytes());
988
989 let (_, parsed) = parse_vstring(&data).unwrap();
990 assert_eq!(parsed, test_str);
991 }
992
993 #[test]
994 fn test_column_info_roundtrip() {
995 let column = ColumnInfo {
996 name: "test_column".to_string(),
997 column_type: "text".to_string(),
998 is_primary_key: true,
999 key_position: Some(0),
1000 is_static: false,
1001 is_clustering: false,
1002 };
1003
1004 let mut serialized = Vec::new();
1005 serialize_column_info(&mut serialized, &column).unwrap();
1006
1007 let (_, parsed) = parse_column_info(&serialized).unwrap();
1008 assert_eq!(parsed.name, column.name);
1009 assert_eq!(parsed.column_type, column.column_type);
1010 assert_eq!(parsed.is_primary_key, column.is_primary_key);
1011 assert_eq!(parsed.key_position, column.key_position);
1012 }
1013
1014 #[test]
1015 fn test_compression_info_roundtrip() {
1016 let mut params = HashMap::new();
1017 params.insert("level".to_string(), "6".to_string());
1018
1019 let compression = CompressionInfo {
1020 algorithm: "LZ4".to_string(),
1021 chunk_size: 4096,
1022 parameters: params,
1023 };
1024
1025 let mut serialized = Vec::new();
1026 serialize_compression_info(&mut serialized, &compression).unwrap();
1027
1028 let (_, parsed) = parse_compression_info(&serialized).unwrap();
1029 assert_eq!(parsed.algorithm, compression.algorithm);
1030 assert_eq!(parsed.chunk_size, compression.chunk_size);
1031 assert_eq!(parsed.parameters, compression.parameters);
1032 }
1033
1034 #[test]
1035 fn test_insufficient_data_handling() {
1036 let data = vec![0x6F, 0x61]; let result = parse_magic_and_version(&data);
1039 assert!(
1040 result.is_err(),
1041 "Should fail with insufficient data for magic number"
1042 );
1043
1044 let data = vec![0x6F, 0x61, 0x00, 0x00]; let result = parse_magic_and_version(&data);
1047 assert!(
1048 result.is_err(),
1049 "Should fail with insufficient data for version"
1050 );
1051 }
1052
1053 #[test]
1054 fn test_version_validation_for_different_formats() {
1055 let mut data = Vec::new();
1057 data.extend_from_slice(&CassandraVersion::Legacy.magic_number().to_be_bytes());
1058 data.extend_from_slice(&SUPPORTED_VERSION.to_be_bytes());
1059 let result = parse_magic_and_version(&data);
1060 assert!(
1061 result.is_ok(),
1062 "Standard format with valid version should succeed"
1063 );
1064
1065 let mut data = Vec::new();
1067 data.extend_from_slice(&CassandraVersion::V5_0Bti.magic_number().to_be_bytes());
1068 data.extend_from_slice(&0x0002u16.to_be_bytes()); let result = parse_magic_and_version(&data);
1070 assert!(
1071 result.is_ok(),
1072 "BTI format should accept wider version range"
1073 );
1074
1075 let mut data = Vec::new();
1077 data.extend_from_slice(&CassandraVersion::V5_0Bti.magic_number().to_be_bytes());
1078 data.extend_from_slice(&0x0000u16.to_be_bytes());
1079 let result = parse_magic_and_version(&data);
1080 assert!(result.is_err(), "Should reject version 0");
1081 }
1082
1083 #[test]
1084 fn test_magic_number_range_detection() {
1085 let magic_with_version = 0x6F61_0001; assert_eq!(
1088 CassandraVersion::from_magic_number(magic_with_version),
1089 Some(CassandraVersion::Legacy),
1090 "Should detect legacy format even with version bits"
1091 );
1092
1093 let bti_with_version = 0x6461_0002; assert_eq!(
1096 CassandraVersion::from_magic_number(bti_with_version),
1097 Some(CassandraVersion::V5_0Bti),
1098 "Should detect BTI format even with version bits"
1099 );
1100 }
1101
1102 #[test]
1103 fn test_header_serialization_roundtrip() {
1104 use std::collections::HashMap;
1105
1106 let mut properties = HashMap::new();
1107 properties.insert("test_key".to_string(), "test_value".to_string());
1108
1109 let mut compression_params = HashMap::new();
1110 compression_params.insert("level".to_string(), "6".to_string());
1111
1112 let header = SSTableHeader {
1113 cassandra_version: CassandraVersion::V5_0Release,
1115 version: SUPPORTED_VERSION,
1116 table_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
1117 keyspace: "test_keyspace".to_string(),
1118 table_name: "test_table".to_string(),
1119 generation: 12345,
1120 compression: CompressionInfo {
1121 algorithm: "LZ4".to_string(),
1122 chunk_size: 4096,
1123 parameters: compression_params,
1124 },
1125 stats: SSTableStats {
1126 row_count: 1000,
1127 min_timestamp: -1000,
1128 max_timestamp: 1000,
1129 max_deletion_time: 500,
1130 compression_ratio: 0.75,
1131 row_size_histogram: vec![10, 20, 30],
1132 },
1133 columns: vec![ColumnInfo {
1134 name: "test_column".to_string(),
1135 column_type: "text".to_string(),
1136 is_primary_key: true,
1137 key_position: Some(0),
1138 is_static: false,
1139 is_clustering: false,
1140 }],
1141 properties,
1142 };
1143
1144 let serialized = serialize_sstable_header(&header).unwrap();
1146
1147 let (_, parsed_header) = parse_sstable_header(&serialized).unwrap();
1149
1150 assert_eq!(parsed_header.cassandra_version, header.cassandra_version);
1152 assert_eq!(parsed_header.version, header.version);
1153 assert_eq!(parsed_header.table_id, header.table_id);
1154 assert_eq!(parsed_header.keyspace, header.keyspace);
1155 assert_eq!(parsed_header.table_name, header.table_name);
1156 assert_eq!(parsed_header.generation, header.generation);
1157 assert_eq!(
1158 parsed_header.compression.algorithm,
1159 header.compression.algorithm
1160 );
1161 assert_eq!(parsed_header.stats.row_count, header.stats.row_count);
1162 assert_eq!(parsed_header.columns.len(), header.columns.len());
1163 assert_eq!(parsed_header.properties, header.properties);
1164 }
1165
1166 #[test]
1167 fn test_v5_format_classification() {
1168 assert_eq!(
1170 CassandraVersion::V5_0DataFormat.data_format(),
1171 DataFormat::V5CompressedLegacy,
1172 "V5_0DataFormat should use V5CompressedLegacy (u16 lengths, not VInt)"
1173 );
1174
1175 assert_eq!(
1177 CassandraVersion::V5_0FormatC.data_format(),
1178 DataFormat::V5CompressedLegacy
1179 );
1180 assert_eq!(
1181 CassandraVersion::V5_0FormatD.data_format(),
1182 DataFormat::V5CompressedLegacy
1183 );
1184 assert_eq!(
1185 CassandraVersion::V5_0FormatE.data_format(),
1186 DataFormat::V5CompressedLegacy
1187 );
1188 assert_eq!(
1189 CassandraVersion::V5_0FormatF.data_format(),
1190 DataFormat::V5CompressedLegacy
1191 );
1192 assert_eq!(
1193 CassandraVersion::V5_0FormatG.data_format(),
1194 DataFormat::V5CompressedLegacy
1195 );
1196
1197 assert_eq!(
1201 CassandraVersion::V5_0NewBig.data_format(),
1202 DataFormat::V5CompressedLegacy,
1203 "V5_0NewBig should use V5CompressedLegacy (u16 lengths, not VInt)"
1204 );
1205 assert_eq!(
1207 CassandraVersion::V5_0Bti.data_format(),
1208 DataFormat::V5UncompressedOA,
1209 "V5_0Bti should use V5UncompressedOA (VInt encoding)"
1210 );
1211
1212 assert_eq!(CassandraVersion::Legacy.data_format(), DataFormat::LegacyOA);
1214 }
1215
1216 #[test]
1217 fn test_v5_0_static_columns_roundtrip() {
1218 let magic = CassandraVersion::V5_0StaticColumns.magic_number();
1220 assert_eq!(magic, 0xC051_5C00, "Magic number should be 0xC051_5C00");
1221
1222 let variant = CassandraVersion::from_magic_number(magic);
1223 assert_eq!(
1224 variant,
1225 Some(CassandraVersion::V5_0StaticColumns),
1226 "Should round-trip to V5_0StaticColumns"
1227 );
1228
1229 assert_eq!(
1231 CassandraVersion::V5_0StaticColumns.version_string(),
1232 "Cassandra 5.0 Static Columns format"
1233 );
1234
1235 assert_eq!(
1237 CassandraVersion::V5_0StaticColumns.data_format(),
1238 DataFormat::V5CompressedLegacy,
1239 "V5_0StaticColumns should use V5CompressedLegacy"
1240 );
1241 }
1242
1243 #[test]
1244 fn test_v5_0_uncompressed_roundtrip() {
1245 let magic = CassandraVersion::V5_0Uncompressed.magic_number();
1247 assert_eq!(magic, 0x0010_045E, "Magic number should be 0x0010_045E");
1248
1249 let variant = CassandraVersion::from_magic_number(magic);
1250 assert_eq!(
1251 variant,
1252 Some(CassandraVersion::V5_0Uncompressed),
1253 "Should round-trip to V5_0Uncompressed"
1254 );
1255
1256 assert_eq!(
1258 CassandraVersion::V5_0Uncompressed.version_string(),
1259 "Cassandra 5.0 Uncompressed format"
1260 );
1261
1262 assert_eq!(
1265 CassandraVersion::V5_0Uncompressed.data_format(),
1266 DataFormat::V5CompressedLegacy,
1267 "V5_0Uncompressed should use V5CompressedLegacy (same row format, no compression)"
1268 );
1269 }
1270
1271 #[test]
1272 fn test_new_magic_numbers_in_supported_list() {
1273 assert!(
1275 SUPPORTED_MAGIC_NUMBERS.contains(&0xC051_5C00),
1276 "Static Columns magic should be in supported list"
1277 );
1278 assert!(
1279 SUPPORTED_MAGIC_NUMBERS.contains(&0x0010_045E),
1280 "Uncompressed magic should be in supported list"
1281 );
1282 assert!(
1283 SUPPORTED_MAGIC_NUMBERS.contains(&0x8236_5C00),
1284 "Complex Types magic should be in supported list"
1285 );
1286 assert!(
1287 SUPPORTED_MAGIC_NUMBERS.contains(&0x0F3C_0000),
1288 "Typed Collections magic should be in supported list"
1289 );
1290 assert!(
1291 SUPPORTED_MAGIC_NUMBERS.contains(&0xF07C_5C00),
1292 "Wide Rows magic should be in supported list"
1293 );
1294 }
1295
1296 #[test]
1297 fn test_v5_0_typed_collections_roundtrip() {
1298 let magic = CassandraVersion::V5_0TypedCollections.magic_number();
1300 assert_eq!(magic, 0x0F3C_0000, "Magic number should be 0x0F3C_0000");
1301
1302 let variant = CassandraVersion::from_magic_number(magic);
1303 assert_eq!(
1304 variant,
1305 Some(CassandraVersion::V5_0TypedCollections),
1306 "Should round-trip to V5_0TypedCollections"
1307 );
1308
1309 assert_eq!(
1311 CassandraVersion::V5_0TypedCollections.version_string(),
1312 "Cassandra 5.0 Typed Collections format"
1313 );
1314
1315 assert_eq!(
1317 CassandraVersion::V5_0TypedCollections.data_format(),
1318 DataFormat::V5CompressedLegacy,
1319 "V5_0TypedCollections should use V5CompressedLegacy"
1320 );
1321 }
1322
1323 #[test]
1324 fn test_v5_0_wide_rows_roundtrip() {
1325 let magic = CassandraVersion::V5_0WideRows.magic_number();
1327 assert_eq!(magic, 0xF07C_5C00, "Magic number should be 0xF07C_5C00");
1328
1329 let variant = CassandraVersion::from_magic_number(magic);
1330 assert_eq!(
1331 variant,
1332 Some(CassandraVersion::V5_0WideRows),
1333 "Should round-trip to V5_0WideRows"
1334 );
1335
1336 assert_eq!(
1338 CassandraVersion::V5_0WideRows.version_string(),
1339 "Cassandra 5.0 Wide Rows format"
1340 );
1341
1342 assert_eq!(
1344 CassandraVersion::V5_0WideRows.data_format(),
1345 DataFormat::V5CompressedLegacy,
1346 "V5_0WideRows should use V5CompressedLegacy"
1347 );
1348 }
1349}