use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
/// Errors produced while locating, listing, and reading test datasets.
#[derive(Debug, thiserror::Error)]
pub enum DatasetError {
    /// The requested `keyspace.table` is not listed in the dataset metadata.
    ///
    /// When built by `resolve_table_to_sstable_path_at`, `available` is a comma-separated
    /// list of every `keyspace.table` the metadata does list.
    #[error("Dataset not found: {keyspace}.{table}. Available datasets: {available}")]
    DatasetNotFound {
        keyspace: String,
        table: String,
        available: String,
    },
    /// The `metadata.yml` file was not found at `path`.
    #[error("Metadata file not found at {path}")]
    MetadataNotFound { path: String },
    /// An underlying filesystem error; converted automatically from `std::io::Error` by `?`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A YAML document (e.g. `metadata.yml`) could not be parsed into the expected structure;
    /// converted automatically from `serde_yaml::Error` by `?`.
    #[error("YAML parsing error: {0}")]
    Yaml(#[from] serde_yaml::Error),
    /// An expected directory is missing. `path` is either the missing directory or, when no
    /// table directory matched, a `<dir>/<table>-*` pattern describing what was searched for.
    #[error("Directory not found: {path}")]
    DirectoryNotFound { path: String },
}
/// One keyspace entry of the dataset `metadata.yml`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Keyspace {
    /// Keyspace name. SSTables for it are looked up under `<root>/sstables/<name>`.
    pub name: String,
    /// Tables recorded for this keyspace, in file order.
    pub tables: Vec<Table>,
}
/// One table entry within a [`Keyspace`] in `metadata.yml`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Table {
    /// Table name. Its SSTable directories are expected to be named `<name>-*` inside the
    /// keyspace's sstables directory.
    pub name: String,
    /// Row count as recorded in `metadata.yml`. This module only copies it into
    /// [`TableInfo`] and never checks it against the SSTables.
    pub row_count: u64,
}
/// Top-level contents of `<datasets root>/metadata.yml`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metadata {
    /// All keyspaces described by the dataset, in file order.
    pub keyspaces: Vec<Keyspace>,
}
/// Flattened `keyspace.table` record produced by [`list_tables`] / [`list_tables_at`].
#[derive(Debug, Clone)]
pub struct TableInfo {
    /// Name of the keyspace the table belongs to.
    pub keyspace: String,
    /// Table name.
    pub table: String,
    /// Row count copied from the table's `metadata.yml` entry.
    pub row_count: u64,
}
fn get_datasets_root() -> PathBuf {
if let Ok(root) = std::env::var("CQLITE_DATASETS_ROOT") {
PathBuf::from(root)
} else {
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
manifest_dir
.parent()
.map(|workspace| workspace.join("test-data/datasets"))
.unwrap_or_else(|| PathBuf::from("test-data/datasets"))
}
}
/// Load `metadata.yml` from the default datasets root.
///
/// The root is taken from `CQLITE_DATASETS_ROOT` when set, otherwise from a
/// `test-data/datasets` directory under the parent of this crate's manifest directory.
/// See [`load_metadata_at`] for the possible errors.
pub fn load_metadata() -> Result<Metadata, DatasetError> {
    let datasets_root = get_datasets_root();
    load_metadata_at(&datasets_root)
}
/// Read and parse `<root>/metadata.yml`.
///
/// # Errors
/// - [`DatasetError::MetadataNotFound`] if the file does not exist.
/// - [`DatasetError::Io`] for any other read failure, e.g. permission denied or the path
///   being a directory.
/// - [`DatasetError::Yaml`] if the contents do not deserialize into [`Metadata`].
pub fn load_metadata_at(root: &Path) -> Result<Metadata, DatasetError> {
    let metadata_path = root.join("metadata.yml");
    // Read first and classify the failure rather than pre-checking with `Path::exists`.
    // `exists` returns `false` for *any* metadata error (e.g. permission denied), which
    // would be misreported as "not found". The check-then-read pair is also racy.
    let content = match fs::read_to_string(&metadata_path) {
        Ok(content) => content,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Err(DatasetError::MetadataNotFound {
                path: metadata_path.to_string_lossy().to_string(),
            });
        }
        Err(err) => return Err(DatasetError::from(err)),
    };
    let metadata: Metadata = serde_yaml::from_str(&content)?;
    Ok(metadata)
}
/// Resolve `keyspace.table` to its SSTable directory under the default datasets root.
///
/// Thin wrapper over [`resolve_table_to_sstable_path_at`]. The root comes from
/// `CQLITE_DATASETS_ROOT` when that variable is set.
pub fn resolve_table_to_sstable_path(keyspace: &str, table: &str) -> Result<PathBuf, DatasetError> {
    let datasets_root = get_datasets_root();
    resolve_table_to_sstable_path_at(&datasets_root, keyspace, table)
}
/// Resolve `keyspace.table` to the directory under `root` that holds its SSTable files.
///
/// The table must be listed in `<root>/metadata.yml`. Candidate directories are the
/// sub-directories of `<root>/sstables/<keyspace>` whose names start with `<table>-`.
/// They are examined in sorted path order, so the choice does not depend on the
/// platform's directory-iteration order. The function returns, in order of preference:
/// 1. the first candidate containing a `*-Data.db` file;
/// 2. otherwise the first containing a `*-Index.db` file;
/// 3. otherwise the first containing any of `*-Data.db.jsonl`, `*-Statistics.db.txt` or
///    `*-Summary.db.txt`.
///
/// Files rejected by [`should_ignore_file`] are not considered. Candidate directories that
/// cannot be read are skipped.
///
/// # Errors
/// - [`DatasetError::DatasetNotFound`] if `metadata.yml` does not list the table. Its
///   `available` field lists every known `keyspace.table`, comma-separated.
/// - [`DatasetError::DirectoryNotFound`] if `<root>/sstables/<keyspace>` is missing. It is
///   also returned when no candidate directory holds a recognised file; the path is then
///   a `<dir>/<table>-*` pattern.
/// - [`DatasetError::MetadataNotFound`], [`DatasetError::Io`] or [`DatasetError::Yaml`]
///   from loading the metadata or listing the keyspace directory.
pub fn resolve_table_to_sstable_path_at(
    root: &Path,
    keyspace: &str,
    table: &str,
) -> Result<PathBuf, DatasetError> {
    let metadata = load_metadata_at(root)?;
    let listed = metadata
        .keyspaces
        .iter()
        .filter(|ks| ks.name == keyspace)
        .flat_map(|ks| ks.tables.iter())
        .any(|tbl| tbl.name == table);
    if !listed {
        // Build the hint from the metadata already in hand instead of re-reading the file.
        let available = metadata
            .keyspaces
            .iter()
            .flat_map(|ks| {
                ks.tables
                    .iter()
                    .map(move |t| format!("{}.{}", ks.name, t.name))
            })
            .collect::<Vec<_>>()
            .join(", ");
        return Err(DatasetError::DatasetNotFound {
            keyspace: keyspace.to_string(),
            table: table.to_string(),
            available,
        });
    }

    let sstables_dir = root.join("sstables").join(keyspace);
    if !sstables_dir.exists() {
        return Err(DatasetError::DirectoryNotFound {
            path: sstables_dir.to_string_lossy().to_string(),
        });
    }

    let dir_prefix = format!("{}-", table);
    let mut table_dirs = Vec::new();
    for entry in fs::read_dir(&sstables_dir)? {
        let path = entry?.path();
        // A name that is not valid UTF-8 cannot carry the (UTF-8) table prefix, so such
        // entries are never candidates.
        let name_matches = matches!(
            path.file_name().and_then(|n| n.to_str()),
            Some(name) if name.starts_with(&dir_prefix)
        );
        if name_matches && path.is_dir() {
            table_dirs.push(path);
        }
    }
    // `read_dir` order is unspecified; sort so the pick is reproducible everywhere.
    table_dirs.sort_unstable();

    let mut fallback: Option<(TableDirContent, PathBuf)> = None;
    for dir in table_dirs {
        match classify_table_dir(&dir) {
            Some(TableDirContent::Data) => return Ok(dir),
            Some(kind) => {
                // Keep the first directory of the best rank seen so far.
                let improves = match &fallback {
                    Some((best, _)) => kind < *best,
                    None => true,
                };
                if improves {
                    fallback = Some((kind, dir));
                }
            }
            None => {}
        }
    }

    fallback
        .map(|(_, dir)| dir)
        .ok_or_else(|| DatasetError::DirectoryNotFound {
            path: format!("{}/{}-*", sstables_dir.to_string_lossy(), table),
        })
}

