cqlite_core/storage/sstable/reader/
mod.rs1mod block_io;
13mod cache;
14mod component_loading;
15mod compression;
16mod data_access;
17mod header;
18mod header_helpers;
19mod integrity;
20mod key_digest;
21pub(crate) mod parsing; mod partition_lookup;
23mod source;
24#[cfg(test)]
25mod tests;
26mod types;
27
28pub use types::{
30 BlockMeta, CachedBlock, IntegrityCheckResult, IntegrityStatus, SSTableReader,
31 SSTableReaderConfig, SSTableReaderHealthMetrics, SSTableReaderStats,
32};
33
34#[doc(hidden)]
36pub use parsing::PublicV5CompressedLegacyParser as V5CompressedLegacyParser;
37
38#[doc(hidden)]
40pub use compression::extract_sstable_base_name;
41
42use compression::detect_and_initialize_compression;
44use header::{
45 calculate_actual_header_size, extract_generation_from_path, parse_header_with_version_detection,
46};
47
48use std::collections::HashMap;
49use std::path::Path;
50use std::sync::atomic::{AtomicU64, AtomicUsize};
51use std::sync::Arc;
52use tokio::fs::File;
53use tokio::io::{AsyncReadExt, AsyncSeekExt};
54use tokio::sync::Mutex;
55
56use source::BlockSource;
57
58use crate::{
59 parser::{header::CassandraVersion, SSTableHeader, SSTableParser},
60 platform::Platform,
61 schema::TableSchema,
62 storage::sstable::{
63 compression_info::CompressionInfo,
64 version_gate::{BigVersionGates, VersionGates},
65 },
66 Config, Error, Result, RowKey, Value,
67};
68
69use log::debug;
71
72#[cfg(feature = "tombstones")]
73use super::tombstone_merger::TombstoneMerger;
74
75fn mmap_enabled_via_env() -> bool {
82 std::env::var("CQLITE_USE_MMAP")
83 .ok()
84 .as_deref()
85 .map(parse_truthy_env)
86 .unwrap_or(false)
87}
88
/// Interprets an environment-variable value as a boolean flag.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`, `true`, `yes` and `on` are truthy; anything else, including the empty string,
/// is falsy.
fn parse_truthy_env(value: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    let candidate = value.trim();
    TRUTHY
        .iter()
        .any(|accepted| candidate.eq_ignore_ascii_case(accepted))
}
98
99impl SSTableReader {
100 pub async fn open(path: &Path, config: &Config, platform: Arc<Platform>) -> Result<Self> {
102 let mut reader_config = SSTableReaderConfig::default();
108 reader_config.use_mmap = config.storage.use_mmap || mmap_enabled_via_env();
109 reader_config.mmap_min_size_bytes = config.storage.mmap_min_size_bytes;
110
111 let file_size = tokio::fs::metadata(path).await?.len();
112
113 let use_mmap = reader_config.use_mmap
118 && file_size > 0
119 && file_size >= reader_config.mmap_min_size_bytes as u64;
120 let source = if use_mmap {
121 match Self::map_file(path) {
122 Ok(mmap) => {
123 log::debug!(
124 "Opened {} via memory map ({} bytes)",
125 path.display(),
126 file_size
127 );
128 BlockSource::mapped(Arc::new(mmap))
129 }
130 Err(e) => {
131 log::warn!(
135 "Memory-mapping {} failed ({}); falling back to buffered I/O",
136 path.display(),
137 e
138 );
139 BlockSource::buffered(File::open(path).await?)
140 }
141 }
142 } else {
143 BlockSource::buffered(File::open(path).await?)
144 };
145 let file = Arc::new(Mutex::new(source));
146
147 let header_size = std::cmp::min(4096, file_size as usize);
152 let mut header_buffer = vec![0u8; header_size];
153 {
154 let mut file_guard = file.lock().await;
155 let bytes_read = file_guard.read(&mut header_buffer).await?;
156 header_buffer.truncate(bytes_read);
157 }
158
159 let version_gates = Arc::new(match VersionGates::from_path(path) {
168 Ok(gates) => gates,
169 Err(e) => {
170 log::debug!(
171 "SSTableReader::open: could not derive VersionGates from {:?} ({}); \
172 defaulting to nb-compatible BIG gates",
173 path,
174 e
175 );
176 VersionGates::Big(BigVersionGates::nb_fallback())
177 }
178 });
179
180 if matches!(*version_gates, VersionGates::Bti(_)) {
185 return Err(Error::unsupported_format(format!(
186 "BTI (da) read support not yet implemented for '{}'. \
187 da-format SSTables use Partitions.db/Rows.db trie indexes instead of \
188 Index.db/Summary.db and require a dedicated BTI read path. \
189 See docs/reports/bti-read-support-scoping.md for the implementation plan.",
190 path.display()
191 )));
192 }
193
194 let config = crate::cql::config::ParserConfig::default();
195 let parser = SSTableParser::new(config)?;
196 let header = parse_header_with_version_detection(&header_buffer, path, &version_gates)
200 .await
201 .map_err(|e| {
202 Error::corruption(format!(
203 "Failed to parse SSTable header for file '{}': {}. This indicates either \
204 file corruption or an unsupported SSTable format. File size: {} bytes, \
205 header buffer size: {} bytes.",
206 path.display(),
207 e,
208 file_size,
209 header_buffer.len()
210 ))
211 })?;
212 let header_size = calculate_actual_header_size(&header, &header_buffer)?;
213
214 {
219 let mut file_guard = file.lock().await;
220 file_guard
221 .seek(std::io::SeekFrom::Start(header_size as u64))
222 .await?;
223 }
224
225 let compression_reader = detect_and_initialize_compression(&header, path).await?;
227
228 let compression_info = Self::load_compression_info_metadata(path, &platform).await?;
230
231 let components = Self::detect_component_files(path).await?;
233 if !components.is_empty() {
234 let integrity_issues = Self::validate_component_integrity(path, &components).await?;
235 if !integrity_issues.is_empty() {
236 log::warn!(
237 "Component integrity issues detected but proceeding with loading: {:?}",
238 integrity_issues
239 );
240 }
241 }
242
243 let index = Self::load_index(&file, &header, &platform, path).await?;
245
246 let bloom_filter = Self::load_bloom_filter(&file, &header, &platform, path).await?;
248
249 let index_reader = Self::load_index_reader(path, &platform).await;
251 let summary_reader = Self::load_summary_reader(path, &platform).await;
252 let statistics_reader = Self::load_statistics_reader(path, &platform).await;
253
254 let mut header = header; if let Some(ref stats_reader) = statistics_reader {
258 let statistics = stats_reader.statistics();
259 let partition_columns = &statistics.serialization_header_partition_keys;
260 let clustering_columns = &statistics.serialization_header_clustering_keys;
261 let regular_columns = &statistics.serialization_header_columns;
262
263 if !partition_columns.is_empty()
264 || !clustering_columns.is_empty()
265 || !regular_columns.is_empty()
266 {
267 log::debug!(
268 "Populating header columns from Statistics.db SerializationHeader: {} partition keys, {} clustering keys, {} regular columns",
269 partition_columns.len(),
270 clustering_columns.len(),
271 regular_columns.len()
272 );
273
274 let mut merged_columns = Vec::with_capacity(
275 partition_columns.len() + clustering_columns.len() + regular_columns.len(),
276 );
277 merged_columns.extend_from_slice(partition_columns);
278 merged_columns.extend_from_slice(clustering_columns);
279 merged_columns.extend_from_slice(regular_columns);
280
281 header.columns = merged_columns;
282 }
283 }
284
285 let schema = if matches!(
287 header.cassandra_version,
288 CassandraVersion::V5_0NewBig
289 | CassandraVersion::V5_0Bti
290 | CassandraVersion::V5_0DataFormat
291 | CassandraVersion::V5_0FormatC
292 | CassandraVersion::V5_0FormatD
293 | CassandraVersion::V5_0FormatE
294 | CassandraVersion::V5_0FormatF
295 | CassandraVersion::V5_0FormatG
296 ) {
297 match TableSchema::from_sstable_header(&header) {
298 Ok(s) => {
299 log::debug!(
300 "Extracted schema from SSTable header: {}.{} ({} columns, {} partition keys, {} clustering keys)",
301 s.keyspace,
302 s.table,
303 s.columns.len(),
304 s.partition_keys.len(),
305 s.clustering_keys.len()
306 );
307 Some(Arc::new(s))
308 }
309 Err(e) => {
310 log::warn!(
311 "Failed to extract schema from SSTable header for {}: {}. Schema-aware parsing will not be available.",
312 path.display(),
313 e
314 );
315 None
316 }
317 }
318 } else {
319 None
321 };
322
323 let block_count = compression_info
327 .as_ref()
328 .map(|ci| ci.chunk_offsets.len() as u64)
329 .unwrap_or(0);
330
331 let stats = SSTableReaderStats {
332 file_size,
333 entry_count: header.stats.row_count,
334 table_count: 1, block_count,
336 index_size: 0, bloom_filter_size: 0, compression_ratio: header.stats.compression_ratio,
339 cache_hit_rate: 0.0,
340 };
341
342 let generation = extract_generation_from_path(path);
344
345 Ok(Self {
346 file_path: path.to_path_buf(),
347 file,
348 header,
349 parser,
350 index,
351 bloom_filter,
352 compression_reader,
353 block_meta_cache: HashMap::new(),
354 block_cache: HashMap::new(),
355 config: reader_config,
356 platform,
357 stats,
358 cache_hits: AtomicU64::new(0),
359 cache_misses: AtomicU64::new(0),
360 #[cfg(feature = "tombstones")]
361 tombstone_merger: TombstoneMerger::new(),
362 generation,
363 actual_header_size: header_size,
364 index_reader,
365 summary_reader,
366 statistics_reader,
367 schema_registry: None, schema,
369 udt_registry: None, compression_info: compression_info.map(Arc::new),
371 current_chunk_index: AtomicUsize::new(0),
372 version_gates,
373 })
374 }
375
376 #[cfg(test)]
381 pub(crate) async fn is_mmap_backed(&self) -> bool {
382 self.file.lock().await.is_mmap()
383 }
384
385 fn map_file(path: &Path) -> Result<memmap2::Mmap> {
401 let std_file = std::fs::File::open(path)?;
402 let mmap = unsafe { memmap2::MmapOptions::new().map(&std_file)? };
405 Ok(mmap)
406 }
407
408 async fn load_compression_info_metadata(
410 path: &Path,
411 _platform: &Arc<Platform>,
412 ) -> Result<Option<CompressionInfo>> {
413 use tokio::fs::File;
414 use tokio::io::AsyncReadExt;
415
416 let parent_dir = path.parent().unwrap_or(Path::new("."));
418 let base_name = path.file_stem().and_then(|s| s.to_str()).and_then(|s| {
419 let parts: Vec<&str> = s.split('-').collect();
421 if parts.len() >= 4 {
422 Some(parts[0..3].join("-"))
423 } else {
424 None
425 }
426 });
427
428 if let Some(base) = base_name {
429 let compression_info_path = parent_dir.join(format!("{}-CompressionInfo.db", base));
430 if compression_info_path.exists() {
431 let mut file = File::open(&compression_info_path).await?;
432 let mut buffer = Vec::new();
433 file.read_to_end(&mut buffer).await?;
434
435 match CompressionInfo::parse(&buffer) {
436 Ok(info) => {
437 log::debug!(
438 "Loaded CompressionInfo: algorithm={}, chunk_length={}, chunks={}",
439 info.algorithm,
440 info.chunk_length,
441 info.chunk_offsets.len()
442 );
443 return Ok(Some(info));
444 }
445 Err(e) => {
446 log::warn!("Failed to parse CompressionInfo.db: {}", e);
447 }
448 }
449 }
450 }
451
452 Ok(None)
453 }
454
455 #[cfg(feature = "state_machine")]
457 pub fn set_schema_registry(
458 &mut self,
459 schema_registry: Arc<tokio::sync::RwLock<crate::schema::SchemaRegistry>>,
460 ) {
461 self.schema_registry = Some(schema_registry);
462 log::debug!(
463 "Schema registry set for {}.{} - enabling schema-driven digest computation",
464 self.header.keyspace,
465 self.header.table_name
466 );
467 }
468
469 #[cfg(not(feature = "state_machine"))]
471 pub fn set_schema_registry(&mut self, schema_registry: Arc<crate::schema::SchemaRegistry>) {
472 self.schema_registry = Some(schema_registry);
473 log::debug!(
474 "Schema registry set for {}.{} - enabling schema-driven digest computation",
475 self.header.keyspace,
476 self.header.table_name
477 );
478 }
479
480 pub fn set_udt_registry(&mut self, registry: crate::schema::UdtRegistry) {
485 self.udt_registry = Some(registry);
486 log::debug!(
487 "UDT registry set for {}.{} - enabling UDT-aware collection parsing",
488 self.header.keyspace,
489 self.header.table_name
490 );
491 }
492
493 pub async fn stats(&self) -> Result<&SSTableReaderStats> {
495 Ok(&self.stats)
496 }
497
498 pub async fn close(mut self) -> Result<()> {
500 debug!("Closing SSTable reader for {:?}", self.file_path);
501
502 let cache_entries = self.block_cache.len();
504 let meta_entries = self.block_meta_cache.len();
505
506 self.block_cache.clear();
507 self.block_meta_cache.clear();
508
509 debug!(
510 "Cleared {} block cache entries and {} metadata entries",
511 cache_entries, meta_entries
512 );
513
514 Ok(())
516 }
517
518 pub fn calculate_header_size(&self) -> usize {
520 self.actual_header_size
521 }
522
523 pub fn cassandra_version(&self) -> CassandraVersion {
525 self.header.cassandra_version
526 }
527
528 pub fn format_version(&self) -> Result<String> {
530 let filename = self
531 .file_path
532 .file_name()
533 .and_then(|f| f.to_str())
534 .ok_or_else(|| {
535 Error::InvalidPath(format!("Invalid SSTable filename: {:?}", self.file_path))
536 })?;
537
538 let parts: Vec<&str> = filename.split('-').collect();
539 if parts.is_empty() {
540 return Err(Error::InvalidFormat(format!(
541 "Cannot extract format version from filename: {}",
542 filename
543 )));
544 }
545
546 Ok(parts[0].to_string())
547 }
548
549 pub fn header(&self) -> &SSTableHeader {
551 &self.header
552 }
553
554 pub fn schema(&self) -> Option<&TableSchema> {
558 self.schema.as_deref()
559 }
560
561 pub fn extract_write_time_from_entry(&self, _key: &RowKey, value: &Value) -> i64 {
563 use log::warn;
564
565 match value {
566 Value::Tombstone(info) => info.deletion_time,
567 _ => std::time::SystemTime::now()
568 .duration_since(std::time::UNIX_EPOCH)
569 .map(|d| d.as_micros() as i64)
570 .unwrap_or_else(|e| {
571 warn!("Failed to get system time: {}; using fallback value 0", e);
572 0
573 }),
574 }
575 }
576}
577
578impl std::fmt::Debug for SSTableReader {
579 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
580 f.debug_struct("SSTableReader")
581 .field("file_path", &self.file_path)
582 .field("header", &self.header)
583 .field("has_index", &self.index.is_some())
584 .field("has_bloom_filter", &self.bloom_filter.is_some())
585 .field("compression", &self.header.compression.algorithm)
586 .field("stats", &self.stats)
587 .finish()
588 }
589}
590
591pub async fn open_sstable_reader(
593 path: &Path,
594 config: &Config,
595 platform: Arc<Platform>,
596) -> Result<SSTableReader> {
597 SSTableReader::open(path, config, platform).await
598}