Skip to main content

cqlite_core/storage/sstable/
bulletproof_reader.rs

1//! Bulletproof SSTable reader with universal format support
2//!
3//! # DEPRECATED - DO NOT USE IN PRODUCTION
4//!
5//! This module is DEPRECATED for production use. Use `SSTableReader` instead.
6//!
7//! **Deprecation Notice (Issue #190):**
8//! - This reader is marked EXPERIMENTAL and should not be used in production code paths
9//! - For production use, prefer `crate::storage::sstable::reader::SSTableReader`
10//! - This module is retained only for testing and legacy compatibility purposes
11//!
12//! ⚠️  **EXPERIMENTAL WARNING for Modern Formats (4.x/5.x)**
13//!
14//! The 'oa' format parsing implementation in this module is EXPERIMENTAL and
15//! based on reverse engineering. It may not fully align with the official
16//! Cassandra Big format specification (CEP-25). For production use with modern
17//! formats, prefer the spec-accurate readers:
18//!
19//! - `crate::storage::sstable::reader::SSTableReader` - Production-ready spec-accurate reader
20//! - `row_cell_state_machine.rs` - Implements schema-driven parsing without heuristics
21//! - Follows exact Cassandra specification for BIG format row/cell parsing
22//! - Eliminates type guessing in favor of schema-aware decoding
23
24// Allow deprecated warnings within this module since the entire module is deprecated
25#![allow(deprecated)]
26
27use log::{debug, info, warn};
28use std::fs::File;
29use std::io::BufReader;
30use std::path::{Path, PathBuf};
31
32use super::{
33    chunk_decompressor::{create_decompressor_from_file, ChunkDecompressor},
34    compression_info::CompressionInfo,
35    format_detector::{SSTableComponent, SSTableFormat, SSTableInfo},
36};
37use crate::parser::vint::parse_vint;
38use crate::{Error, Result};
39
40/// Bulletproof SSTable reader with automatic format detection
41///
42/// # Deprecated
43///
44/// This reader is DEPRECATED for production use (Issue #190).
45/// Use `crate::storage::sstable::reader::SSTableReader` instead.
46#[deprecated(
47    since = "0.1.0",
48    note = "Use SSTableReader instead. This reader is EXPERIMENTAL and not suitable for production. See Issue #190."
49)]
50pub struct BulletproofReader {
51    /// SSTable information (format, generation, etc.)
52    info: SSTableInfo,
53    /// Base directory containing SSTable files
54    base_dir: PathBuf,
55    /// Chunk decompressor (if compression is used)
56    decompressor: Option<ChunkDecompressor>,
57    /// Data file reader
58    data_reader: Option<BufReader<File>>,
59}
60
61impl Default for BulletproofReader {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67impl BulletproofReader {
68    /// Create a new bulletproof reader with default settings (for testing)
69    pub fn new() -> Self {
70        Self {
71            info: SSTableInfo::default(),
72            base_dir: PathBuf::new(),
73            decompressor: None,
74            data_reader: None,
75        }
76    }
77
78    /// Create a new bulletproof reader from any SSTable file path
79    ///
80    /// This will automatically detect the format version and set up
81    /// proper compression handling if needed.
82    pub fn open<P: AsRef<Path>>(sstable_path: P) -> Result<Self> {
83        let path = sstable_path.as_ref();
84        let info = SSTableInfo::from_path(path)?;
85
86        let base_dir = path
87            .parent()
88            .ok_or_else(|| Error::InvalidPath("No parent directory".to_string()))?
89            .to_path_buf();
90
91        info!(
92            "Opening SSTable with bulletproof reader: format={:?}, generation={}, size={}, component={:?}, base={}",
93            info.format, info.generation_numeric().unwrap_or(0), info.size, info.component, info.base_name
94        );
95
96        let mut reader = Self {
97            info,
98            base_dir,
99            decompressor: None,
100            data_reader: None,
101        };
102
103        reader.initialize()?;
104        Ok(reader)
105    }
106
107    /// Initialize the reader by setting up compression and opening files
108    fn initialize(&mut self) -> Result<()> {
109        // Set up compression if the format supports it
110        if self.info.format.supports_compression() {
111            if let Err(e) = self.setup_compression() {
112                warn!(
113                    "Compression setup failed: {}, trying without compression",
114                    e
115                );
116            }
117        }
118
119        // Open the Data.db file
120        self.open_data_file()?;
121
122        Ok(())
123    }
124
125    /// Set up compression by reading CompressionInfo.db if it exists
126    fn setup_compression(&mut self) -> Result<()> {
127        let compression_info_path = self
128            .info
129            .companion_path(SSTableComponent::CompressionInfo, &self.base_dir);
130
131        if compression_info_path.exists() {
132            debug!("Found CompressionInfo.db, setting up decompression");
133
134            let decompressor = create_decompressor_from_file(&compression_info_path)?;
135            self.decompressor = Some(decompressor);
136
137            debug!("Compression setup complete");
138        } else {
139            debug!("No CompressionInfo.db found, assuming uncompressed data");
140        }
141
142        Ok(())
143    }
144
145    /// Open the Data.db file for reading
146    fn open_data_file(&mut self) -> Result<()> {
147        let data_path = self
148            .info
149            .companion_path(SSTableComponent::Data, &self.base_dir);
150
151        if !data_path.exists() {
152            return Err(Error::InvalidPath(format!(
153                "Data.db file not found: {:?}",
154                data_path
155            )));
156        }
157
158        let file = File::open(&data_path).map_err(Error::Io)?;
159        let reader = BufReader::new(file);
160
161        self.data_reader = Some(reader);
162
163        debug!("Data.db file opened: {:?}", data_path);
164        Ok(())
165    }
166
167    /// Read raw data from the SSTable at specified offset and length
168    ///
169    /// This automatically handles compression if present
170    pub fn read_raw_data(&mut self, offset: u64, length: usize) -> Result<Vec<u8>> {
171        let reader = self
172            .data_reader
173            .as_mut()
174            .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
175
176        if let Some(decompressor) = &mut self.decompressor {
177            // Use chunk-based decompression
178            decompressor.read_data(reader, offset, length)
179        } else {
180            // Read directly from uncompressed file
181            use std::io::{Read, Seek, SeekFrom};
182
183            reader.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
184
185            let mut buffer = vec![0u8; length];
186            reader.read_exact(&mut buffer).map_err(Error::Io)?;
187
188            Ok(buffer)
189        }
190    }
191
192    /// Read the entire SSTable data (for debugging)
193    pub fn read_all_data(&mut self) -> Result<Vec<u8>> {
194        if let Some(decompressor) = &mut self.decompressor {
195            let reader = self
196                .data_reader
197                .as_mut()
198                .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
199
200            decompressor.read_all_data(reader)
201        } else {
202            let reader = self
203                .data_reader
204                .as_mut()
205                .ok_or_else(|| Error::InvalidState("Data reader not initialized".to_string()))?;
206
207            use std::io::{Read, Seek, SeekFrom};
208
209            // Get file size
210            let current_pos = reader.stream_position().map_err(Error::Io)?;
211            let file_size = reader.seek(SeekFrom::End(0)).map_err(Error::Io)?;
212            reader
213                .seek(SeekFrom::Start(current_pos))
214                .map_err(Error::Io)?;
215
216            // Read entire file
217            reader.seek(SeekFrom::Start(0)).map_err(Error::Io)?;
218
219            let mut buffer = Vec::with_capacity(file_size as usize);
220            reader.read_to_end(&mut buffer).map_err(Error::Io)?;
221
222            Ok(buffer)
223        }
224    }
225
226    /// Parse SSTable data using format-specific parser
227    ///
228    /// This is where we'll implement the actual SSTable parsing
229    /// based on the detected format version
230    pub fn parse_sstable_data(&mut self) -> Result<Vec<SSTableEntry>> {
231        let data = self.read_all_data()?;
232
233        info!(
234            "Parsing SSTable data ({} bytes) with format {:?}",
235            data.len(),
236            self.info.format
237        );
238
239        match &self.info.format {
240            SSTableFormat::V4x(_) | SSTableFormat::V5x(_) => self.parse_modern_format(&data),
241            SSTableFormat::V3x(_) => self.parse_v3_format(&data),
242            SSTableFormat::V2x(_) => self.parse_v2_format(&data),
243            SSTableFormat::Unknown(version) => Err(Error::UnsupportedFormat(format!(
244                "Unknown SSTable version: {}",
245                version
246            ))),
247        }
248    }
249
250    /// Parse modern SSTable format (4.x, 5.x) with EXPERIMENTAL 'oa' format parsing
251    ///
252    /// ⚠️ WARNING: EXPERIMENTAL IMPLEMENTATION
253    /// This 'oa' format parser is experimental and may not fully align with
254    /// the official Cassandra Big format specification. For production use,
255    /// prefer the spec-accurate readers in row_cell_state_machine.rs which
256    /// implement schema-driven parsing without heuristics.
257    ///
258    /// TODO: Align with CEP-25 Big format specification or deprecate in favor
259    /// of the spec-accurate state machine implementation.
260    fn parse_modern_format(&self, data: &[u8]) -> Result<Vec<SSTableEntry>> {
261        warn!("EXPERIMENTAL: Parsing modern SSTable format with custom 'oa' format parsing");
262        warn!("This implementation may not fully align with Cassandra Big format specification");
263
264        if data.len() < 8 {
265            return Err(Error::InvalidFormat(
266                "Data too short for 'oa' format header".to_string(),
267            ));
268        }
269
270        // Parse the 'oa' format header
271        let header = self.parse_oa_header(data)?;
272        debug!(
273            "Parsed 'oa' header: version={}, partition_count={}",
274            header.format_version, header.partition_count
275        );
276
277        // Parse data blocks following the header
278        let entries = self.parse_data_blocks(data, &header)?;
279
280        info!(
281            "Parsed {} entries from {} bytes using structured parsing",
282            entries.len(),
283            data.len()
284        );
285        Ok(entries)
286    }
287
288    /// Parse Cassandra 'oa' format header (EXPERIMENTAL)
289    ///
290    /// ⚠️ EXPERIMENTAL: This header parsing implementation is based on
291    /// reverse engineering and may not match the official Cassandra Big
292    /// format specification. The magic number check and field interpretations
293    /// should be verified against CEP-25 specification.
294    ///
295    /// This function strictly parses only the 32-byte header portion as per
296    /// the Cassandra SSTable format specification, handling oversized input
297    /// by reading only the first 32 bytes.
298    pub fn parse_oa_header(&self, data: &[u8]) -> Result<OaFormatHeader> {
299        if data.len() < 32 {
300            return Err(Error::InvalidFormat(
301                "OA header must be exactly 32 bytes".to_string(),
302            ));
303        }
304
305        // For header size compliance, we only read the first 32 bytes
306        // This ensures oversized input is handled correctly by ignoring extra data
307        let header_data = &data[..32];
308
309        // Read magic number (first 4 bytes) - should be 0x6F61_0000 for 'oa' format
310        let magic = u32::from_be_bytes([
311            header_data[0],
312            header_data[1],
313            header_data[2],
314            header_data[3],
315        ]);
316        if magic != 0x6F61_0000 {
317            return Err(Error::InvalidFormat(format!(
318                "Invalid magic number: expected 0x6F61_0000, got 0x{:08x}",
319                magic
320            )));
321        }
322
323        // Read format version (next 2 bytes, big-endian)
324        let format_version = u16::from_be_bytes([header_data[4], header_data[5]]);
325        debug!("'oa' format version: {}", format_version);
326
327        // Validate version - only version 1 is supported
328        if format_version != 1 {
329            return Err(Error::InvalidFormat(format!(
330                "Unsupported OA format version: {}. Only version 1 is supported.",
331                format_version
332            )));
333        }
334
335        // Read flags (4 bytes, big-endian)
336        let _flags = u32::from_be_bytes([
337            header_data[6],
338            header_data[7],
339            header_data[8],
340            header_data[9],
341        ]);
342
343        // The remaining bytes (10-31) are reserved and should be zero per spec
344        // We don't validate they are zero to maintain compatibility, but we acknowledge them
345
346        // Return header with basic structure - the 32-byte header is now fully parsed
347        // Additional metadata parsing (like partition count) should be done separately
348        // when actually parsing the SSTable content, not during header validation
349        Ok(OaFormatHeader {
350            magic_number: magic,
351            format_version,
352            partition_count: 0, // Will be populated during full SSTable parsing
353            metadata_size: 0,   // Will be populated during full SSTable parsing
354            header_size: 32,    // Fixed header size per specification
355        })
356    }
357
358    /// Parse data blocks following the 'oa' header
359    fn parse_data_blocks(&self, data: &[u8], header: &OaFormatHeader) -> Result<Vec<SSTableEntry>> {
360        let mut entries = Vec::new();
361        let mut offset = header.header_size;
362
363        // If we're only doing header compliance testing (partition_count = 0),
364        // we don't need to parse actual data blocks
365        if header.partition_count == 0 {
366            debug!("Header-only parsing mode - no data blocks to parse");
367            return Ok(entries);
368        }
369
370        debug!(
371            "Parsing {} partitions starting at offset {}",
372            header.partition_count, offset
373        );
374
375        for partition_idx in 0..header.partition_count {
376            if offset >= data.len() {
377                warn!(
378                    "Reached end of data while parsing partition {}",
379                    partition_idx
380                );
381                break;
382            }
383
384            match self.parse_partition_block(&data[offset..], partition_idx) {
385                Ok((entry, bytes_consumed)) => {
386                    entries.push(entry);
387                    offset += bytes_consumed;
388
389                    if offset >= data.len() {
390                        break;
391                    }
392                }
393                Err(e) => {
394                    warn!("Failed to parse partition {}: {}", partition_idx, e);
395                    // Try to advance by a reasonable amount to recover
396                    offset += 16; // Skip forward and try next potential partition
397                    continue;
398                }
399            }
400        }
401
402        Ok(entries)
403    }
404
405    /// Parse a single partition block
406    fn parse_partition_block(
407        &self,
408        data: &[u8],
409        partition_idx: u64,
410    ) -> Result<(SSTableEntry, usize)> {
411        if data.len() < 4 {
412            return Err(Error::InvalidFormat(
413                "Insufficient data for partition block".to_string(),
414            ));
415        }
416
417        let mut offset = 0;
418
419        // Read partition key length using VInt
420        let (key_length, vint_bytes) = self.read_vint(&data[offset..])?;
421        offset += vint_bytes;
422
423        if offset + key_length as usize > data.len() {
424            return Err(Error::InvalidFormat(
425                "Partition key extends beyond data".to_string(),
426            ));
427        }
428
429        // Read partition key
430        let key_data = &data[offset..offset + key_length as usize];
431        offset += key_length as usize;
432
433        // Read row count using VInt
434        let (row_count, vint_bytes) = self.read_vint(&data[offset..])?;
435        offset += vint_bytes;
436
437        debug!(
438            "Partition {}: key_length={}, row_count={}",
439            partition_idx, key_length, row_count
440        );
441
442        // Format key data as hex string without type assumptions
443        let key_str = key_data
444            .iter()
445            .map(|b| format!("{:02x}", b))
446            .collect::<Vec<_>>()
447            .join("");
448
449        // Skip row data for now (would need more complex parsing)
450        // For each row, we'd need to parse clustering keys, column data, etc.
451
452        let entry = SSTableEntry {
453            key: crate::RowKey::from(key_data.to_vec()),
454            values: vec![crate::Value::Text(key_str)],
455            timestamp: Some(
456                std::time::SystemTime::now()
457                    .duration_since(std::time::UNIX_EPOCH)
458                    .unwrap()
459                    .as_millis() as i64,
460            ),
461            generation: Some(self.info.generation_numeric().unwrap_or(0)),
462            format_info: format!("oa_format:partition={}", partition_idx),
463        };
464
465        Ok((entry, offset))
466    }
467
468    /// Read Variable Length Integer (VInt) from data using Cassandra format
469    pub fn read_vint(&self, data: &[u8]) -> Result<(u64, usize)> {
470        match parse_vint(data) {
471            Ok((remaining, value)) => {
472                let bytes_consumed = data.len() - remaining.len();
473                // VInts in SSTable can be negative (ZigZag encoded), but we return as u64
474                // The caller needs to interpret the sign appropriately
475                Ok((value as u64, bytes_consumed))
476            }
477            Err(nom_error) => Err(Error::InvalidFormat(format!(
478                "VInt parsing failed: {:?}",
479                nom_error
480            ))),
481        }
482    }
483
484    /// Read legacy varint format for backwards compatibility
485    #[allow(dead_code)]
486    fn read_varint(&self, data: &[u8]) -> Result<(u64, usize)> {
487        if data.is_empty() {
488            return Err(Error::InvalidFormat("Empty data for varint".to_string()));
489        }
490
491        let mut result = 0u64;
492        let mut shift = 0;
493        let mut bytes_read = 0;
494
495        for &byte in data {
496            bytes_read += 1;
497
498            if byte & 0x80 == 0 {
499                // Most significant bit is 0, this is the last byte
500                result |= (byte as u64) << shift;
501                break;
502            } else {
503                // Most significant bit is 1, more bytes follow
504                result |= ((byte & 0x7F) as u64) << shift;
505                shift += 7;
506
507                if shift >= 64 {
508                    return Err(Error::InvalidFormat("Varint overflow".to_string()));
509                }
510            }
511        }
512
513        Ok((result, bytes_read))
514    }
515    /// Parse V3.x format
516    fn parse_v3_format(&self, _data: &[u8]) -> Result<Vec<SSTableEntry>> {
517        debug!("Parsing V3.x SSTable format");
518        // TODO: Implement V3.x specific parsing
519        Ok(Vec::new())
520    }
521
522    /// Parse V2.x format
523    fn parse_v2_format(&self, _data: &[u8]) -> Result<Vec<SSTableEntry>> {
524        debug!("Parsing V2.x SSTable format");
525        // TODO: Implement V2.x specific parsing
526        Ok(Vec::new())
527    }
528
529    /// Get information about the SSTable
530    pub fn info(&self) -> &SSTableInfo {
531        &self.info
532    }
533
534    /// Get compression information if available
535    pub fn compression_info(&self) -> Option<&CompressionInfo> {
536        self.decompressor.as_ref().map(|d| d.compression_info())
537    }
538
539    /// Get cache statistics if compression is enabled
540    pub fn cache_stats(&self) -> Option<(usize, usize)> {
541        self.decompressor.as_ref().map(|d| d.cache_stats())
542    }
543
544    /// Get header information (for compatibility)
545    pub async fn get_header(&self) -> Result<crate::parser::header::SSTableHeader> {
546        // Return a basic header based on format detection
547        Ok(crate::parser::header::SSTableHeader {
548            cassandra_version: match &self.info.format {
549                SSTableFormat::V5x(_) => crate::parser::header::CassandraVersion::V5_0Release,
550                SSTableFormat::V4x(_) => crate::parser::header::CassandraVersion::Legacy, // Use Legacy for V4
551                SSTableFormat::V3x(_) => crate::parser::header::CassandraVersion::Legacy, // Use Legacy for V3
552                SSTableFormat::V2x(_) => crate::parser::header::CassandraVersion::Legacy, // Use Legacy for V2
553                SSTableFormat::Unknown(_) => crate::parser::header::CassandraVersion::V5_0Release,
554            },
555            version: 1,
556            table_id: [0; 16], // Placeholder
557            keyspace: "unknown".to_string(),
558            table_name: "unknown".to_string(),
559            generation: self.info.generation_numeric().unwrap_or(0),
560            compression: crate::parser::header::CompressionInfo {
561                algorithm: "NONE".to_string(),
562                chunk_size: 65536,
563                parameters: std::collections::HashMap::new(),
564            },
565            stats: crate::parser::header::SSTableStats::default(),
566            columns: vec![],
567            properties: std::collections::HashMap::new(),
568        })
569    }
570
571    /// Stream entries from the SSTable (for compatibility)
572    pub async fn stream_entries(&self) -> Result<SSTableEntryStream> {
573        let entries = self.parse_sstable_data_readonly()?;
574        Ok(SSTableEntryStream {
575            entries,
576            position: 0,
577        })
578    }
579
580    /// Get file path for the SSTable
581    pub fn get_file_path(&self) -> &Path {
582        &self.base_dir
583    }
584
585    /// Verify integrity of the SSTable
586    pub async fn verify_integrity(&self) -> Result<bool> {
587        // Basic integrity check - try to parse the data
588        match self.parse_sstable_data_readonly() {
589            Ok(_) => Ok(true),
590            Err(_) => Ok(false),
591        }
592    }
593
594    /// Parse SSTable data without mutable access (for compatibility)
595    fn parse_sstable_data_readonly(&self) -> Result<Vec<SSTableEntry>> {
596        // Read the data file directly
597        use std::fs::File;
598        use std::io::Read;
599
600        let data_file_path = self
601            .base_dir
602            .join(format!("{}-Data.db", self.info.base_name));
603        let mut file = File::open(&data_file_path)?;
604        let mut data = Vec::new();
605        file.read_to_end(&mut data)?;
606
607        // Parse using the existing logic but with dummy values for new fields
608        self.parse_modern_format_readonly(&data)
609    }
610
611    /// Parse modern format without mutable access using EXPERIMENTAL 'oa' format
612    ///
613    /// ⚠️ EXPERIMENTAL: This readonly parsing is experimental and should be
614    /// replaced with the spec-accurate row_cell_state_machine implementation
615    /// for production use.
616    fn parse_modern_format_readonly(&self, data: &[u8]) -> Result<Vec<SSTableEntry>> {
617        if data.len() < 8 {
618            return Err(Error::InvalidFormat(
619                "Data too short for 'oa' format".to_string(),
620            ));
621        }
622
623        // Parse using the EXPERIMENTAL 'oa' format (may not align with Big format spec)
624        match self.parse_oa_header(data) {
625            Ok(header) => self.parse_data_blocks(data, &header),
626            Err(_) => {
627                // Fallback to basic parsing if header parsing fails
628                warn!("EXPERIMENTAL: 'oa' header parsing failed, using fallback");
629                warn!("Consider using spec-accurate readers for production");
630                Ok(Vec::new())
631            }
632        }
633    }
634}
635
/// EXPERIMENTAL representation of the Cassandra 'oa' format header.
///
/// ⚠️ WARNING: derived from reverse engineering. Field meanings and byte
/// layout may not match the official Big format header described in CEP-25
/// and should be verified against the specification.
#[derive(Debug, Clone)]
pub struct OaFormatHeader {
    /// Magic number as read from the header (EXPERIMENTAL: expected to be 0x6F61_0000).
    /// TODO: Verify against CEP-25 Big format specification
    #[allow(dead_code)]
    pub magic_number: u32,
    /// Format version as read from the header (interpretation may not match the spec).
    pub format_version: u16,
    /// Number of partitions to walk after the header (experimental interpretation).
    partition_count: u64,
    /// Size of the metadata section (experimental interpretation; currently unused).
    #[allow(dead_code)]
    metadata_size: u64,
    /// Header length in bytes; data block parsing starts at this offset.
    header_size: usize,
}
658
659/// Parsed SSTable entry
660#[derive(Debug, Clone)]
661pub struct SSTableEntry {
662    /// Row key
663    pub key: crate::RowKey,
664    /// Column values
665    pub values: Vec<crate::Value>,
666    /// Write timestamp
667    pub timestamp: Option<i64>,
668    /// Generation number
669    pub generation: Option<u64>,
670    /// Format-specific information
671    pub format_info: String,
672}
673
674/// Stream of SSTable entries for iterating over large datasets
675pub struct SSTableEntryStream {
676    entries: Vec<SSTableEntry>,
677    position: usize,
678}
679
680impl SSTableEntryStream {
681    /// Get the next entry from the stream
682    pub async fn next(&mut self) -> Result<Option<SSTableEntry>> {
683        if self.position < self.entries.len() {
684            let entry = self.entries[self.position].clone();
685            self.position += 1;
686            Ok(Some(entry))
687        } else {
688            Ok(None)
689        }
690    }
691}
692
693/// Utility function to test reading an SSTable directory
694pub fn test_read_sstable_directory<P: AsRef<Path>>(dir_path: P) -> Result<()> {
695    let dir = dir_path.as_ref();
696
697    info!("Testing bulletproof SSTable reading in: {:?}", dir);
698
699    // Find Data.db files
700    let entries = std::fs::read_dir(dir).map_err(Error::Io)?;
701
702    for entry in entries {
703        let entry = entry.map_err(Error::Io)?;
704        let path = entry.path();
705
706        if path
707            .file_name()
708            .and_then(|s| s.to_str())
709            .map(|s| s.ends_with("-Data.db"))
710            .unwrap_or(false)
711        {
712            debug!("Testing SSTable: {:?}", path);
713
714            match BulletproofReader::open(path) {
715                Ok(mut reader) => {
716                    info!("Successfully opened SSTable");
717
718                    if let Some(compression_info) = reader.compression_info() {
719                        debug!("Compression: {}", compression_info.algorithm);
720                        debug!("Chunk size: {} bytes", compression_info.chunk_length);
721                    }
722
723                    // Try to read first 1KB of data
724                    match reader.read_raw_data(0, 1024) {
725                        Ok(data) => {
726                            debug!("Read {} bytes successfully", data.len());
727                            debug!(
728                                "First 32 bytes: {:02x?}",
729                                &data[..std::cmp::min(32, data.len())]
730                            );
731
732                            // Try to parse the data
733                            match reader.parse_sstable_data() {
734                                Ok(entries) => {
735                                    info!("Parsed {} entries", entries.len());
736                                    for (i, entry) in entries.iter().take(3).enumerate() {
737                                        debug!(
738                                            "Entry {}: key='{:?}' ({})",
739                                            i, entry.key, entry.format_info
740                                        );
741                                    }
742                                }
743                                Err(e) => {
744                                    warn!("Parsing failed (this is expected for now): {}", e);
745                                }
746                            }
747                        }
748                        Err(e) => {
749                            warn!("Failed to read data: {}", e);
750                        }
751                    }
752                }
753                Err(e) => {
754                    warn!("Failed to open SSTable: {}", e);
755                }
756            }
757        }
758    }
759
760    Ok(())
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a reader with no backing files; only the pure decoding helpers are usable.
    fn detached_reader() -> BulletproofReader {
        BulletproofReader {
            info: SSTableInfo::from_path(&std::path::PathBuf::from("nb-1-big-Data.db")).unwrap(),
            base_dir: std::path::PathBuf::new(),
            decompressor: None,
            data_reader: None,
        }
    }

    #[test]
    fn test_vint_reading() -> Result<()> {
        let reader = detached_reader();

        // Single-byte VInt: 0x0A is expected to decode to 5 (ZigZag: 10 -> 5).
        assert_eq!(reader.read_vint(&[0x0A])?, (5, 1));

        // Two-byte VInt: [0x81, 0x00] is expected to decode to 128 (raw 256 -> 128).
        assert_eq!(reader.read_vint(&[0x81, 0x00])?, (128, 2));

        // Legacy varint kept for backwards compatibility: [0x80, 0x01] is 128.
        assert_eq!(reader.read_varint(&[0x80, 0x01])?, (128, 2));

        Ok(())
    }
}
797}