/// Most useful kind of SSTable content found in a candidate table directory.
///
/// Variants are declared best-first, so the derived `Ord` ranks
/// `Data` < `Index` < `ReferenceOnly`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum TableDirContent {
    /// Holds a `*-Data.db` file.
    Data,
    /// Holds a `*-Index.db` file but no `*-Data.db`.
    Index,
    /// Holds only `*-Data.db.jsonl`, `*-Statistics.db.txt` or `*-Summary.db.txt` files.
    ReferenceOnly,
}

/// Classify `dir` by the best kind of SSTable file it contains.
///
/// Returns `None` if the directory holds no recognised file or cannot be read. This is
/// best-effort: it only ranks candidates, so unreadable entries are skipped.
fn classify_table_dir(dir: &Path) -> Option<TableDirContent> {
    let entries = fs::read_dir(dir).ok()?;
    let mut best: Option<TableDirContent> = None;
    for entry in entries.flatten() {
        let os_name = entry.file_name();
        let name = match os_name.to_str() {
            Some(name) if !should_ignore_file(name) => name,
            _ => continue,
        };
        if name.ends_with("-Data.db") {
            // Nothing ranks higher, so there is no need to look further.
            return Some(TableDirContent::Data);
        }
        let kind = if name.ends_with("-Index.db") {
            TableDirContent::Index
        } else if name.ends_with("-Data.db.jsonl")
            || name.ends_with("-Statistics.db.txt")
            || name.ends_with("-Summary.db.txt")
        {
            TableDirContent::ReferenceOnly
        } else {
            continue;
        };
        best = Some(best.map_or(kind, |current| current.min(kind)));
    }
    best
}
/// List the tables recorded in the default datasets root's `metadata.yml`.
///
/// Thin wrapper over [`list_tables_at`]. The root comes from `CQLITE_DATASETS_ROOT` when
/// that variable is set.
pub fn list_tables(keyspace_filter: Option<&str>) -> Result<Vec<TableInfo>, DatasetError> {
    let datasets_root = get_datasets_root();
    list_tables_at(&datasets_root, keyspace_filter)
}
/// List every table recorded in `<root>/metadata.yml`, optionally restricted to one keyspace.
///
/// Entries come back in file order: keyspaces first, then their tables. A
/// `keyspace_filter` naming an unknown keyspace yields an empty list, not an error.
///
/// # Errors
/// Propagates any error from [`load_metadata_at`].
pub fn list_tables_at(
    root: &Path,
    keyspace_filter: Option<&str>,
) -> Result<Vec<TableInfo>, DatasetError> {
    let metadata = load_metadata_at(root)?;
    let is_wanted = |name: &str| keyspace_filter.is_none() || keyspace_filter == Some(name);
    let infos = metadata
        .keyspaces
        .into_iter()
        .filter(|ks| is_wanted(&ks.name))
        .flat_map(|ks| {
            let keyspace_name = ks.name;
            ks.tables.into_iter().map(move |tbl| TableInfo {
                keyspace: keyspace_name.clone(),
                table: tbl.name,
                row_count: tbl.row_count,
            })
        })
        .collect();
    Ok(infos)
}
/// Whether `filename` should be skipped when scanning dataset directories.
///
/// Names beginning with `._` are ignored. These are presumably macOS AppleDouble metadata
/// files. They reuse the real file's name after the prefix, so without this check they
/// would match the `-Data.db` / `-Index.db` suffix tests used by the scanners.
pub fn should_ignore_file(filename: &str) -> bool {
    matches!(filename.as_bytes(), [b'.', b'_', ..])
}
/// Locate the reference artifacts that accompany an SSTable `*-Data.db` file.
///
/// For `<dir>/<prefix>-Data.db`, returns the triple
/// `(<prefix>-Data.db.jsonl, <prefix>-Statistics.db.txt, <prefix>-Summary.db.txt)`.
///
/// Each artifact is resolved independently:
/// - The file carrying this SSTable's own prefix is used whenever it exists, so artifacts
///   from different SSTable generations are never mixed when only some are present.
/// - Otherwise, any file in `dir` with the right suffix is used (excluding names rejected
///   by [`should_ignore_file`]). The lexicographically smallest name is picked, so the
///   result does not depend on directory-iteration order.
/// - If nothing matches, the expected (non-existent) path is returned.
///
/// Returns `None` only when `data_db` has no UTF-8 file name, does not end in `-Data.db`,
/// or has no parent directory.
pub fn derive_reference_paths_from_data_db(data_db: &Path) -> Option<(PathBuf, PathBuf, PathBuf)> {
    let prefix = data_db.file_name()?.to_str()?.strip_suffix("-Data.db")?;
    let dir = data_db.parent()?;

    // Sorted (name, path) pairs of non-ignored files in `dir`. The listing is built lazily,
    // only if some expected file is missing. A failed listing just yields no fallbacks
    // (best-effort).
    let mut listing: Option<Vec<(String, PathBuf)>> = None;
    let mut resolve = |suffix: &str| -> PathBuf {
        let expected = dir.join(format!("{}{}", prefix, suffix));
        if expected.exists() {
            return expected;
        }
        let candidates = listing.get_or_insert_with(|| {
            let mut found: Vec<(String, PathBuf)> = std::fs::read_dir(dir)
                .map(|entries| {
                    entries
                        .flatten()
                        .filter_map(|entry| {
                            let name = entry.file_name().to_str()?.to_owned();
                            if should_ignore_file(&name) {
                                None
                            } else {
                                Some((name, entry.path()))
                            }
                        })
                        .collect()
                })
                .unwrap_or_default();
            found.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            found
        });
        candidates
            .iter()
            .find(|(name, _)| name.ends_with(suffix))
            .map(|(_, path)| path.clone())
            .unwrap_or(expected)
    };

    let data_jsonl = resolve("-Data.db.jsonl");
    let stats_txt = resolve("-Statistics.db.txt");
    let summary_txt = resolve("-Summary.db.txt");
    Some((data_jsonl, stats_txt, summary_txt))
}
/// Locate a companion component (e.g. `companion_type = "Index.db"`) of an SSTable
/// `*-Data.db` file.
///
/// For `<dir>/<prefix>-Data.db`, this returns `<dir>/<prefix>-<companion_type>` if that
/// file exists. Otherwise it scans `dir` in `read_dir` order and returns the first file
/// whose name ends in `-<companion_type>` and is not rejected by [`should_ignore_file`].
/// That file may carry a different prefix. If nothing matches, the expected
/// (non-existent) path is returned.
///
/// Returns `None` only when `data_file` has no UTF-8 file name, is not named `*-Data.db`,
/// or has no parent directory.
pub fn derive_companion_file(data_file: &Path, companion_type: &str) -> Option<PathBuf> {
    let prefix = data_file.file_name()?.to_str()?.strip_suffix("-Data.db")?;
    let dir = data_file.parent()?;
    let expected = dir.join(format!("{}-{}", prefix, companion_type));
    if expected.exists() {
        return Some(expected);
    }

    let wanted_suffix = format!("-{}", companion_type);
    let scanned = std::fs::read_dir(dir).ok().and_then(|entries| {
        entries.flatten().find_map(|entry| {
            let os_name = entry.file_name();
            let name = os_name.to_str()?;
            let is_match = !should_ignore_file(name) && name.ends_with(&wanted_suffix);
            if is_match {
                Some(entry.path())
            } else {
                None
            }
        })
    });
    Some(scanned.unwrap_or(expected))
}
/// Lazily iterate the JSON values stored one per line in the file at `path`.
///
/// Best-effort reader with these rules:
/// - Blank (whitespace-only) lines are skipped.
/// - Lines that fail to parse as JSON are silently dropped.
/// - Iteration stops at the first line that cannot be read. Per `BufRead::lines`, that
///   includes a line that is not valid UTF-8.
///
/// # Errors
/// [`DatasetError::Io`] if the file cannot be opened.
pub fn read_jsonl_rows(
    path: &Path,
) -> Result<impl Iterator<Item = serde_json::Value>, DatasetError> {
    use std::io::{BufRead, BufReader};
    let raw_lines = BufReader::new(std::fs::File::open(path)?).lines();
    let rows = raw_lines
        .map_while(|line| line.ok())
        .filter_map(|line| {
            if line.trim().is_empty() {
                None
            } else {
                serde_json::from_str::<serde_json::Value>(&line).ok()
            }
        });
    Ok(rows)
}
/// Parse a `key: value` text dump into a map. Judging by the name, this is presumably
/// `sstablemetadata` tool output.
///
/// Parsing rules:
/// - Each line is split at its *first* `:`, and both key and value are trimmed.
/// - Lines without a `:` (including blank lines) are ignored.
/// - If a key repeats, the later line wins.
///
/// # Errors
/// [`DatasetError::Io`] if the file cannot be read as UTF-8 text.
pub fn parse_sstablemetadata_text(
    path: &Path,
) -> Result<std::collections::HashMap<String, String>, DatasetError> {
    let text = std::fs::read_to_string(path)?;
    let fields = text
        .lines()
        .map(str::trim)
        .filter_map(|line| line.split_once(':'))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect();
    Ok(fields)
}
/// Contents of `<root>/references.yml`, as returned by `load_references_manifest_at`.
///
/// This type is public because it appears in that public function's signature. A private
/// type there cannot be named by callers and triggers the `private_interfaces` lint.
#[derive(Debug, Clone, Deserialize)]
pub struct RefManifest {
    /// Presumably the manifest schema version, if present.
    // Not read in this module; kept so the whole manifest schema deserializes into named
    // fields for callers.
    #[allow(dead_code)]
    pub refs_version: Option<u32>,
    /// Presumably the time the manifest was generated, if present. The format is not
    /// validated.
    // Not read in this module; see `refs_version`.
    #[allow(dead_code)]
    pub generated_at: Option<String>,
    /// One entry per `keyspace.table` recorded in the manifest.
    pub tables: Vec<RefEntry>,
}

