use cqlite_core::{
platform::Platform,
schema::{
registry::{SchemaRegistry, SchemaRegistryConfig},
Column, KeyColumn, TableSchema,
},
storage::sstable::schema_aware_reader::SchemaAwareReader,
Config,
};
use std::collections::HashMap;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Resolves the directory that holds the test datasets.
///
/// The `CQLITE_DATASETS_ROOT` environment variable wins when it can be read;
/// otherwise the path falls back to `../test-data/datasets` relative to the
/// crate's manifest directory (captured at compile time).
fn get_test_datasets_root() -> PathBuf {
    match env::var("CQLITE_DATASETS_ROOT") {
        Ok(configured_root) => PathBuf::from(configured_root),
        // Unset (or unreadable) override: use the in-repo default location.
        Err(_) => PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../test-data/datasets"),
    }
}
/// Looks up the on-disk directory of `table_name` inside the `test_basic`
/// keyspace of the dataset tree.
///
/// Scans `<datasets_root>/sstables/test_basic` and returns the first
/// sub-directory (in directory-iteration order) whose name starts with
/// `<table_name>-`. Returns `None` when the keyspace directory cannot be read
/// or nothing matches; unreadable individual entries are skipped.
fn find_table_dir(datasets_root: &Path, table_name: &str) -> Option<PathBuf> {
    let wanted_prefix = format!("{}-", table_name);
    let keyspace_dir = datasets_root.join("sstables/test_basic");

    std::fs::read_dir(&keyspace_dir)
        .ok()?
        .flatten()
        .map(|entry| entry.path())
        .find(|candidate| {
            // Only directories qualify; names that are not valid UTF-8 never match.
            candidate.is_dir()
                && candidate
                    .file_name()
                    .and_then(|name| name.to_str())
                    .map_or(false, |name| name.starts_with(&wanted_prefix))
        })
}
/// Finds an SSTable data component inside `table_dir`.
///
/// Returns the first entry (in directory-iteration order) whose file name ends
/// with `-Data.db`, or `None` if the directory cannot be read or holds no such
/// entry. Entries whose names are not valid UTF-8 are never selected.
fn find_data_file(table_dir: &Path) -> Option<PathBuf> {
    let listing = std::fs::read_dir(table_dir).ok()?;

    listing.flatten().map(|entry| entry.path()).find(|candidate| {
        // A name ending in `-Data.db` necessarily carries the `db` extension,
        // so the suffix test alone decides the match.
        candidate
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(false, |name| name.ends_with("-Data.db"))
    })
}
/// Builds the hand-written schema for `test_basic.simple_table`.
///
/// The table has a single `int` partition key `id`, no clustering keys, and
/// two columns: the non-nullable `id` and a nullable `text` column `value`.
fn create_simple_table_schema() -> TableSchema {
    // Every column in this table is a regular (non-static) column with no default.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_owned(),
        data_type: data_type.to_owned(),
        nullable,
        default: None,
        is_static: false,
    };

    let partition_keys = vec![KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    }];
    let columns = vec![column("id", "int", false), column("value", "text", true)];

    TableSchema {
        keyspace: String::from("test_basic"),
        table: String::from("simple_table"),
        partition_keys,
        clustering_keys: Vec::new(),
        columns,
        comments: HashMap::new(),
    }
}
/// Builds the hand-written schema for
/// `test_collections.nested_collections_table`.
///
/// Besides the `int` partition key `id`, the table declares two nullable
/// nested-collection columns: `nested_list` (`list<list<int>>`) and
/// `nested_map` (`map<text, map<text, int>>`).
fn create_nested_collections_schema() -> TableSchema {
    // Every column in this table is a regular (non-static) column with no default.
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_owned(),
        data_type: data_type.to_owned(),
        nullable,
        default: None,
        is_static: false,
    };

    let partition_keys = vec![KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    }];
    let columns = vec![
        column("id", "int", false),
        column("nested_list", "list<list<int>>", true),
        column("nested_map", "map<text, map<text, int>>", true),
    ];

    TableSchema {
        keyspace: String::from("test_collections"),
        table: String::from("nested_collections_table"),
        partition_keys,
        clustering_keys: Vec::new(),
        columns,
        comments: HashMap::new(),
    }
}
/// Opens the `simple_table` SSTable from the test dataset through a
/// `SchemaAwareReader` and prints the Cassandra version the reader reports.
///
/// The test passes when the dataset is present and every construction step
/// succeeds; it makes no assertion about the reported value.
#[tokio::test]
async fn test_format_detection_from_real_sstable() {
    // Locate the fixture on disk; a missing dataset is a hard failure.
    let table_dir = find_table_dir(&get_test_datasets_root(), "simple_table")
        .expect("simple_table directory must exist in test_basic");
    let sstable_path =
        find_data_file(&table_dir).expect("Data.db file must exist in dataset for this test");

    // Wire up the reader's collaborators with default configuration.
    let config = Config::default();
    let platform = Arc::new(Platform::new(&config).await.unwrap());
    let table_schema = create_simple_table_schema();
    let schema_registry =
        SchemaRegistry::new(SchemaRegistryConfig::default(), platform.clone(), config.clone())
            .await
            .unwrap();
    let schema_registry = Arc::new(schema_registry);

    let reader =
        SchemaAwareReader::new(&sstable_path, table_schema, schema_registry, &config, platform)
            .await
            .unwrap();

    println!("Detected Cassandra version: {:?}", reader.cassandra_version());
    // NOTE(review): the "format" line below reports the same accessor as the
    // line above — confirm whether a separate format accessor was intended.
    println!("Detected format: {:?}", reader.cassandra_version());
}
/// Constructs a `SchemaAwareReader` over the `simple_table` SSTable and prints
/// its table name, schema and whether format optimizations are available.
///
/// Success means the dataset exists and the reader could be built; the printed
/// values themselves are not asserted.
#[tokio::test]
async fn test_schema_aware_reader_deterministic_decode() {
    // Locate the fixture on disk; a missing dataset is a hard failure.
    let table_dir = find_table_dir(&get_test_datasets_root(), "simple_table")
        .expect("simple_table directory must exist in test_basic");
    let sstable_path =
        find_data_file(&table_dir).expect("Data.db file must exist in dataset for this test");

    // Wire up the reader's collaborators with default configuration.
    let config = Config::default();
    let platform = Arc::new(Platform::new(&config).await.unwrap());
    let table_schema = create_simple_table_schema();
    let schema_registry =
        SchemaRegistry::new(SchemaRegistryConfig::default(), platform.clone(), config.clone())
            .await
            .unwrap();
    let schema_registry = Arc::new(schema_registry);

    let reader =
        SchemaAwareReader::new(&sstable_path, table_schema, schema_registry, &config, platform)
            .await
            .unwrap();

    let table_name = reader.table_name();
    println!("SchemaAwareReader successfully created for table: {}", table_name);

    let schema = reader.schema();
    println!("Schema: {:?}", schema);

    let optimized = reader.has_format_optimizations();
    println!("Format optimizations available: {}", optimized);
}
/// Builds a `SchemaAwareReader` over the `nested_collections_table` SSTable
/// from the `test_collections` keyspace and checks that the reader's schema
/// still exposes `nested_list` as a nested list type.
///
/// NOTE(review): despite the name, nothing here inspects consumed-byte counts;
/// the assertions only cover the schema held by the reader.
///
/// # Panics
///
/// Panics when the dataset directory, the table directory or its `-Data.db`
/// file is missing, or when any construction step fails.
#[tokio::test]
async fn test_nested_collections_consumed_byte_tracking() {
    const TABLE: &str = "nested_collections_table";

    let datasets_root = get_test_datasets_root();
    let collections_dir = datasets_root.join("sstables").join("test_collections");
    let nested_table_dir = std::fs::read_dir(&collections_dir)
        .expect("test_collections directory must exist")
        .flatten()
        .find(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            // Accept the bare table name or `<table>-<suffix>` only, so that a
            // sibling table whose name merely begins with the same text is not
            // picked up (directory iteration order is unspecified).
            let name_matches = name
                .strip_prefix(TABLE)
                .map_or(false, |rest| rest.is_empty() || rest.starts_with('-'));
            name_matches && entry.path().is_dir()
        })
        .map(|entry| entry.path())
        .expect("nested_collections_table must exist in test_collections");
    let data_file = find_data_file(&nested_table_dir)
        .expect("Data.db file must exist in nested_collections_table");

    let config = Config::default();
    let platform = Arc::new(Platform::new(&config).await.unwrap());
    let schema = create_nested_collections_schema();
    let registry_config = SchemaRegistryConfig::default();
    let registry = Arc::new(
        SchemaRegistry::new(registry_config, platform.clone(), config.clone())
            .await
            .unwrap(),
    );
    let reader = SchemaAwareReader::new(&data_file, schema, registry, &config, platform)
        .await
        .unwrap();

    println!(
        "SchemaAwareReader successfully created for nested collections table: {}",
        reader.table_name()
    );
    println!("Schema with nested types: {:?}", reader.schema());

    // `expect` replaces the former `is_some()` + `unwrap()` pair while keeping
    // the same failure message.
    let nested_list_col = reader
        .schema()
        .columns
        .iter()
        .find(|c| c.name == "nested_list")
        .expect("nested_list column should exist");
    assert!(
        nested_list_col.data_type.contains("list<list"),
        "Should be nested list type"
    );
}
/// Checks both sides of `SchemaAwareReader::validate_schema_completeness`:
/// a schema with no partition keys (and no columns) is rejected, while the
/// simple-table schema is accepted.
#[test]
fn test_schema_validation_acceptance_criteria() {
    // Negative case: nothing but a keyspace and table name.
    let empty_schema = TableSchema {
        keyspace: String::from("test"),
        table: String::from("incomplete"),
        partition_keys: Vec::new(),
        clustering_keys: Vec::new(),
        columns: Vec::new(),
        comments: HashMap::new(),
    };
    let rejected = SchemaAwareReader::validate_schema_completeness(&empty_schema).is_err();
    assert!(rejected, "Should reject schema without partition keys");

    // Positive case: the fully specified simple-table schema.
    let complete_schema = create_simple_table_schema();
    let accepted = SchemaAwareReader::validate_schema_completeness(&complete_schema).is_ok();
    assert!(accepted, "Should accept valid schema");
}
/// Verifies that every column of the simple-table schema declares a non-empty
/// type string and that the schema passes
/// `SchemaAwareReader::validate_schema_completeness`.
#[test]
fn test_no_blob_fallback_enforcement() {
    let schema = create_simple_table_schema();

    let every_column_typed = schema
        .columns
        .iter()
        .all(|column| !column.data_type.is_empty());
    assert!(every_column_typed, "All columns must have explicit types");

    let validation = SchemaAwareReader::validate_schema_completeness(&schema);
    assert!(validation.is_ok());
}