use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::str::FromStr;
use crate::error::{Error, Result};
use crate::storage::sstable::version_gate::SsTableDescriptor;
use super::types::{SSTableComponent, SSTableGeneration, SecondaryIndex};
pub(crate) fn extract_table_name(dir_name: &str) -> Result<String> {
if let Some(hyphen_pos) = dir_name.rfind('-') {
let table_name = &dir_name[..hyphen_pos];
if table_name.is_empty() {
return Err(Error::invalid_path(format!(
"Empty table name in directory: {}",
dir_name
)));
}
Ok(table_name.to_string())
} else {
Ok(dir_name.to_string())
}
}
/// Scans `path` for SSTable component files and groups them into generations.
///
/// Every regular file whose name [`parse_sstable_filename`] recognises is attached to the
/// [`SSTableGeneration`] identified by its `(generation, format)` pair. Sub-directories,
/// entries whose names are not valid UTF-8, entries that cannot be inspected (logged at
/// warn level) and files with unrecognised names are skipped.
///
/// The returned generations are ordered by descending generation number. Entries sharing a
/// generation number are ordered by format name, so the result does not depend on hash-map
/// iteration order.
///
/// # Errors
///
/// * a storage error if the directory cannot be opened,
/// * any error yielded while iterating the directory entries,
/// * a not-found error if the directory has no entries at all,
/// * an invalid-format error if it has entries but none is a recognised SSTable file.
pub(crate) fn scan_sstable_files(path: &Path, table_name: &str) -> Result<Vec<SSTableGeneration>> {
    let entries = fs::read_dir(path)
        .map_err(|e| Error::storage(format!("Failed to read directory: {:?}: {}", path, e)))?;

    let mut generations_map: HashMap<(u32, String), SSTableGeneration> = HashMap::new();
    // Counts every directory entry (sub-directories included); it decides between the
    // "empty directory" and "no SSTable files" errors below and feeds the diagnostics.
    let mut found_files = 0;
    let mut valid_sstable_files = 0;

    for entry in entries {
        let file_path = entry?.path();
        found_files += 1;

        let file_name = match file_path.file_name().and_then(|n| n.to_str()) {
            Some(name) => name,
            None => continue,
        };

        // One metadata lookup both classifies the entry and surfaces access problems.
        // `Path::is_file` would report `false` for an unreadable entry, hiding the failure
        // and leaving a follow-up `fs::metadata` error branch effectively unreachable.
        match fs::metadata(&file_path) {
            Ok(meta) if meta.is_file() => {}
            Ok(_) => continue,
            Err(e) => {
                log::warn!("Cannot access file {:?}: {}", file_path, e);
                continue;
            }
        }

        if let Some((version, generation, format, component)) = parse_sstable_filename(file_name)?
        {
            valid_sstable_files += 1;
            generations_map
                // The key needs its own copy of `format`; the original moves into the
                // generation record if this is the first component seen for the key.
                .entry((generation, format.clone()))
                .or_insert_with(|| SSTableGeneration {
                    version,
                    generation,
                    format,
                    table_name: table_name.to_string(),
                    components: HashMap::new(),
                    base_path: path.to_path_buf(),
                })
                .components
                .insert(component, file_path);
        }
    }

    if found_files == 0 {
        return Err(Error::not_found(format!(
            "Directory appears to be empty: {:?}",
            path
        )));
    }
    if valid_sstable_files == 0 {
        return Err(Error::invalid_format(format!(
            "No valid SSTable files found in directory: {:?}. Found {} files total, but none match the expected SSTable naming pattern (e.g., nb-1-big-Data.db)",
            path,
            found_files
        )));
    }

    let mut generations: Vec<SSTableGeneration> = generations_map.into_values().collect();
    // Highest generation first. `(generation, format)` is unique per entry, so the format
    // tie-break yields a total order and an unstable sort is still deterministic.
    generations.sort_unstable_by(|a, b| {
        b.generation
            .cmp(&a.generation)
            .then_with(|| a.format.cmp(&b.format))
    });

    log::debug!(
        "Directory scan completed: {} total files, {} SSTable files, {} generations found",
        found_files,
        valid_sstable_files,
        generations.len()
    );
    Ok(generations)
}
pub(crate) fn parse_sstable_filename(
filename: &str,
) -> Result<Option<(String, u32, String, SSTableComponent)>> {
use std::path::Path;
let desc = match SsTableDescriptor::parse(Path::new(filename)) {
Ok(d) => d,
Err(_) => {
return Ok(None);
}
};
let generation: u32 = match desc.sstable_id.parse() {
Ok(n) => n,
Err(_) => {
log::debug!(
"parse_sstable_filename: skipping UUID-id file {} (not a plain integer generation)",
filename
);
return Ok(None);
}
};
let format = desc.format.as_str().to_string();
let version = desc.version.clone();
let component = match SSTableComponent::from_str(&desc.component) {
Ok(c) => c,
Err(_) => {
log::debug!(
"Ignoring file with unrecognized component extension: {}",
filename
);
return Ok(None);
}
};
Ok(Some((version, generation, format, component)))
}
/// Discovers the secondary-index directories belonging to `table_name` inside `path`.
///
/// A sub-directory qualifies when its name starts with `.`, ends with `_idx`, and the text
/// after the leading dot begins with `<table_name>_`. That text (the name without the dot)
/// becomes the index name, and the directory's SSTable generations are collected with
/// [`scan_sstable_files`].
///
/// # Errors
///
/// Returns a storage error if `path` cannot be opened, and propagates any error yielded
/// while iterating its entries or while scanning a qualifying index directory.
pub(crate) fn scan_secondary_indexes(path: &Path, table_name: &str) -> Result<Vec<SecondaryIndex>> {
    let dir_iter = fs::read_dir(path)
        .map_err(|e| Error::storage(format!("Failed to read directory: {:?}: {}", path, e)))?;

    let required_prefix = format!("{}_", table_name);
    let mut found = Vec::new();

    for dir_entry in dir_iter {
        let candidate = dir_entry?.path();
        if !candidate.is_dir() {
            continue;
        }

        // Keep only `.…_idx` directories whose dot-less name carries the table prefix.
        let index_name = match candidate
            .file_name()
            .and_then(|name| name.to_str())
            .filter(|name| name.ends_with("_idx"))
            .and_then(|name| name.strip_prefix('.'))
            .filter(|name| name.starts_with(&required_prefix))
        {
            Some(name) => name.to_string(),
            None => continue,
        };

        // NOTE(review): `scan_sstable_files` reports a directory without recognised SSTable
        // files as an error, so such an index directory aborts the whole scan through `?`;
        // the emptiness check below only matters if that contract changes — confirm intent.
        let generations = scan_sstable_files(&candidate, table_name)?;
        if generations.is_empty() {
            continue;
        }

        found.push(SecondaryIndex {
            index_name,
            index_path: candidate,
            generations,
        });
    }

    Ok(found)
}