use super::{compression::extract_sstable_base_name, SSTableReader};
use crate::platform::Platform;
use crate::storage::sstable::{
bloom::BloomFilter, index::SSTableIndex, index_reader::IndexReader,
statistics_reader::StatisticsReader, summary_reader::SummaryReader,
};
use crate::{Error, Result, RowKey};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncSeekExt, BufReader};
use super::source::BlockSource;
impl SSTableReader {
/// Loads the partition index for an SSTable, trying two sources in order.
///
/// 1. If the header carries an `index_offset` property, the index is read
///    from `file` at that offset (integrated layout).
/// 2. Otherwise, a sibling `<base>-Index.db` next to `data_file_path` is
///    opened and converted into an [`SSTableIndex`].
///
/// Returns `Ok(None)` when neither source yields an index. Problems with the
/// separate `Index.db` component (open or conversion failures) are logged and
/// treated as "no index" rather than returned.
///
/// # Errors
///
/// * A corruption error when `index_offset` is present but not a valid `u64`.
/// * Any seek/load error from the integrated index.
/// * An invalid-operation error when `data_file_path` has no parent directory.
pub(super) async fn load_index(
    file: &Arc<tokio::sync::Mutex<BlockSource>>,
    header: &crate::parser::SSTableHeader,
    platform: &Arc<Platform>,
    data_file_path: &Path,
) -> Result<Option<SSTableIndex>> {
    // Source 1: index embedded in Data.db at the offset named by the header.
    if let Some(raw_offset) = header.properties.get("index_offset") {
        let offset = raw_offset
            .parse::<u64>()
            .map_err(|_| Error::corruption("Invalid index offset in header"))?;
        // The lock is held only for the seek + load and released on return.
        let mut source = file.lock().await;
        source.seek(std::io::SeekFrom::Start(offset)).await?;
        let index = SSTableIndex::load(&mut *source).await?;
        log::debug!("Loaded integrated index from Data.db at offset {}", offset);
        return Ok(Some(index));
    }

    // Source 2: separate Index.db component living next to Data.db.
    if let Some(base_name) = extract_sstable_base_name(data_file_path) {
        let parent_dir = data_file_path.parent().ok_or_else(|| {
            Error::invalid_operation("Cannot determine parent directory for Index.db")
        })?;
        let index_path = parent_dir.join(format!("{}-Index.db", base_name));
        let component_present = tokio::fs::metadata(&index_path).await.is_ok();

        if !component_present {
            log::debug!(
                "No Index.db component file found at {}",
                index_path.display()
            );
        } else {
            match IndexReader::open(&index_path, platform.clone()).await {
                Err(e) => {
                    log::debug!(
                        "Failed to load Index.db component: {}. This may indicate file corruption, permission issues, or format incompatibility.",
                        e
                    );
                }
                Ok(index_reader) => {
                    log::debug!(
                        "Found separate Index.db component at {}",
                        index_path.display()
                    );
                    let converted =
                        Self::convert_index_reader_to_sstable_index(index_reader, data_file_path)
                            .await;
                    match converted {
                        Ok(sstable_index) => {
                            log::debug!(
                                "Successfully converted Index.db component to SSTableIndex"
                            );
                            return Ok(Some(sstable_index));
                        }
                        Err(e) => {
                            log::warn!(
                                "Failed to convert Index.db component to SSTableIndex: {}. This may indicate an incompatible Index.db format or corruption.",
                                e
                            );
                        }
                    }
                }
            }
        }
    }

    // Every path that did not return above ends up here.
    log::debug!("No index source available (neither header offset nor Index.db component)");
    Ok(None)
}
/// Loads the bloom filter for an SSTable, trying two sources in order.
///
/// 1. If the header carries a `bloom_filter_offset` property, the filter is
///    read from `file` at that offset (integrated layout).
/// 2. Otherwise, a sibling `<base>-Filter.db` next to `data_file_path` is
///    opened and parsed.
///
/// Returns `Ok(None)` when neither source yields a filter. Failures to open
/// or parse the separate `Filter.db` are logged and treated as "no filter".
/// The `_platform` argument is currently unused.
///
/// # Errors
///
/// * A corruption error when `bloom_filter_offset` is not a valid `u64`.
/// * Any seek/load error from the integrated filter.
/// * An invalid-operation error when `data_file_path` has no parent directory.
pub(super) async fn load_bloom_filter(
    file: &Arc<tokio::sync::Mutex<BlockSource>>,
    header: &crate::parser::SSTableHeader,
    _platform: &Arc<Platform>,
    data_file_path: &Path,
) -> Result<Option<BloomFilter>> {
    // Source 1: filter embedded in Data.db at the offset named by the header.
    if let Some(raw_offset) = header.properties.get("bloom_filter_offset") {
        let offset = raw_offset
            .parse::<u64>()
            .map_err(|_| Error::corruption("Invalid bloom filter offset in header"))?;
        let mut source = file.lock().await;
        source.seek(std::io::SeekFrom::Start(offset)).await?;
        let bloom_filter = BloomFilter::load(&mut *source).await?;
        log::debug!(
            "Loaded integrated bloom filter from Data.db at offset {}",
            offset
        );
        return Ok(Some(bloom_filter));
    }

    // Source 2: separate Filter.db component living next to Data.db.
    if let Some(base_name) = extract_sstable_base_name(data_file_path) {
        let parent_dir = data_file_path.parent().ok_or_else(|| {
            Error::invalid_operation("Cannot determine parent directory for Filter.db")
        })?;
        let filter_path = parent_dir.join(format!("{}-Filter.db", base_name));
        let component_present = tokio::fs::metadata(&filter_path).await.is_ok();

        if !component_present {
            log::debug!(
                "No Filter.db component file found at {}",
                filter_path.display()
            );
        } else {
            match tokio::fs::File::open(&filter_path).await {
                Err(e) => {
                    log::debug!(
                        "Failed to open Filter.db component: {}. Bloom filter functionality will be unavailable.",
                        e
                    );
                }
                Ok(filter_file) => {
                    let mut reader = BufReader::new(filter_file);
                    let parsed = BloomFilter::load(&mut reader).await;
                    match parsed {
                        Ok(bloom_filter) => {
                            log::debug!(
                                "Loaded separate Filter.db component from {}",
                                filter_path.display()
                            );
                            return Ok(Some(bloom_filter));
                        }
                        Err(e) => {
                            log::warn!(
                                "Failed to parse Filter.db component: {}. Bloom filter functionality will be unavailable.",
                                e
                            );
                        }
                    }
                }
            }
        }
    }

    // Every path that did not return above ends up here.
    log::debug!(
        "No bloom filter source available (neither header offset nor Filter.db component)"
    );
    Ok(None)
}
/// Opens the `<base>-Index.db` component that sits next to `path`.
///
/// Returns `None` when the base name or parent directory cannot be derived
/// from `path`, or when opening the reader fails (the failure is logged at
/// debug level).
pub(super) async fn load_index_reader(
    path: &Path,
    platform: &Arc<Platform>,
) -> Option<IndexReader> {
    let stem = extract_sstable_base_name(path)?;
    let directory = path.parent()?;
    let index_path = directory.join(format!("{}-Index.db", stem));

    IndexReader::open(&index_path, Arc::clone(platform))
        .await
        .map(|reader| {
            log::debug!("Loaded Index.db reader for {}", index_path.display());
            reader
        })
        .map_err(|e| {
            log::debug!("Failed to load Index.db reader: {}", e);
        })
        .ok()
}
/// Opens the `<base>-Summary.db` component that sits next to `path`.
///
/// Returns `None` when the base name or parent directory cannot be derived
/// from `path`, or when opening the reader fails (the failure is logged at
/// debug level).
pub(super) async fn load_summary_reader(
    path: &Path,
    platform: &Arc<Platform>,
) -> Option<SummaryReader> {
    let stem = extract_sstable_base_name(path)?;
    let directory = path.parent()?;
    let summary_path = directory.join(format!("{}-Summary.db", stem));

    let opened = SummaryReader::open(&summary_path, Arc::clone(platform)).await;
    // Log the outcome first, then collapse the error into `None`.
    match &opened {
        Ok(_) => log::debug!("Loaded Summary.db reader for {}", summary_path.display()),
        Err(e) => log::debug!("Failed to load Summary.db reader: {}", e),
    }
    opened.ok()
}
/// Opens the `<base>-Statistics.db` component that sits next to `path`.
///
/// Returns `None` when the base name or parent directory cannot be derived
/// from `path`, or when opening the reader fails. Unlike the other component
/// loaders, an open failure is logged at warn level.
pub(super) async fn load_statistics_reader(
    path: &Path,
    platform: &Arc<Platform>,
) -> Option<StatisticsReader> {
    let stem = extract_sstable_base_name(path)?;
    let directory = path.parent()?;
    let statistics_path = directory.join(format!("{}-Statistics.db", stem));

    let opened = StatisticsReader::open(&statistics_path, Arc::clone(platform)).await;
    match opened {
        Err(e) => {
            log::warn!(
                "Failed to load Statistics.db from {}: {}. Timestamp delta decoding will use zero base values.",
                statistics_path.display(),
                e
            );
            None
        }
        Ok(reader) => {
            log::debug!(
                "Loaded Statistics.db reader for {}",
                statistics_path.display()
            );
            Some(reader)
        }
    }
}
fn extract_keyspace_and_table(sstable_path: &Path) -> Result<(String, String)> {
let table_name =
crate::storage::sstable::extract_table_name(sstable_path).ok_or_else(|| {
Error::invalid_path(format!(
"Cannot extract table name from SSTable path: {}",
sstable_path.display()
))
})?;
let keyspace = sstable_path
.parent() .and_then(|p| p.parent()) .and_then(|p| p.file_name()) .and_then(|n| n.to_str())
.map(|s| s.to_string())
.ok_or_else(|| {
Error::invalid_path(format!(
"Cannot extract keyspace from SSTable path: {}. \
Expected Cassandra directory structure: <data_dir>/<keyspace>/<table-uuid>/file",
sstable_path.display()
))
})?;
log::debug!(
"Extracted keyspace='{}', table='{}' from path: {}",
keyspace,
table_name,
sstable_path.display()
);
Ok((keyspace, table_name))
}
/// Builds an in-memory [`SSTableIndex`] from the partition entries of an
/// already-opened `Index.db` reader.
///
/// The table id is `"<keyspace>.<table>"`, both derived from
/// `data_file_path` via `extract_keyspace_and_table`. One index entry is
/// added per partition entry, copying its offset and size.
///
/// # Errors
///
/// Propagates the error from `extract_keyspace_and_table` when the path does
/// not follow the expected directory layout.
pub(super) async fn convert_index_reader_to_sstable_index(
    index_reader: IndexReader,
    data_file_path: &Path,
) -> Result<SSTableIndex> {
    use crate::storage::sstable::index::{Index, IndexEntry};

    let (keyspace, table_name) = Self::extract_keyspace_and_table(data_file_path)?;
    let qualified_name = format!("{}.{}", keyspace, table_name);
    let table_id = crate::types::TableId::new(qualified_name);

    let mut converted = Index::new();
    let entries = index_reader.get_partition_entries();
    for entry in entries {
        converted.add_entry(IndexEntry {
            table_id: table_id.clone(),
            // NOTE(review): the row key is built from `key_digest`; confirm
            // lookups against this index use the same representation.
            key: RowKey::new(entry.key_digest.to_vec()),
            offset: entry.data_offset,
            size: entry.data_size,
            // NOTE(review): every entry is marked uncompressed here —
            // confirm this is correct for compressed Data.db files.
            compressed: false,
        });
    }

    log::debug!(
        "Converted {} partition entries from IndexReader to SSTableIndex for table '{}' (keyspace: {}, table: {})",
        entries.len(),
        table_id.name(),
        keyspace,
        table_name
    );
    Ok(converted)
}
/// Discovers the component files that accompany the SSTable at `data_path`.
///
/// For each known component the file `<base>-<suffix>` is looked up in the
/// directory of `data_path`. Components that exist and are non-empty are
/// returned in a map keyed by component type (`"Index"`, `"Filter"`,
/// `"Summary"`, `"Statistics"`, `"CompressionInfo"`, `"TOC"`, `"Digest"`).
///
/// Not every component uses the `.db` extension: the table of contents is
/// stored as `TOC.txt` and the digest as `Digest.crc32`, so each component
/// carries its own file suffix. Digest files with other checksum extensions
/// are not probed.
///
/// An empty map is returned (with a warning) when no base name can be
/// extracted from `data_path`. Missing or empty components are only logged;
/// they never cause an error.
///
/// # Errors
///
/// Returns an invalid-operation error when `data_path` has no parent
/// directory.
pub(super) async fn detect_component_files(
    data_path: &Path,
) -> Result<HashMap<String, PathBuf>> {
    // (map key, on-disk file suffix, whether absence is worth a warning)
    const COMPONENTS: [(&str, &str, bool); 7] = [
        ("Index", "Index.db", true),
        ("Filter", "Filter.db", false),
        ("Summary", "Summary.db", false),
        ("Statistics", "Statistics.db", false),
        ("CompressionInfo", "CompressionInfo.db", false),
        ("TOC", "TOC.txt", false),
        ("Digest", "Digest.crc32", false),
    ];

    let mut components = HashMap::new();
    let base_name = match extract_sstable_base_name(data_path) {
        Some(name) => name,
        None => {
            log::warn!(
                "Could not extract base name from path: {}. Component file discovery requires standard SSTable naming convention.",
                data_path.display()
            );
            return Ok(components);
        }
    };
    let parent_dir = data_path.parent().ok_or_else(|| {
        Error::invalid_operation("Cannot determine parent directory for component files")
    })?;

    let mut critical_missing = Vec::new();
    for &(component_type, file_suffix, is_critical) in &COMPONENTS {
        let component_path = parent_dir.join(format!("{}-{}", base_name, file_suffix));

        let usable = match tokio::fs::metadata(&component_path).await {
            Ok(metadata) if metadata.len() > 0 => {
                log::debug!(
                    "Found component file: {} (size: {} bytes)",
                    component_path.display(),
                    metadata.len()
                );
                true
            }
            Ok(_) => {
                log::warn!("Component file is empty: {}", component_path.display());
                false
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                log::debug!("Component file not found: {}", component_path.display());
                false
            }
            // Permission or I/O problems are not the same as "absent";
            // surface them instead of reporting the file as missing.
            Err(e) => {
                log::warn!(
                    "Cannot access component file {}: {}",
                    component_path.display(),
                    e
                );
                false
            }
        };

        if usable {
            components.insert(component_type.to_string(), component_path);
        } else if is_critical {
            critical_missing.push(component_type.to_string());
        }
    }

    if components.is_empty() {
        log::debug!(
            "No component files found for base name: {}. This SSTable likely uses integrated format (all data in Data.db).",
            base_name
        );
    } else {
        log::debug!(
            "Detected {} component files for {} (component-based architecture)",
            components.len(),
            base_name
        );
        // Only meaningful for component-based SSTables; an integrated
        // SSTable legitimately has no Index.db.
        if !critical_missing.is_empty() {
            log::warn!(
                "Missing critical component files: {:?}. Index-based lookups may be unavailable.",
                critical_missing
            );
        }
    }
    Ok(components)
}
/// Runs cheap, size-based sanity checks on an SSTable and its components.
///
/// Returns a list of human-readable issue descriptions; an empty list means
/// no problem was detected. The checks are:
///
/// * `Data.db` must be accessible (otherwise that single issue is returned
///   immediately) and non-empty.
/// * Every path in `components` must be accessible.
/// * `Index` and `Filter` components smaller than 8 bytes are flagged.
///
/// The function itself never fails; problems are reported through the
/// returned list and logged.
pub(super) async fn validate_component_integrity(
    data_path: &Path,
    components: &HashMap<String, PathBuf>,
) -> Result<Vec<String>> {
    let mut issues: Vec<String> = Vec::new();

    // Without Data.db nothing else is worth checking.
    let data_metadata = match tokio::fs::metadata(data_path).await {
        Ok(metadata) => metadata,
        Err(e) => {
            issues.push(format!("Cannot access Data.db file: {}", e));
            return Ok(issues);
        }
    };
    if data_metadata.len() == 0 {
        issues.push("Data.db file is empty".to_string());
    }

    for (component_type, component_path) in components {
        let size = match tokio::fs::metadata(component_path).await {
            Ok(metadata) => metadata.len(),
            Err(e) => {
                issues.push(format!(
                    "Cannot access component file {}: {}",
                    component_path.display(),
                    e
                ));
                continue;
            }
        };

        // Only Index and Filter have a minimum plausible size check.
        if size < 8 {
            let kind = component_type.as_str();
            if kind == "Index" {
                issues.push(format!("Index.db file suspiciously small: {} bytes", size));
            } else if kind == "Filter" {
                issues.push(format!("Filter.db file suspiciously small: {} bytes", size));
            }
        }
    }

    if !issues.is_empty() {
        log::warn!(
            "Component integrity issues detected for {}: {:?}",
            data_path.display(),
            issues
        );
    } else {
        log::debug!(
            "Component integrity validation passed for {}",
            data_path.display()
        );
    }
    Ok(issues)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Locates a `*-Data.db` file of the `simple_table` fixture under
    /// `$CQLITE_DATASETS_ROOT/sstables/test_basic`.
    ///
    /// Returns `None` (after printing the reason to stderr) when the
    /// environment variable is unset or the fixture is not present, so
    /// callers can skip instead of failing on machines without test data.
    fn find_simple_table_data_file() -> Option<PathBuf> {
        let datasets_root = match std::env::var("CQLITE_DATASETS_ROOT") {
            Ok(root) => PathBuf::from(root),
            Err(_) => {
                eprintln!("CQLITE_DATASETS_ROOT not set, skipping test");
                return None;
            }
        };
        let simple_table_dir = datasets_root.join("sstables/test_basic");
        if !simple_table_dir.exists() {
            eprintln!("test_basic directory not found, skipping test");
            return None;
        }

        // Table directories are named `<table>-<uuid>`, so match by prefix.
        let table_dir = std::fs::read_dir(&simple_table_dir)
            .expect("Should read directory")
            .filter_map(|e| e.ok())
            .find(|e| {
                e.path().is_dir()
                    && e.file_name()
                        .to_str()
                        .map(|n| n.starts_with("simple_table"))
                        .unwrap_or(false)
            });
        let Some(table_entry) = table_dir else {
            eprintln!("simple_table not found, skipping test");
            return None;
        };

        let data_file = std::fs::read_dir(table_entry.path())
            .expect("Should read table dir")
            .filter_map(|e| e.ok())
            .find(|e| {
                e.file_name()
                    .to_str()
                    .map(|n| n.ends_with("-Data.db"))
                    .unwrap_or(false)
            });
        let Some(data_entry) = data_file else {
            eprintln!("Data.db not found, skipping test");
            return None;
        };
        Some(data_entry.path())
    }

    #[test]
    fn test_extract_keyspace_and_table_standard_path() {
        let path = PathBuf::from(
            "/var/lib/cassandra/data/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Data.db",
        );
        let result = SSTableReader::extract_keyspace_and_table(&path);
        assert!(result.is_ok(), "Should extract from standard path");
        let (keyspace, table) = result.unwrap();
        assert_eq!(keyspace, "test_basic");
        assert_eq!(table, "simple_table");
    }

    #[test]
    fn test_extract_keyspace_and_table_different_keyspace() {
        let path =
            PathBuf::from("/data/system/local-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Data.db");
        let result = SSTableReader::extract_keyspace_and_table(&path);
        assert!(result.is_ok(), "Should extract from system keyspace path");
        let (keyspace, table) = result.unwrap();
        assert_eq!(keyspace, "system");
        assert_eq!(table, "local");
    }

    #[test]
    fn test_extract_keyspace_and_table_complex_table_name() {
        let path = PathBuf::from(
            "/data/my_keyspace/complex_table_name-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-Data.db",
        );
        let result = SSTableReader::extract_keyspace_and_table(&path);
        assert!(result.is_ok(), "Should handle complex table names");
        let (keyspace, table) = result.unwrap();
        assert_eq!(keyspace, "my_keyspace");
        assert_eq!(table, "complex_table_name");
    }

    #[test]
    fn test_extract_keyspace_and_table_too_shallow_path() {
        // A bare file name has no table or keyspace directory above it.
        let path = PathBuf::from("nb-1-big-Data.db");
        let result = SSTableReader::extract_keyspace_and_table(&path);
        assert!(result.is_err(), "Should fail for too shallow path");
    }

    #[test]
    fn test_component_path_construction() {
        let data_path = PathBuf::from("/test/keyspace/table-uuid/nb-1-big-Data.db");
        let base_name = extract_sstable_base_name(&data_path).unwrap();
        let parent = data_path.parent().unwrap();
        let index_path = parent.join(format!("{}-Index.db", base_name));
        assert_eq!(
            index_path.file_name().unwrap().to_str().unwrap(),
            "nb-1-big-Index.db"
        );
        let filter_path = parent.join(format!("{}-Filter.db", base_name));
        assert_eq!(
            filter_path.file_name().unwrap().to_str().unwrap(),
            "nb-1-big-Filter.db"
        );
        let summary_path = parent.join(format!("{}-Summary.db", base_name));
        assert_eq!(
            summary_path.file_name().unwrap().to_str().unwrap(),
            "nb-1-big-Summary.db"
        );
        let statistics_path = parent.join(format!("{}-Statistics.db", base_name));
        assert_eq!(
            statistics_path.file_name().unwrap().to_str().unwrap(),
            "nb-1-big-Statistics.db"
        );
    }

    #[test]
    fn test_component_path_with_different_generation() {
        let data_path = PathBuf::from("/test/keyspace/table-uuid/nb-45-big-Data.db");
        let base_name = extract_sstable_base_name(&data_path).unwrap();
        let parent = data_path.parent().unwrap();
        let compression_info_path = parent.join(format!("{}-CompressionInfo.db", base_name));
        assert_eq!(
            compression_info_path.file_name().unwrap().to_str().unwrap(),
            "nb-45-big-CompressionInfo.db"
        );
    }

    #[tokio::test]
    async fn test_detect_component_files_with_real_data() {
        let Some(data_path) = find_simple_table_data_file() else {
            return;
        };
        let components = SSTableReader::detect_component_files(&data_path)
            .await
            .expect("Should detect component files");
        eprintln!("Detected {} component files:", components.len());
        for (component_type, path) in &components {
            eprintln!(" {}: {}", component_type, path.display());
        }
        assert!(
            components.contains_key("Index") || components.contains_key("Statistics"),
            "Should detect at least Index or Statistics component"
        );
    }

    #[tokio::test]
    async fn test_validate_component_integrity_with_real_data() {
        let Some(data_path) = find_simple_table_data_file() else {
            return;
        };
        let components = SSTableReader::detect_component_files(&data_path)
            .await
            .expect("Should detect component files");
        let issues = SSTableReader::validate_component_integrity(&data_path, &components)
            .await
            .expect("Should validate integrity");
        eprintln!("Validation issues: {:?}", issues);
        assert!(
            issues.is_empty(),
            "Real test data should have no integrity issues: {:?}",
            issues
        );
    }

    #[tokio::test]
    async fn test_detect_component_files_nonexistent_path() {
        // The path still has a parent and a well-formed file name, so
        // detection must succeed and simply find nothing.
        let nonexistent_path = PathBuf::from("/nonexistent/path/nb-1-big-Data.db");
        let components = SSTableReader::detect_component_files(&nonexistent_path)
            .await
            .expect("Detection should not fail for a nonexistent path");
        assert!(
            components.is_empty(),
            "Should return empty components for nonexistent path"
        );
    }
}