1#![allow(deprecated)]
26
27use log::{debug, info, warn};
28use std::fs::File;
29use std::io::BufReader;
30use std::path::{Path, PathBuf};
31
32use super::{
33 chunk_decompressor::{create_decompressor_from_file, ChunkDecompressor},
34 compression_info::CompressionInfo,
35 format_detector::{SSTableComponent, SSTableFormat, SSTableInfo},
36};
37use crate::parser::vint::parse_vint;
38use crate::{Error, Result};
39
40#[deprecated(
47 since = "0.1.0",
48 note = "Use SSTableReader instead. This reader is EXPERIMENTAL and not suitable for production. See Issue #190."
49)]
50pub struct BulletproofReader {
51 info: SSTableInfo,
53 base_dir: PathBuf,
55 decompressor: Option<ChunkDecompressor>,
57 data_reader: Option<BufReader<File>>,
59}
60
61impl Default for BulletproofReader {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67impl BulletproofReader {
68 pub fn new() -> Self {
70 Self {
71 info: SSTableInfo::default(),
72 base_dir: PathBuf::new(),
73 decompressor: None,
74 data_reader: None,
75 }
76 }
77
78 pub fn open<P: AsRef<Path>>(sstable_path: P) -> Result<Self> {
83 let path = sstable_path.as_ref();
84 let info = SSTableInfo::from_path(path)?;
85
86 let base_dir = path
87 .parent()
88 .ok_or_else(|| Error::InvalidPath("No parent directory".to_string()))?
89 .to_path_buf();
90
91 info!(
92 "Opening SSTable with bulletproof reader: format={:?}, generation={}, size={}, component={:?}, base={}",
93 info.format, info.generation_numeric().unwrap_or(0), info.size, info.component, info.base_name
94 );
95
96 let mut reader = Self {
97 info,
98 base_dir,
99 decompressor: None,
100 data_reader: None,
101 };
102
103 reader.initialize()?;
104 Ok(reader)
105 }
106
107 fn initialize(&mut self) -> Result<()> {
109 if self.info.format.supports_compression() {
111 if let Err(e) = self.setup_compression() {
112 warn!(
113 "Compression setup failed: {}, trying without compression",
114 e
115 );
116 }
117 }
118
119 self.open_data_file()?;
121
122 Ok(())
123 }
124
125 fn setup_compression(&mut self) -> Result<()> {
127 let compression_info_path = self
128 .info
129 .companion_path(SSTableComponent::CompressionInfo, &self.base_dir);
130
131 if compression_info_path.exists() {
132 debug!("Found CompressionInfo.db, setting up decompression");
133
134 let decompressor = create_decompressor_from_file(&compression_info_path)?;
135 self.decompressor = Some(decompressor);
136
137 debug!("Compression setup complete");
138 } else {
139 debug!("No CompressionInfo.db found, assuming uncompressed data");
140 }
141
142 Ok(())
143 }
144
145 fn open_data_file(&mut self) -> Result<()> {
147 let data_path = self
148 .info
149 .companion_path(SSTableComponent::Data, &self.base_dir);
150
151 if !data_path.exists() {
152 return Err(Error::InvalidPath(format!(
153 "Data.db file not found: {:?}",
154 data_path
155 )));
156 }
157
158 let file = File::open(&data_path).map_err(Error::Io)?;
159 let reader = BufReader::new(file);
160
161 self.data_reader = Some(reader);
162
163 debug!("Data.db file opened: {:?}", data_path);
164 Ok(())
165 }
166
167 pub fn read_raw_data(&mut self, offset: u64, length: usize) -> Result<Vec<u8>> {
171 let reader = self
172 .data_reader
173 .as_mut()
174 .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
175
176 if let Some(decompressor) = &mut self.decompressor {
177 decompressor.read_data(reader, offset, length)
179 } else {
180 use std::io::{Read, Seek, SeekFrom};
182
183 reader.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
184
185 let mut buffer = vec![0u8; length];
186 reader.read_exact(&mut buffer).map_err(Error::Io)?;
187
188 Ok(buffer)
189 }
190 }
191
192 pub fn read_all_data(&mut self) -> Result<Vec<u8>> {
194 if let Some(decompressor) = &mut self.decompressor {
195 let reader = self
196 .data_reader
197 .as_mut()
198 .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
199
200 decompressor.read_all_data(reader)
201 } else {
202 let reader = self
203 .data_reader
204 .as_mut()
205 .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
206
207 use std::io::{Read, Seek, SeekFrom};
208
209 let current_pos = reader.stream_position().map_err(Error::Io)?;
211 let file_size = reader.seek(SeekFrom::End(0)).map_err(Error::Io)?;
212 reader
213 .seek(SeekFrom::Start(current_pos))
214 .map_err(Error::Io)?;
215
216 reader.seek(SeekFrom::Start(0)).map_err(Error::Io)?;
218
219 let mut buffer = Vec::with_capacity(file_size as usize);
220 reader.read_to_end(&mut buffer).map_err(Error::Io)?;
221
222 Ok(buffer)
223 }
224 }
225
226 pub fn parse_sstable_data(&mut self) -> Result<Vec<SSTableEntry>> {
231 let data = self.read_all_data()?;
232
233 info!(
234 "Parsing SSTable data ({} bytes) with format {:?}",
235 data.len(),
236 self.info.format
237 );
238
239 match &self.info.format {
240 SSTableFormat::V4x(_) | SSTableFormat::V5x(_) => self.parse_modern_format(&data),
241 SSTableFormat::V3x(_) => self.parse_v3_format(&data),
242 SSTableFormat::V2x(_) => self.parse_v2_format(&data),
243 SSTableFormat::Unknown(version) => Err(Error::UnsupportedFormat(format!(
244 "Unknown SSTable version: {}",
245 version
246 ))),
247 }
248 }
249
250 fn parse_modern_format(&self, data: &[u8]) -> Result<Vec<SSTableEntry>> {
261 warn!("EXPERIMENTAL: Parsing modern SSTable format with custom 'oa' format parsing");
262 warn!("This implementation may not fully align with Cassandra Big format specification");
263
264 if data.len() < 8 {
265 return Err(Error::InvalidFormat(
266 "Data too short for 'oa' format header".to_string(),
267 ));
268 }
269
270 let header = self.parse_oa_header(data)?;
272 debug!(
273 "Parsed 'oa' header: version={}, partition_count={}",
274 header.format_version, header.partition_count
275 );
276
277 let entries = self.parse_data_blocks(data, &header)?;
279
280 info!(
281 "Parsed {} entries from {} bytes using structured parsing",
282 entries.len(),
283 data.len()
284 );
285 Ok(entries)
286 }
287
288 pub fn parse_oa_header(&self, data: &[u8]) -> Result<OaFormatHeader> {
299 if data.len() < 32 {
300 return Err(Error::InvalidFormat(
301 "OA header must be exactly 32 bytes".to_string(),
302 ));
303 }
304
305 let header_data = &data[..32];
308
309 let magic = u32::from_be_bytes([
311 header_data[0],
312 header_data[1],
313 header_data[2],
314 header_data[3],
315 ]);
316 if magic != 0x6F61_0000 {
317 return Err(Error::InvalidFormat(format!(
318 "Invalid magic number: expected 0x6F61_0000, got 0x{:08x}",
319 magic
320 )));
321 }
322
323 let format_version = u16::from_be_bytes([header_data[4], header_data[5]]);
325 debug!("'oa' format version: {}", format_version);
326
327 if format_version != 1 {
329 return Err(Error::InvalidFormat(format!(
330 "Unsupported OA format version: {}. Only version 1 is supported.",
331 format_version
332 )));
333 }
334
335 let _flags = u32::from_be_bytes([
337 header_data[6],
338 header_data[7],
339 header_data[8],
340 header_data[9],
341 ]);
342
343 Ok(OaFormatHeader {
350 magic_number: magic,
351 format_version,
352 partition_count: 0, metadata_size: 0, header_size: 32, })
356 }
357
358 fn parse_data_blocks(&self, data: &[u8], header: &OaFormatHeader) -> Result<Vec<SSTableEntry>> {
360 let mut entries = Vec::new();
361 let mut offset = header.header_size;
362
363 if header.partition_count == 0 {
366 debug!("Header-only parsing mode - no data blocks to parse");
367 return Ok(entries);
368 }
369
370 debug!(
371 "Parsing {} partitions starting at offset {}",
372 header.partition_count, offset
373 );
374
375 for partition_idx in 0..header.partition_count {
376 if offset >= data.len() {
377 warn!(
378 "Reached end of data while parsing partition {}",
379 partition_idx
380 );
381 break;
382 }
383
384 match self.parse_partition_block(&data[offset..], partition_idx) {
385 Ok((entry, bytes_consumed)) => {
386 entries.push(entry);
387 offset += bytes_consumed;
388
389 if offset >= data.len() {
390 break;
391 }
392 }
393 Err(e) => {
394 warn!("Failed to parse partition {}: {}", partition_idx, e);
395 offset += 16; continue;
398 }
399 }
400 }
401
402 Ok(entries)
403 }
404
405 fn parse_partition_block(
407 &self,
408 data: &[u8],
409 partition_idx: u64,
410 ) -> Result<(SSTableEntry, usize)> {
411 if data.len() < 4 {
412 return Err(Error::InvalidFormat(
413 "Insufficient data for partition block".to_string(),
414 ));
415 }
416
417 let mut offset = 0;
418
419 let (key_length, vint_bytes) = self.read_vint(&data[offset..])?;
421 offset += vint_bytes;
422
423 if offset + key_length as usize > data.len() {
424 return Err(Error::InvalidFormat(
425 "Partition key extends beyond data".to_string(),
426 ));
427 }
428
429 let key_data = &data[offset..offset + key_length as usize];
431 offset += key_length as usize;
432
433 let (row_count, vint_bytes) = self.read_vint(&data[offset..])?;
435 offset += vint_bytes;
436
437 debug!(
438 "Partition {}: key_length={}, row_count={}",
439 partition_idx, key_length, row_count
440 );
441
442 let key_str = key_data
444 .iter()
445 .map(|b| format!("{:02x}", b))
446 .collect::<Vec<_>>()
447 .join("");
448
449 let entry = SSTableEntry {
453 key: crate::RowKey::from(key_data.to_vec()),
454 values: vec![crate::Value::Text(key_str)],
455 timestamp: Some(
456 std::time::SystemTime::now()
457 .duration_since(std::time::UNIX_EPOCH)
458 .unwrap()
459 .as_millis() as i64,
460 ),
461 generation: Some(self.info.generation_numeric().unwrap_or(0)),
462 format_info: format!("oa_format:partition={}", partition_idx),
463 };
464
465 Ok((entry, offset))
466 }
467
468 pub fn read_vint(&self, data: &[u8]) -> Result<(u64, usize)> {
470 match parse_vint(data) {
471 Ok((remaining, value)) => {
472 let bytes_consumed = data.len() - remaining.len();
473 Ok((value as u64, bytes_consumed))
476 }
477 Err(nom_error) => Err(Error::InvalidFormat(format!(
478 "VInt parsing failed: {:?}",
479 nom_error
480 ))),
481 }
482 }
483
484 #[allow(dead_code)]
486 fn read_varint(&self, data: &[u8]) -> Result<(u64, usize)> {
487 if data.is_empty() {
488 return Err(Error::InvalidFormat("Empty data for varint".to_string()));
489 }
490
491 let mut result = 0u64;
492 let mut shift = 0;
493 let mut bytes_read = 0;
494
495 for &byte in data {
496 bytes_read += 1;
497
498 if byte & 0x80 == 0 {
499 result |= (byte as u64) << shift;
501 break;
502 } else {
503 result |= ((byte & 0x7F) as u64) << shift;
505 shift += 7;
506
507 if shift >= 64 {
508 return Err(Error::InvalidFormat("Varint overflow".to_string()));
509 }
510 }
511 }
512
513 Ok((result, bytes_read))
514 }
515 fn parse_v3_format(&self, _data: &[u8]) -> Result<Vec<SSTableEntry>> {
517 debug!("Parsing V3.x SSTable format");
518 Ok(Vec::new())
520 }
521
522 fn parse_v2_format(&self, _data: &[u8]) -> Result<Vec<SSTableEntry>> {
524 debug!("Parsing V2.x SSTable format");
525 Ok(Vec::new())
527 }
528
529 pub fn info(&self) -> &SSTableInfo {
531 &self.info
532 }
533
534 pub fn compression_info(&self) -> Option<&CompressionInfo> {
536 self.decompressor.as_ref().map(|d| d.compression_info())
537 }
538
539 pub fn cache_stats(&self) -> Option<(usize, usize)> {
541 self.decompressor.as_ref().map(|d| d.cache_stats())
542 }
543
544 pub async fn get_header(&self) -> Result<crate::parser::header::SSTableHeader> {
546 Ok(crate::parser::header::SSTableHeader {
548 cassandra_version: match &self.info.format {
549 SSTableFormat::V5x(_) => crate::parser::header::CassandraVersion::V5_0Release,
550 SSTableFormat::V4x(_) => crate::parser::header::CassandraVersion::Legacy, SSTableFormat::V3x(_) => crate::parser::header::CassandraVersion::Legacy, SSTableFormat::V2x(_) => crate::parser::header::CassandraVersion::Legacy, SSTableFormat::Unknown(_) => crate::parser::header::CassandraVersion::V5_0Release,
554 },
555 version: 1,
556 table_id: [0; 16], keyspace: "unknown".to_string(),
558 table_name: "unknown".to_string(),
559 generation: self.info.generation_numeric().unwrap_or(0),
560 compression: crate::parser::header::CompressionInfo {
561 algorithm: "NONE".to_string(),
562 chunk_size: 65536,
563 parameters: std::collections::HashMap::new(),
564 },
565 stats: crate::parser::header::SSTableStats::default(),
566 columns: vec![],
567 properties: std::collections::HashMap::new(),
568 })
569 }
570
571 pub async fn stream_entries(&self) -> Result<SSTableEntryStream> {
573 let entries = self.parse_sstable_data_readonly()?;
574 Ok(SSTableEntryStream {
575 entries,
576 position: 0,
577 })
578 }
579
580 pub fn get_file_path(&self) -> &Path {
582 &self.base_dir
583 }
584
585 pub async fn verify_integrity(&self) -> Result<bool> {
587 match self.parse_sstable_data_readonly() {
589 Ok(_) => Ok(true),
590 Err(_) => Ok(false),
591 }
592 }
593
594 fn parse_sstable_data_readonly(&self) -> Result<Vec<SSTableEntry>> {
596 use std::fs::File;
598 use std::io::Read;
599
600 let data_file_path = self
601 .base_dir
602 .join(format!("{}-Data.db", self.info.base_name));
603 let mut file = File::open(&data_file_path)?;
604 let mut data = Vec::new();
605 file.read_to_end(&mut data)?;
606
607 self.parse_modern_format_readonly(&data)
609 }
610
611 fn parse_modern_format_readonly(&self, data: &[u8]) -> Result<Vec<SSTableEntry>> {
617 if data.len() < 8 {
618 return Err(Error::InvalidFormat(
619 "Data too short for 'oa' format".to_string(),
620 ));
621 }
622
623 match self.parse_oa_header(data) {
625 Ok(header) => self.parse_data_blocks(data, &header),
626 Err(_) => {
627 warn!("EXPERIMENTAL: 'oa' header parsing failed, using fallback");
629 warn!("Consider using spec-accurate readers for production");
630 Ok(Vec::new())
631 }
632 }
633 }
634}
635
/// Decoded fixed-size header of the experimental `'oa'` data layout.
#[derive(Debug, Clone)]
pub struct OaFormatHeader {
    /// Magic number read from the first four header bytes (big-endian).
    #[allow(dead_code)] // retained for `Debug` diagnostics
    pub magic_number: u32,
    /// Layout version read from header bytes 4..6 (big-endian).
    pub format_version: u16,
    /// Number of partition blocks to parse after the header.
    partition_count: u64,
    /// Size of trailing metadata; currently never read.
    #[allow(dead_code)] // reserved for future metadata parsing
    metadata_size: u64,
    /// Byte offset at which partition blocks begin.
    header_size: usize,
}
658
659#[derive(Debug, Clone)]
661pub struct SSTableEntry {
662 pub key: crate::RowKey,
664 pub values: Vec<crate::Value>,
666 pub timestamp: Option<i64>,
668 pub generation: Option<u64>,
670 pub format_info: String,
672}
673
674pub struct SSTableEntryStream {
676 entries: Vec<SSTableEntry>,
677 position: usize,
678}
679
680impl SSTableEntryStream {
681 pub async fn next(&mut self) -> Result<Option<SSTableEntry>> {
683 if self.position < self.entries.len() {
684 let entry = self.entries[self.position].clone();
685 self.position += 1;
686 Ok(Some(entry))
687 } else {
688 Ok(None)
689 }
690 }
691}
692
693pub fn test_read_sstable_directory<P: AsRef<Path>>(dir_path: P) -> Result<()> {
695 let dir = dir_path.as_ref();
696
697 info!("Testing bulletproof SSTable reading in: {:?}", dir);
698
699 let entries = std::fs::read_dir(dir).map_err(Error::Io)?;
701
702 for entry in entries {
703 let entry = entry.map_err(Error::Io)?;
704 let path = entry.path();
705
706 if path
707 .file_name()
708 .and_then(|s| s.to_str())
709 .map(|s| s.ends_with("-Data.db"))
710 .unwrap_or(false)
711 {
712 debug!("Testing SSTable: {:?}", path);
713
714 match BulletproofReader::open(path) {
715 Ok(mut reader) => {
716 info!("Successfully opened SSTable");
717
718 if let Some(compression_info) = reader.compression_info() {
719 debug!("Compression: {}", compression_info.algorithm);
720 debug!("Chunk size: {} bytes", compression_info.chunk_length);
721 }
722
723 match reader.read_raw_data(0, 1024) {
725 Ok(data) => {
726 debug!("Read {} bytes successfully", data.len());
727 debug!(
728 "First 32 bytes: {:02x?}",
729 &data[..std::cmp::min(32, data.len())]
730 );
731
732 match reader.parse_sstable_data() {
734 Ok(entries) => {
735 info!("Parsed {} entries", entries.len());
736 for (i, entry) in entries.iter().take(3).enumerate() {
737 debug!(
738 "Entry {}: key='{:?}' ({})",
739 i, entry.key, entry.format_info
740 );
741 }
742 }
743 Err(e) => {
744 warn!("Parsing failed (this is expected for now): {}", e);
745 }
746 }
747 }
748 Err(e) => {
749 warn!("Failed to read data: {}", e);
750 }
751 }
752 }
753 Err(e) => {
754 warn!("Failed to open SSTable: {}", e);
755 }
756 }
757 }
758 }
759
760 Ok(())
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    /// Reader with no open files; sufficient for the pure byte-decoding helpers.
    fn detached_reader() -> BulletproofReader {
        BulletproofReader {
            info: SSTableInfo::from_path(&std::path::PathBuf::from("nb-1-big-Data.db")).unwrap(),
            base_dir: std::path::PathBuf::new(),
            decompressor: None,
            data_reader: None,
        }
    }

    #[test]
    fn test_vint_reading() -> Result<()> {
        let reader = detached_reader();

        // Single-byte VInt.
        let single: [u8; 1] = [0x0A];
        let (value, consumed) = reader.read_vint(&single)?;
        assert_eq!(value, 5);
        assert_eq!(consumed, 1);

        // Two-byte VInt.
        let double: [u8; 2] = [0x81, 0x00];
        let (value, consumed) = reader.read_vint(&double)?;
        assert_eq!(value, 128);
        assert_eq!(consumed, 2);

        // Two-byte base-128 varint: continuation byte then terminator.
        let leb: [u8; 2] = [0x80, 0x01];
        let (value, consumed) = reader.read_varint(&leb)?;
        assert_eq!(value, 128);
        assert_eq!(consumed, 2);

        Ok(())
    }
}