Skip to main content

cqlite_core/storage/sstable/reader/
mod.rs

1//! SSTable reader implementation
2//!
3//! This module provides efficient reading of SSTable files in Cassandra 5+ format.
4//! It supports:
5//! - Block-based reading with compression
6//! - Index-based lookups for efficient queries
7//! - Memory-efficient streaming
8//! - Bloom filter integration
9//! - Multiple compression algorithms
10
11// Submodules
12mod block_io;
13mod cache;
14mod component_loading;
15mod compression;
16mod data_access;
17mod header;
18mod header_helpers;
19mod integrity;
20mod key_digest;
21pub(crate) mod parsing; // Needs to be accessible from row_cell_state_machine
22mod partition_lookup;
23mod source;
24#[cfg(test)]
25mod tests;
26mod types;
27
28// Re-export public types
29pub use types::{
30    BlockMeta, CachedBlock, IntegrityCheckResult, IntegrityStatus, SSTableReader,
31    SSTableReaderConfig, SSTableReaderHealthMetrics, SSTableReaderStats,
32};
33
34// Re-export V5CompressedLegacyParser for integration testing (Issue #166 regression tests)
35#[doc(hidden)]
36pub use parsing::PublicV5CompressedLegacyParser as V5CompressedLegacyParser;
37
38// Re-export compression utilities for testing (Issue #202)
39#[doc(hidden)]
40pub use compression::extract_sstable_base_name;
41
42// Internal imports from submodules
43use compression::detect_and_initialize_compression;
44use header::{
45    calculate_actual_header_size, extract_generation_from_path, parse_header_with_version_detection,
46};
47
48use std::collections::HashMap;
49use std::path::Path;
50use std::sync::atomic::{AtomicU64, AtomicUsize};
51use std::sync::Arc;
52use tokio::fs::File;
53use tokio::io::{AsyncReadExt, AsyncSeekExt};
54use tokio::sync::Mutex;
55
56use source::BlockSource;
57
58use crate::{
59    parser::{header::CassandraVersion, SSTableHeader, SSTableParser},
60    platform::Platform,
61    schema::TableSchema,
62    storage::sstable::{
63        compression_info::CompressionInfo,
64        version_gate::{BigVersionGates, VersionGates},
65    },
66    Config, Error, Result, RowKey, Value,
67};
68
69// Structured logging
70use log::debug;
71
72#[cfg(feature = "tombstones")]
73use super::tombstone_merger::TombstoneMerger;
74
75/// Returns `true` when memory-mapped reads are force-enabled via the
76/// `CQLITE_USE_MMAP` environment variable.
77///
78/// Accepts `1`, `true`, `yes`, `on` (case-insensitive). Any other value — or
79/// an unset variable — leaves the decision to [`Config`]. This is an opt-in
80/// escape hatch for ad-hoc local use without threading a custom config.
81fn mmap_enabled_via_env() -> bool {
82    std::env::var("CQLITE_USE_MMAP")
83        .ok()
84        .as_deref()
85        .map(parse_truthy_env)
86        .unwrap_or(false)
87}
88
/// Decide whether an environment-variable value means "enabled".
///
/// Surrounding whitespace is ignored and the comparison is ASCII
/// case-insensitive; the accepted spellings are `1`, `true`, `yes` and `on`.
/// Everything else (including the empty string) is `false`.
///
/// Kept separate from the environment lookup so it can be unit-tested without
/// mutating the process-global environment (which would race other `open()`
/// tests).
fn parse_truthy_env(value: &str) -> bool {
    let candidate = value.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| candidate.eq_ignore_ascii_case(accepted))
}
98
99impl SSTableReader {
100    /// Open an SSTable file for reading
101    pub async fn open(path: &Path, config: &Config, platform: Arc<Platform>) -> Result<Self> {
102        // Honor the caller's storage config for the mmap decision. Memory
103        // mapping is opt-in (buffered I/O is the portable default); it can be
104        // turned on via `config.storage.use_mmap` or the `CQLITE_USE_MMAP`
105        // environment variable. See `Config::storage.use_mmap` for the
106        // platform/filesystem safety constraints.
107        let mut reader_config = SSTableReaderConfig::default();
108        reader_config.use_mmap = config.storage.use_mmap || mmap_enabled_via_env();
109        reader_config.mmap_min_size_bytes = config.storage.mmap_min_size_bytes;
110
111        let file_size = tokio::fs::metadata(path).await?.len();
112
113        // Select the backing store for block I/O. Memory-map the file when mmap
114        // is enabled and the file is large enough to be worth it; otherwise use
115        // buffered file I/O. Mapping a zero-length file is invalid, so empty
116        // files always fall back to buffered reads.
117        let use_mmap = reader_config.use_mmap
118            && file_size > 0
119            && file_size >= reader_config.mmap_min_size_bytes as u64;
120        let source = if use_mmap {
121            match Self::map_file(path) {
122                Ok(mmap) => {
123                    log::debug!(
124                        "Opened {} via memory map ({} bytes)",
125                        path.display(),
126                        file_size
127                    );
128                    BlockSource::mapped(Arc::new(mmap))
129                }
130                Err(e) => {
131                    // Memory mapping can fail on some filesystems (e.g. certain
132                    // network mounts). Degrade gracefully to buffered I/O rather
133                    // than failing the open outright.
134                    log::warn!(
135                        "Memory-mapping {} failed ({}); falling back to buffered I/O",
136                        path.display(),
137                        e
138                    );
139                    BlockSource::buffered(File::open(path).await?)
140                }
141            }
142        } else {
143            BlockSource::buffered(File::open(path).await?)
144        };
145        let file = Arc::new(Mutex::new(source));
146
147        // Parse header - read available bytes, not a fixed size
148        // NOTE: For NB format files (Cassandra 4.x+), Data.db often contains compressed row data
149        // with no embedded header. The header.rs module detects this via filename pattern and
150        // returns a minimal header loaded from Statistics.db instead.
151        let header_size = std::cmp::min(4096, file_size as usize);
152        let mut header_buffer = vec![0u8; header_size];
153        {
154            let mut file_guard = file.lock().await;
155            let bytes_read = file_guard.read(&mut header_buffer).await?;
156            header_buffer.truncate(bytes_read);
157        }
158
159        // Derive VersionGates from the SSTable filename BEFORE header parsing so
160        // parse_header_with_version_detection can receive them.  Gates are derived
161        // solely from the filename and need no file I/O, so this is safe to do here.
162        //
163        // Falls back to nb-compatible BIG gates when the filename is not a valid
164        // SSTable descriptor (e.g. paths used in unit tests).  Using nb-fallback
165        // maintains existing behaviour — the gates will not change parsing
166        // decisions until VG3 actually flips behaviour.
167        let version_gates = Arc::new(match VersionGates::from_path(path) {
168            Ok(gates) => gates,
169            Err(e) => {
170                log::debug!(
171                    "SSTableReader::open: could not derive VersionGates from {:?} ({}); \
172                     defaulting to nb-compatible BIG gates",
173                    path,
174                    e
175                );
176                VersionGates::Big(BigVersionGates::nb_fallback())
177            }
178        });
179
180        // VG5: BTI (da) read support is not yet implemented.
181        // Detect the BTI format early — before header parsing — and return a
182        // structured, actionable error rather than a confusing parse failure.
183        // Full BTI reading is tracked in the scoping issue created by issue #657.
184        if matches!(*version_gates, VersionGates::Bti(_)) {
185            return Err(Error::unsupported_format(format!(
186                "BTI (da) read support not yet implemented for '{}'. \
187                 da-format SSTables use Partitions.db/Rows.db trie indexes instead of \
188                 Index.db/Summary.db and require a dedicated BTI read path. \
189                 See docs/reports/bti-read-support-scoping.md for the implementation plan.",
190                path.display()
191            )));
192        }
193
194        let config = crate::cql::config::ParserConfig::default();
195        let parser = SSTableParser::new(config)?;
196        // Parse the header using enhanced version detection - strict error propagation.
197        // VersionGates are passed so VG3 can flip version-sensitive parsing decisions
198        // inside header parsing without re-deriving gates from the filename.
199        let header = parse_header_with_version_detection(&header_buffer, path, &version_gates)
200            .await
201            .map_err(|e| {
202                Error::corruption(format!(
203                    "Failed to parse SSTable header for file '{}': {}. This indicates either \
204                     file corruption or an unsupported SSTable format. File size: {} bytes, \
205                     header buffer size: {} bytes.",
206                    path.display(),
207                    e,
208                    file_size,
209                    header_buffer.len()
210                ))
211            })?;
212        let header_size = calculate_actual_header_size(&header, &header_buffer)?;
213
214        // Schema extraction deferred until after Statistics.db columns are loaded (Issue #163)
215        // See schema extraction code after statistics_reader loading below
216
217        // Seek to start of data section
218        {
219            let mut file_guard = file.lock().await;
220            file_guard
221                .seek(std::io::SeekFrom::Start(header_size as u64))
222                .await?;
223        }
224
225        // Initialize compression reader with improved format detection
226        let compression_reader = detect_and_initialize_compression(&header, path).await?;
227
228        // Load CompressionInfo.db for chunked decompression (if it exists)
229        let compression_info = Self::load_compression_info_metadata(path, &platform).await?;
230
231        // Pre-validate component architecture for better error handling
232        let components = Self::detect_component_files(path).await?;
233        if !components.is_empty() {
234            let integrity_issues = Self::validate_component_integrity(path, &components).await?;
235            if !integrity_issues.is_empty() {
236                log::warn!(
237                    "Component integrity issues detected but proceeding with loading: {:?}",
238                    integrity_issues
239                );
240            }
241        }
242
243        // Load index if available (supports both integrated and component-based)
244        let index = Self::load_index(&file, &header, &platform, path).await?;
245
246        // Load bloom filter if available (supports both integrated and component-based)
247        let bloom_filter = Self::load_bloom_filter(&file, &header, &platform, path).await?;
248
249        // Load spec readers for enhanced metadata and lookups
250        let index_reader = Self::load_index_reader(path, &platform).await;
251        let summary_reader = Self::load_summary_reader(path, &platform).await;
252        let statistics_reader = Self::load_statistics_reader(path, &platform).await;
253
254        // Extract SerializationHeader columns from Statistics.db (Issue #163)
255        // This enables schema extraction for V5CompressedLegacy format
256        let mut header = header; // Make mutable to populate columns
257        if let Some(ref stats_reader) = statistics_reader {
258            let statistics = stats_reader.statistics();
259            let partition_columns = &statistics.serialization_header_partition_keys;
260            let clustering_columns = &statistics.serialization_header_clustering_keys;
261            let regular_columns = &statistics.serialization_header_columns;
262
263            if !partition_columns.is_empty()
264                || !clustering_columns.is_empty()
265                || !regular_columns.is_empty()
266            {
267                log::debug!(
268                    "Populating header columns from Statistics.db SerializationHeader: {} partition keys, {} clustering keys, {} regular columns",
269                    partition_columns.len(),
270                    clustering_columns.len(),
271                    regular_columns.len()
272                );
273
274                let mut merged_columns = Vec::with_capacity(
275                    partition_columns.len() + clustering_columns.len() + regular_columns.len(),
276                );
277                merged_columns.extend_from_slice(partition_columns);
278                merged_columns.extend_from_slice(clustering_columns);
279                merged_columns.extend_from_slice(regular_columns);
280
281                header.columns = merged_columns;
282            }
283        }
284
285        // Extract schema from header for V5.0+ formats (after columns are populated)
286        let schema = if matches!(
287            header.cassandra_version,
288            CassandraVersion::V5_0NewBig
289                | CassandraVersion::V5_0Bti
290                | CassandraVersion::V5_0DataFormat
291                | CassandraVersion::V5_0FormatC
292                | CassandraVersion::V5_0FormatD
293                | CassandraVersion::V5_0FormatE
294                | CassandraVersion::V5_0FormatF
295                | CassandraVersion::V5_0FormatG
296        ) {
297            match TableSchema::from_sstable_header(&header) {
298                Ok(s) => {
299                    log::debug!(
300                        "Extracted schema from SSTable header: {}.{} ({} columns, {} partition keys, {} clustering keys)",
301                        s.keyspace,
302                        s.table,
303                        s.columns.len(),
304                        s.partition_keys.len(),
305                        s.clustering_keys.len()
306                    );
307                    Some(Arc::new(s))
308                }
309                Err(e) => {
310                    log::warn!(
311                        "Failed to extract schema from SSTable header for {}: {}. Schema-aware parsing will not be available.",
312                        path.display(),
313                        e
314                    );
315                    None
316                }
317            }
318        } else {
319            // Legacy formats don't have schema in header
320            None
321        };
322
323        // Derive block_count from CompressionInfo.db when available — this is the
324        // authoritative source for compressed SSTables (no-heuristics mandate #28).
325        // Each entry in chunk_offsets corresponds to one compressed block in Data.db.
326        let block_count = compression_info
327            .as_ref()
328            .map(|ci| ci.chunk_offsets.len() as u64)
329            .unwrap_or(0);
330
331        let stats = SSTableReaderStats {
332            file_size,
333            entry_count: header.stats.row_count,
334            table_count: 1, // Will be updated as we discover tables
335            block_count,
336            index_size: 0,        // Will be updated if index is loaded
337            bloom_filter_size: 0, // Will be updated if bloom filter is loaded
338            compression_ratio: header.stats.compression_ratio,
339            cache_hit_rate: 0.0,
340        };
341
342        // Extract generation from filename or use default
343        let generation = extract_generation_from_path(path);
344
345        Ok(Self {
346            file_path: path.to_path_buf(),
347            file,
348            header,
349            parser,
350            index,
351            bloom_filter,
352            compression_reader,
353            block_meta_cache: HashMap::new(),
354            block_cache: HashMap::new(),
355            config: reader_config,
356            platform,
357            stats,
358            cache_hits: AtomicU64::new(0),
359            cache_misses: AtomicU64::new(0),
360            #[cfg(feature = "tombstones")]
361            tombstone_merger: TombstoneMerger::new(),
362            generation,
363            actual_header_size: header_size,
364            index_reader,
365            summary_reader,
366            statistics_reader,
367            schema_registry: None, // Will be set by set_schema_registry() after construction
368            schema,
369            udt_registry: None, // Will be set when available for UDT-aware parsing
370            compression_info: compression_info.map(Arc::new),
371            current_chunk_index: AtomicUsize::new(0),
372            version_gates,
373        })
374    }
375
376    /// Whether this reader's block source is backed by a memory map.
377    ///
378    /// Test-only hook used to verify that the `use_mmap` config / env wiring
379    /// actually selects the intended backend end-to-end.
380    #[cfg(test)]
381    pub(crate) async fn is_mmap_backed(&self) -> bool {
382        self.file.lock().await.is_mmap()
383    }
384
385    /// Memory-map an SSTable file read-only.
386    ///
387    /// # Safety / correctness
388    ///
389    /// The returned [`Mmap`](memmap2::Mmap) aliases the file's bytes for its
390    /// entire lifetime. SSTables are immutable once written, and CQLite treats
391    /// them as read-only inputs, so this matches Cassandra's own mmap read
392    /// strategy. Mutating the underlying file while a reader is open is
393    /// undefined behaviour — callers must not do so.
394    ///
395    /// Note that only the initial mapping is fallible here. Once mapped, a later
396    /// page fault — caused by truncation, deletion, or a network/overlay
397    /// filesystem hiccup — raises `SIGBUS` and **cannot** be recovered as an
398    /// `io::Error`. This is why mmap is opt-in and gated on immutable local
399    /// files; see [`Config`]'s `storage.use_mmap` for the full constraints.
400    fn map_file(path: &Path) -> Result<memmap2::Mmap> {
401        let std_file = std::fs::File::open(path)?;
402        // SAFETY: read-only mapping of a file assumed immutable for the
403        // reader's lifetime; see the function-level note above.
404        let mmap = unsafe { memmap2::MmapOptions::new().map(&std_file)? };
405        Ok(mmap)
406    }
407
408    /// Load CompressionInfo.db metadata for chunked reading
409    async fn load_compression_info_metadata(
410        path: &Path,
411        _platform: &Arc<Platform>,
412    ) -> Result<Option<CompressionInfo>> {
413        use tokio::fs::File;
414        use tokio::io::AsyncReadExt;
415
416        // Try to find CompressionInfo.db in same directory
417        let parent_dir = path.parent().unwrap_or(Path::new("."));
418        let base_name = path.file_stem().and_then(|s| s.to_str()).and_then(|s| {
419            // Extract base name: "nb-1-big-Data.db" -> "nb-1-big"
420            let parts: Vec<&str> = s.split('-').collect();
421            if parts.len() >= 4 {
422                Some(parts[0..3].join("-"))
423            } else {
424                None
425            }
426        });
427
428        if let Some(base) = base_name {
429            let compression_info_path = parent_dir.join(format!("{}-CompressionInfo.db", base));
430            if compression_info_path.exists() {
431                let mut file = File::open(&compression_info_path).await?;
432                let mut buffer = Vec::new();
433                file.read_to_end(&mut buffer).await?;
434
435                match CompressionInfo::parse(&buffer) {
436                    Ok(info) => {
437                        log::debug!(
438                            "Loaded CompressionInfo: algorithm={}, chunk_length={}, chunks={}",
439                            info.algorithm,
440                            info.chunk_length,
441                            info.chunk_offsets.len()
442                        );
443                        return Ok(Some(info));
444                    }
445                    Err(e) => {
446                        log::warn!("Failed to parse CompressionInfo.db: {}", e);
447                    }
448                }
449            }
450        }
451
452        Ok(None)
453    }
454
455    /// Set the schema registry for schema-driven operations
456    #[cfg(feature = "state_machine")]
457    pub fn set_schema_registry(
458        &mut self,
459        schema_registry: Arc<tokio::sync::RwLock<crate::schema::SchemaRegistry>>,
460    ) {
461        self.schema_registry = Some(schema_registry);
462        log::debug!(
463            "Schema registry set for {}.{} - enabling schema-driven digest computation",
464            self.header.keyspace,
465            self.header.table_name
466        );
467    }
468
469    /// Set the schema registry for schema-driven operations (non-state_machine builds)
470    #[cfg(not(feature = "state_machine"))]
471    pub fn set_schema_registry(&mut self, schema_registry: Arc<crate::schema::SchemaRegistry>) {
472        self.schema_registry = Some(schema_registry);
473        log::debug!(
474            "Schema registry set for {}.{} - enabling schema-driven digest computation",
475            self.header.keyspace,
476            self.header.table_name
477        );
478    }
479
480    /// Set the UDT registry for UDT-aware parsing in collections
481    ///
482    /// This enables proper parsing of UDTs inside collections (List, Set, Map)
483    /// by providing the UDT field definitions needed for nested type resolution.
484    pub fn set_udt_registry(&mut self, registry: crate::schema::UdtRegistry) {
485        self.udt_registry = Some(registry);
486        log::debug!(
487            "UDT registry set for {}.{} - enabling UDT-aware collection parsing",
488            self.header.keyspace,
489            self.header.table_name
490        );
491    }
492
493    /// Get reader statistics
494    pub async fn stats(&self) -> Result<&SSTableReaderStats> {
495        Ok(&self.stats)
496    }
497
498    /// Close the reader and release resources
499    pub async fn close(mut self) -> Result<()> {
500        debug!("Closing SSTable reader for {:?}", self.file_path);
501
502        // Clear caches and log cache statistics
503        let cache_entries = self.block_cache.len();
504        let meta_entries = self.block_meta_cache.len();
505
506        self.block_cache.clear();
507        self.block_meta_cache.clear();
508
509        debug!(
510            "Cleared {} block cache entries and {} metadata entries",
511            cache_entries, meta_entries
512        );
513
514        // File will be closed automatically when dropped
515        Ok(())
516    }
517
518    /// Calculate header size based on format and actual header content
519    pub fn calculate_header_size(&self) -> usize {
520        self.actual_header_size
521    }
522
523    /// Get the Cassandra version from the SSTable header
524    pub fn cassandra_version(&self) -> CassandraVersion {
525        self.header.cassandra_version
526    }
527
528    /// Get the SSTable format version string
529    pub fn format_version(&self) -> Result<String> {
530        let filename = self
531            .file_path
532            .file_name()
533            .and_then(|f| f.to_str())
534            .ok_or_else(|| {
535                Error::InvalidPath(format!("Invalid SSTable filename: {:?}", self.file_path))
536            })?;
537
538        let parts: Vec<&str> = filename.split('-').collect();
539        if parts.is_empty() {
540            return Err(Error::InvalidFormat(format!(
541                "Cannot extract format version from filename: {}",
542                filename
543            )));
544        }
545
546        Ok(parts[0].to_string())
547    }
548
549    /// Get a reference to the SSTable header
550    pub fn header(&self) -> &SSTableHeader {
551        &self.header
552    }
553
554    /// Get the table schema extracted from the SSTable header
555    ///
556    /// Returns `None` for legacy formats or if schema extraction failed.
557    pub fn schema(&self) -> Option<&TableSchema> {
558        self.schema.as_deref()
559    }
560
561    /// Extract write time from entry metadata
562    pub fn extract_write_time_from_entry(&self, _key: &RowKey, value: &Value) -> i64 {
563        use log::warn;
564
565        match value {
566            Value::Tombstone(info) => info.deletion_time,
567            _ => std::time::SystemTime::now()
568                .duration_since(std::time::UNIX_EPOCH)
569                .map(|d| d.as_micros() as i64)
570                .unwrap_or_else(|e| {
571                    warn!("Failed to get system time: {}; using fallback value 0", e);
572                    0
573                }),
574        }
575    }
576}
577
578impl std::fmt::Debug for SSTableReader {
579    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
580        f.debug_struct("SSTableReader")
581            .field("file_path", &self.file_path)
582            .field("header", &self.header)
583            .field("has_index", &self.index.is_some())
584            .field("has_bloom_filter", &self.bloom_filter.is_some())
585            .field("compression", &self.header.compression.algorithm)
586            .field("stats", &self.stats)
587            .finish()
588    }
589}
590
591/// Helper function to create a reader with default configuration
592pub async fn open_sstable_reader(
593    path: &Path,
594    config: &Config,
595    platform: Arc<Platform>,
596) -> Result<SSTableReader> {
597    SSTableReader::open(path, config, platform).await
598}