/// One table entry of `references.yml`.
#[derive(Debug, Clone, Deserialize)]
pub struct RefEntry {
    /// Keyspace the entry belongs to.
    pub keyspace: String,
    /// Table name.
    pub table: String,
    /// SSTable directory as recorded in the manifest. `resolve_table_dir_via_manifest`
    /// uses only its final path component.
    pub sstable_dir: String,
    /// Presumably the `<prefix>` of the SSTable's `<prefix>-Data.db` file.
    // Not read in this module; kept so the whole entry schema is available to callers.
    #[allow(dead_code)]
    pub prefix: String,
}
/// Read and parse `<root>/references.yml`.
///
/// Best-effort: returns `None` if the file is missing, unreadable, or does not deserialize
/// into a [`RefManifest`]. The underlying error is discarded.
pub fn load_references_manifest_at(root: &Path) -> Option<RefManifest> {
    let manifest_path = root.join("references.yml");
    match std::fs::read_to_string(manifest_path) {
        Ok(yaml_text) => serde_yaml::from_str::<RefManifest>(&yaml_text).ok(),
        Err(_) => None,
    }
}
/// Resolve `keyspace.table` to its SSTable directory using `<root>/references.yml`.
///
/// Only the final path component of the entry's recorded `sstable_dir` is kept. It is
/// re-rooted under `<root>/sstables/<keyspace>`, presumably so a manifest generated under
/// a different root still resolves locally. The returned path is not checked for existence.
///
/// Returns `None` in any of these cases:
/// - the manifest cannot be loaded;
/// - it has no entry for `keyspace.table`;
/// - the recorded `sstable_dir` has no final component (e.g. it ends in `..`).
pub fn resolve_table_dir_via_manifest(root: &Path, keyspace: &str, table: &str) -> Option<PathBuf> {
    let manifest = load_references_manifest_at(root)?;
    let entry = manifest
        .tables
        .iter()
        .find(|candidate| candidate.keyspace == keyspace && candidate.table == table)?;
    // `sstable_dir` is a `String`, so its final component is always valid UTF-8 and can be
    // joined directly.
    let dir_name = std::path::Path::new(&entry.sstable_dir).file_name()?;
    Some(root.join("sstables").join(keyspace).join(dir_name))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::{Path, PathBuf};
    use tempfile::TempDir;

    /// Populate `datasets_root` with two things:
    /// - a `metadata.yml` describing `test_basic.simple_table` (1000 rows);
    /// - a matching SSTable directory holding a stub `nb-1-big-Data.db`.
    fn create_test_metadata(datasets_root: &Path) -> Result<(), Box<dyn std::error::Error>> {
        fs::create_dir_all(datasets_root)?;
        let simple_table = Table {
            name: "simple_table".to_string(),
            row_count: 1000,
        };
        let metadata = Metadata {
            keyspaces: vec![Keyspace {
                name: "test_basic".to_string(),
                tables: vec![simple_table],
            }],
        };
        let yaml = serde_yaml::to_string(&metadata)?;
        fs::write(datasets_root.join("metadata.yml"), yaml)?;
        let table_dir = datasets_root
            .join("sstables")
            .join("test_basic")
            .join("simple_table-abc123def456");
        fs::create_dir_all(&table_dir)?;
        fs::write(table_dir.join("nb-1-big-Data.db"), "test data")?;
        Ok(())
    }

    /// Build the standard fixture in a fresh temporary directory.
    ///
    /// The returned `TempDir` owns that directory and must stay alive while the path is
    /// in use. Bind it to a named variable, not `_`, which would drop it immediately.
    fn fixture() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let datasets_root = temp_dir.path().join("datasets");
        create_test_metadata(&datasets_root).unwrap();
        (temp_dir, datasets_root)
    }

    #[test]
    fn test_load_metadata() {
        let (_tmp, root) = fixture();
        let metadata = load_metadata_at(&root).unwrap();
        assert_eq!(metadata.keyspaces.len(), 1);
        let keyspace = &metadata.keyspaces[0];
        assert_eq!(keyspace.name, "test_basic");
        assert_eq!(keyspace.tables.len(), 1);
        assert_eq!(keyspace.tables[0].name, "simple_table");
    }

    #[test]
    fn test_resolve_table_to_sstable_path() {
        let (_tmp, root) = fixture();
        let resolved =
            resolve_table_to_sstable_path_at(&root, "test_basic", "simple_table").unwrap();
        assert!(resolved.ends_with("simple_table-abc123def456"));
        assert!(resolved.exists());
    }

    #[test]
    fn test_list_tables() {
        let (_tmp, root) = fixture();

        let all = list_tables_at(&root, None).unwrap();
        assert_eq!(all.len(), 1);
        let only = &all[0];
        assert_eq!(only.keyspace, "test_basic");
        assert_eq!(only.table, "simple_table");
        assert_eq!(only.row_count, 1000);

        assert_eq!(list_tables_at(&root, Some("test_basic")).unwrap().len(), 1);
        assert_eq!(list_tables_at(&root, Some("nonexistent")).unwrap().len(), 0);
    }

    #[test]
    fn test_table_not_found() {
        let (_tmp, root) = fixture();
        let outcome = resolve_table_to_sstable_path_at(&root, "test_basic", "nonexistent");
        assert!(matches!(outcome, Err(DatasetError::DatasetNotFound { .. })));
    }
}