use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::{
error::{Error, Result},
parser::header::CassandraVersion,
platform::Platform,
schema::{
parser::SchemaParser,
registry::{ParsingContext, SchemaRegistry},
CqlType, TableSchema,
},
storage::sstable::{
format_detector::SSTableFormat,
reader::{SSTableReader, SSTableReaderStats},
},
types::{ComparatorType, Value},
Config, RowKey,
};
/// Reads an SSTable and decodes its row keys and row values according to a `TableSchema`.
///
/// Construct it with `SchemaAwareReader::new` or `SchemaAwareReader::new_with_config`.
#[derive(Debug)]
// NOTE(review): `platform` is stored but not read by any method in this file; this
// allow presumably exists to silence the resulting dead-code warning.
#[allow(dead_code)]
pub struct SchemaAwareReader {
/// Path of the SSTable file this reader was opened on.
file_path: PathBuf,
/// Underlying raw SSTable reader that performs lookups and scans.
reader: SSTableReader,
/// Parser built from `context`, used to decode partition/clustering keys and column values.
schema_parser: SchemaParser,
/// Table schema together with its partition, clustering and column comparators.
context: ParsingContext,
/// SSTable format detected from the file's format version.
format: SSTableFormat,
/// Cassandra version reported by the underlying reader.
version: CassandraVersion,
/// Platform handle supplied at construction (also passed to `SSTableReader::open`).
platform: Arc<Platform>,
}
/// Options controlling the schema checks `SchemaAwareReader::new_with_config` performs
/// before the SSTable is opened. `Default` enables every flag.
#[derive(Debug, Clone)]
pub struct SchemaAwareReaderConfig {
/// Run `SchemaAwareReader::validate_schema_completeness` on the schema before opening.
pub validate_schema_completeness: bool,
/// Fail construction when the built `ParsingContext` reports itself incomplete.
pub strict_schema_validation: bool,
// NOTE(review): not read by any code in this file; presumably toggles format-specific
// optimizations — confirm it is honoured elsewhere.
pub enable_format_optimizations: bool,
// NOTE(review): not read by any code in this file; presumably toggles caching of parsed
// values — confirm it is honoured elsewhere.
pub cache_parsed_values: bool,
}
impl Default for SchemaAwareReaderConfig {
fn default() -> Self {
Self {
validate_schema_completeness: true,
strict_schema_validation: true,
enable_format_optimizations: true,
cache_parsed_values: true,
}
}
}
/// Statistics returned by `SchemaAwareReader::stats`.
// NOTE(review): `SchemaAwareReader::stats` in this file fills every counter below
// `base_stats` with 0; nothing in this file increments them yet.
#[derive(Debug, Clone)]
pub struct SchemaAwareStats {
/// Statistics reported by the underlying `SSTableReader`.
pub base_stats: SSTableReaderStats,
/// Counter of values decoded via the schema (see note above).
pub schema_parsed_values: u64,
/// Counter of partition keys decoded (see note above).
pub partition_keys_parsed: u64,
/// Counter of clustering keys decoded (see note above).
pub clustering_keys_parsed: u64,
/// Counter of column values decoded (see note above).
pub column_values_parsed: u64,
/// Counter of decode failures (see note above).
pub parse_errors: u64,
/// Counter of format-specific optimizations applied (see note above).
pub format_optimizations_used: u64,
}
impl SchemaAwareReader {
    /// Opens the SSTable at `path` using [`SchemaAwareReaderConfig::default`].
    ///
    /// Thin wrapper around [`SchemaAwareReader::new_with_config`].
    ///
    /// # Errors
    ///
    /// Same as [`SchemaAwareReader::new_with_config`].
    pub async fn new(
        path: &Path,
        schema: TableSchema,
        schema_registry: Arc<SchemaRegistry>,
        config: &Config,
        platform: Arc<Platform>,
    ) -> Result<Self> {
        Self::new_with_config(
            path,
            schema,
            schema_registry,
            config,
            platform,
            SchemaAwareReaderConfig::default(),
        )
        .await
    }

    /// Validates `schema`, builds its parsing context, then opens the SSTable at `path`.
    ///
    /// Schema checks run before the file is opened, so a bad schema fails without any I/O.
    /// Only `validate_schema_completeness` and `strict_schema_validation` are read from
    /// `reader_config`; the remaining flags are not consulted here.
    ///
    /// # Errors
    ///
    /// * [`Error::Schema`] if completeness validation is enabled and fails, or if strict
    ///   validation is enabled and the parsing context reports itself incomplete.
    /// * Any error from deriving the comparators, constructing the [`SchemaParser`],
    ///   opening the [`SSTableReader`], or detecting the SSTable format.
    pub async fn new_with_config(
        path: &Path,
        schema: TableSchema,
        schema_registry: Arc<SchemaRegistry>,
        config: &Config,
        platform: Arc<Platform>,
        reader_config: SchemaAwareReaderConfig,
    ) -> Result<Self> {
        if reader_config.validate_schema_completeness {
            Self::validate_schema_completeness(&schema)?;
        }
        let context = Self::create_parsing_context(&schema, &schema_registry)?;
        if reader_config.strict_schema_validation && !context.is_complete() {
            return Err(Error::Schema(format!(
                "Incomplete parsing context for table {}.{}: missing schema or comparators",
                schema.keyspace, schema.table
            )));
        }
        let schema_parser = SchemaParser::new(context.clone())?;
        let reader = SSTableReader::open(path, config, platform.clone()).await?;
        let format = Self::detect_format(&reader)?;
        let version = reader.cassandra_version();
        Ok(Self {
            file_path: path.to_path_buf(),
            reader,
            schema_parser,
            context,
            format,
            version,
            platform,
        })
    }

    /// Checks that `schema` is structurally usable for reading.
    ///
    /// Requirements:
    /// * at least one partition key;
    /// * every partition key, clustering key and column type parses as a [`CqlType`];
    /// * partition key positions, once sorted, are exactly `0, 1, …, n-1`, and the same
    ///   holds for clustering key positions (no gaps, no duplicates).
    ///
    /// # Errors
    ///
    /// Returns [`Error::Schema`] describing the first violation found.
    pub fn validate_schema_completeness(schema: &TableSchema) -> Result<()> {
        if schema.partition_keys.is_empty() {
            return Err(Error::Schema(format!(
                "Schema for table {}.{} must have at least one partition key",
                schema.keyspace, schema.table
            )));
        }
        for (idx, key) in schema.partition_keys.iter().enumerate() {
            CqlType::parse(&key.data_type).map_err(|e| {
                Error::Schema(format!(
                    "Invalid partition key type '{}' at position {} in {}.{}: {}",
                    key.data_type, idx, schema.keyspace, schema.table, e
                ))
            })?;
        }
        for (idx, key) in schema.clustering_keys.iter().enumerate() {
            CqlType::parse(&key.data_type).map_err(|e| {
                Error::Schema(format!(
                    "Invalid clustering key type '{}' at position {} in {}.{}: {}",
                    key.data_type, idx, schema.keyspace, schema.table, e
                ))
            })?;
        }
        for column in &schema.columns {
            CqlType::parse(&column.data_type).map_err(|e| {
                Error::Schema(format!(
                    "Invalid column type '{}' for column '{}' in {}.{}: {}",
                    column.data_type, column.name, schema.keyspace, schema.table, e
                ))
            })?;
        }
        Self::check_contiguous_positions(
            schema,
            "partition",
            schema.partition_keys.iter().map(|k| k.position),
        )?;
        // An empty clustering-key list trivially passes: there is nothing to compare.
        Self::check_contiguous_positions(
            schema,
            "clustering",
            schema.clustering_keys.iter().map(|k| k.position),
        )
    }

    /// Verifies that `positions`, once sorted, are exactly `0, 1, …, n-1`.
    ///
    /// `kind` ("partition" / "clustering") is only used in the error message.
    fn check_contiguous_positions(
        schema: &TableSchema,
        kind: &str,
        positions: impl Iterator<Item = usize>,
    ) -> Result<()> {
        let mut positions: Vec<usize> = positions.collect();
        positions.sort_unstable();
        for (expected, &actual) in positions.iter().enumerate() {
            // A gap or a duplicate both surface as the first index where the sorted
            // position differs from its expected ordinal.
            if expected != actual {
                return Err(Error::Schema(format!(
                    "Non-contiguous {} key positions in {}.{}: expected {}, found {}",
                    kind, schema.keyspace, schema.table, expected, actual
                )));
            }
        }
        Ok(())
    }

    /// Builds the [`ParsingContext`] for `schema`: a copy of the schema plus its
    /// partition, clustering and column comparators.
    ///
    /// The registry argument is currently unused.
    ///
    /// # Errors
    ///
    /// Propagates any error from deriving the comparators from `schema`.
    pub fn create_parsing_context(
        schema: &TableSchema,
        _registry: &SchemaRegistry,
    ) -> Result<ParsingContext> {
        let partition_comparators = schema.get_partition_key_comparators()?;
        let clustering_comparators = schema.get_clustering_key_comparators()?;
        let column_comparators = schema.get_all_comparators()?;
        Ok(ParsingContext {
            schema: schema.clone(),
            partition_comparators,
            clustering_comparators,
            column_comparators,
        })
    }

    /// Maps the reader's reported format version to an [`SSTableFormat`].
    fn detect_format(reader: &SSTableReader) -> Result<SSTableFormat> {
        use super::format_detector::FormatDetector;
        let format_version = reader.format_version()?;
        let detector = FormatDetector::new();
        detector.detect_from_version(&format_version)
    }

    /// Looks up one row by primary key and decodes its column values.
    ///
    /// `partition_key` must supply every partition component. `clustering_key`, if given,
    /// may be a prefix of the clustering columns.
    ///
    /// Returns `Ok(None)` when the underlying reader has no row for the key.
    ///
    /// # Errors
    ///
    /// * [`Error::Schema`] if the key shapes do not match the schema, or a key component
    ///   has no binary representation.
    /// * Any error from the underlying reader or from decoding the row value.
    pub async fn get(
        &self,
        partition_key: &[Value],
        clustering_key: Option<&[Value]>,
    ) -> Result<Option<HashMap<String, Value>>> {
        self.validate_partition_key(partition_key)?;
        if let Some(ck) = clustering_key {
            self.validate_clustering_key(ck)?;
        }
        let row_key = self.create_row_key(partition_key, clustering_key)?;
        let table_id = self.get_table_id();
        self.reader
            .get(&table_id, &row_key)
            .await?
            .map(|raw_value| self.parse_row_value(&raw_value))
            .transpose()
    }

    /// Scans a key range and decodes every returned row.
    ///
    /// Each result is `(partition values, clustering values, column values)`. `limit` is
    /// forwarded unchanged to the underlying reader.
    ///
    /// A clustering bound is only applied together with its partition bound. When
    /// `start_partition` (or `end_partition`) is `None`, the matching clustering bound is
    /// validated but otherwise ignored.
    ///
    /// # Errors
    ///
    /// * [`Error::Schema`] if any supplied bound does not match the schema, or a bound
    ///   component has no binary representation.
    /// * Any error from the underlying scan or from decoding a row.
    pub async fn scan(
        &self,
        start_partition: Option<&[Value]>,
        end_partition: Option<&[Value]>,
        start_clustering: Option<&[Value]>,
        end_clustering: Option<&[Value]>,
        limit: Option<usize>,
    ) -> Result<Vec<(Vec<Value>, Vec<Value>, HashMap<String, Value>)>> {
        for pk in start_partition.into_iter().chain(end_partition) {
            self.validate_partition_key(pk)?;
        }
        for ck in start_clustering.into_iter().chain(end_clustering) {
            self.validate_clustering_key(ck)?;
        }
        let start_key = start_partition
            .map(|pk| self.create_row_key(pk, start_clustering))
            .transpose()?;
        let end_key = end_partition
            .map(|pk| self.create_row_key(pk, end_clustering))
            .transpose()?;
        let table_id = self.get_table_id();
        let raw_results = self
            .reader
            .scan(
                &table_id,
                start_key.as_ref(),
                end_key.as_ref(),
                limit,
                Some(&self.context.schema),
            )
            .await?;
        let mut parsed_results = Vec::new();
        for (row_key, raw_value) in raw_results {
            let (partition_values, clustering_values) = self.parse_row_key(&row_key)?;
            let column_values = self.parse_row_value(&raw_value)?;
            parsed_results.push((partition_values, clustering_values, column_values));
        }
        Ok(parsed_results)
    }

    /// Splits a raw row key into its partition and clustering component values.
    ///
    /// The partition prefix is parsed first. Its byte length is then recomputed from the
    /// parsed values, and any bytes after it are parsed as clustering keys. If no bytes
    /// remain, the clustering vector is empty.
    ///
    /// NOTE(review): the recomputed length counts each parsed value's raw bytes, with
    /// non-binary values counting as 0. Unless `SchemaParser::parse_partition_key` returns
    /// binary values, the clustering section will start at the wrong offset — confirm.
    pub fn parse_row_key(&self, row_key: &RowKey) -> Result<(Vec<Value>, Vec<Value>)> {
        let key_bytes = row_key.as_bytes();
        let partition_values = self.schema_parser.parse_partition_key(key_bytes)?;
        let partition_length = self.calculate_partition_key_length(&partition_values)?;
        let clustering_values = if key_bytes.len() > partition_length {
            self.schema_parser
                .parse_clustering_keys(&key_bytes[partition_length..])?
        } else {
            Vec::new()
        };
        Ok((partition_values, clustering_values))
    }

    /// Decodes a raw row value into a `column name -> value` map.
    ///
    /// Columns are decoded sequentially in schema order, and partition- and clustering-key
    /// columns are skipped. Decoding stops without error once the bytes are exhausted, so
    /// trailing columns absent from the payload are simply missing from the map.
    ///
    /// # Errors
    ///
    /// * [`Error::Schema`] if `raw_value` is not binary data.
    /// * Any error from decoding an individual column.
    pub fn parse_row_value(&self, raw_value: &Value) -> Result<HashMap<String, Value>> {
        let value_bytes = raw_value
            .as_bytes()
            .ok_or_else(|| Error::Schema("Row value is not binary data".to_string()))?;
        let schema = &self.context.schema;
        let mut column_values = HashMap::new();
        let mut offset = 0;
        for column in &schema.columns {
            if schema.is_partition_key(&column.name) || schema.is_clustering_key(&column.name) {
                continue;
            }
            if offset >= value_bytes.len() {
                break;
            }
            let (column_value, consumed) = self
                .schema_parser
                .parse_column_value(&column.name, &value_bytes[offset..])?;
            offset += consumed;
            column_values.insert(column.name.clone(), column_value);
        }
        Ok(column_values)
    }

    /// Returns the underlying reader's statistics.
    ///
    /// The schema-level counters are currently always reported as 0.
    pub async fn stats(&self) -> Result<SchemaAwareStats> {
        let base_stats = self.reader.stats().await?.clone();
        Ok(SchemaAwareStats {
            base_stats,
            schema_parsed_values: 0,
            partition_keys_parsed: 0,
            clustering_keys_parsed: 0,
            column_values_parsed: 0,
            parse_errors: 0,
            format_optimizations_used: 0,
        })
    }

    /// Fully qualified table name, formatted as `keyspace.table`.
    pub fn table_name(&self) -> String {
        format!(
            "{}.{}",
            self.context.schema.keyspace, self.context.schema.table
        )
    }

    /// The table schema this reader decodes with.
    pub fn schema(&self) -> &TableSchema {
        &self.context.schema
    }

    /// The parsing context (schema plus comparators) this reader decodes with.
    pub fn context(&self) -> &ParsingContext {
        &self.context
    }

    /// Returns `true` when the detected SSTable format is an `SSTableFormat::V5x` variant.
    pub fn has_format_optimizations(&self) -> bool {
        matches!(self.format, SSTableFormat::V5x(_))
    }

    /// Cassandra version reported by the underlying reader when it was opened.
    pub fn cassandra_version(&self) -> CassandraVersion {
        self.version
    }

    /// Path of the SSTable file this reader was opened on.
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }

    /// Requires exactly one value per partition-key comparator.
    fn validate_partition_key(&self, key: &[Value]) -> Result<()> {
        let expected = self.context.partition_comparators.len();
        if key.len() != expected {
            return Err(Error::Schema(format!(
                "Partition key length mismatch: expected {}, got {}",
                expected,
                key.len()
            )));
        }
        Ok(())
    }

    /// Allows any prefix of the clustering columns (including empty), but no more values
    /// than there are clustering comparators.
    fn validate_clustering_key(&self, key: &[Value]) -> Result<()> {
        let max = self.context.clustering_comparators.len();
        if key.len() > max {
            return Err(Error::Schema(format!(
                "Clustering key too long: expected max {}, got {}",
                max,
                key.len()
            )));
        }
        Ok(())
    }

    /// Concatenates the serialized partition components, then any clustering components,
    /// into a [`RowKey`].
    ///
    /// Components beyond the number of comparators are ignored; callers validate shapes
    /// first.
    fn create_row_key(
        &self,
        partition_key: &[Value],
        clustering_key: Option<&[Value]>,
    ) -> Result<RowKey> {
        let mut key_bytes = Vec::new();
        for (value, comparator) in partition_key
            .iter()
            .zip(&self.context.partition_comparators)
        {
            key_bytes.extend_from_slice(&self.serialize_value_with_comparator(value, comparator)?);
        }
        if let Some(ck) = clustering_key {
            for (value, comparator) in ck.iter().zip(&self.context.clustering_comparators) {
                key_bytes
                    .extend_from_slice(&self.serialize_value_with_comparator(value, comparator)?);
            }
        }
        Ok(RowKey::new(key_bytes))
    }

    /// Serializes one key component for `create_row_key`.
    ///
    /// Currently copies the value's raw bytes verbatim. `_comparator` is not used yet, so
    /// no type-specific encoding is applied.
    ///
    /// # Errors
    ///
    /// Returns `Error::Schema` if the value has no binary representation. Such values are
    /// rejected rather than encoded as zero bytes: an empty encoding would make distinct
    /// keys collide and silently look up the wrong row.
    fn serialize_value_with_comparator(
        &self,
        value: &Value,
        _comparator: &ComparatorType,
    ) -> Result<Vec<u8>> {
        value.as_bytes().map(|bytes| bytes.to_vec()).ok_or_else(|| {
            Error::Schema(
                "Cannot build row key: key component is not binary data \
                 (comparator-aware serialization is not implemented)"
                    .to_string(),
            )
        })
    }

    /// Byte length of the partition-key prefix implied by `partition_values`.
    ///
    /// Sums each value's raw byte length, counting non-binary values as 0 and considering
    /// at most one value per partition comparator. This is deliberately lenient (unlike
    /// `serialize_value_with_comparator`) so that decoding scanned keys keeps working
    /// whatever value variants the parser returns.
    fn calculate_partition_key_length(&self, partition_values: &[Value]) -> Result<usize> {
        Ok(partition_values
            .iter()
            .take(self.context.partition_comparators.len())
            .map(|value| value.as_bytes().map_or(0, |bytes| bytes.len()))
            .sum())
    }

    /// Table identifier passed to the underlying reader; reuses [`Self::table_name`] so both
    /// always agree on the `keyspace.table` format.
    fn get_table_id(&self) -> crate::types::TableId {
        crate::types::TableId::from(self.table_name())
    }
}
/// Error conditions specific to schema-aware reading.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute. A `From` impl in
/// this file converts any variant into `Error::Schema` carrying that text.
// NOTE(review): no code in this file constructs these variants; confirm they are used
// elsewhere before relying on or extending them.
#[derive(Debug, thiserror::Error)]
pub enum SchemaAwareReaderError {
/// The table schema failed validation; the payload describes why.
#[error("Schema validation failed: {0}")]
SchemaValidation(String),
/// The parsing context is missing required pieces; the payload describes what.
#[error("Parsing context incomplete: {0}")]
IncompleteContext(String),
/// A supplied partition/clustering key was rejected; the payload describes why.
#[error("Key validation failed: {0}")]
KeyValidation(String),
/// Decoding a value for `column` failed for `reason`.
#[error("Value parsing failed for column '{column}': {reason}")]
ValueParsing { column: String, reason: String },
/// An error tied to a particular SSTable format.
#[error("Format-specific error: {0}")]
FormatSpecific(String),
/// The Cassandra version found differs from the one expected.
#[error("Incompatible Cassandra version: expected {expected:?}, found {found:?}")]
VersionMismatch {
expected: CassandraVersion,
found: CassandraVersion,
},
}
impl From<SchemaAwareReaderError> for Error {
fn from(err: SchemaAwareReaderError) -> Self {
Error::Schema(err.to_string())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ClusteringColumn, ClusteringOrder, Column, KeyColumn};

    /// Builds `test_ks.test_table`: one `uuid` partition key (`id`, position 0), one
    /// `timestamp` clustering key (`timestamp`, position 0) and a nullable `text` column
    /// (`data`).
    fn create_test_schema() -> TableSchema {
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "uuid".to_string(),
                position: 0,
            }],
            clustering_keys: vec![ClusteringColumn {
                name: "timestamp".to_string(),
                data_type: "timestamp".to_string(),
                position: 0,
                order: ClusteringOrder::Asc,
            }],
            columns: vec![
                Column {
                    name: "id".to_string(),
                    data_type: "uuid".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "timestamp".to_string(),
                    data_type: "timestamp".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "data".to_string(),
                    data_type: "text".to_string(),
                    nullable: true,
                    default: None,
                    is_static: false,
                },
            ],
            comments: HashMap::new(),
        }
    }

    #[test]
    fn test_schema_validation() {
        let schema = create_test_schema();
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_ok());
    }

    #[test]
    fn test_invalid_schema_validation() {
        let mut schema = create_test_schema();
        schema.partition_keys.clear();
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_err());
    }

    #[test]
    fn test_non_contiguous_positions() {
        let mut schema = create_test_schema();
        // Positions {0, 2}: the gap at 1 must be rejected.
        schema.partition_keys.push(KeyColumn {
            name: "other".to_string(),
            data_type: "text".to_string(),
            position: 2,
        });
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_err());
    }

    #[test]
    fn test_contiguous_composite_partition_key() {
        let mut schema = create_test_schema();
        // Positions {0, 1} form a valid composite partition key.
        schema.partition_keys.push(KeyColumn {
            name: "bucket".to_string(),
            data_type: "uuid".to_string(),
            position: 1,
        });
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_ok());
    }

    #[test]
    fn test_duplicate_partition_key_positions() {
        let mut schema = create_test_schema();
        // Positions {0, 0}: duplicates must be rejected just like gaps.
        schema.partition_keys.push(KeyColumn {
            name: "bucket".to_string(),
            data_type: "uuid".to_string(),
            position: 0,
        });
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_err());
    }

    #[test]
    fn test_non_contiguous_clustering_positions() {
        let mut schema = create_test_schema();
        // Clustering positions {0, 2}: the gap at 1 must be rejected.
        schema.clustering_keys.push(ClusteringColumn {
            name: "seq".to_string(),
            data_type: "timestamp".to_string(),
            position: 2,
            order: ClusteringOrder::Asc,
        });
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_err());
    }

    #[test]
    fn test_clustering_positions_must_start_at_zero() {
        let mut schema = create_test_schema();
        schema.clustering_keys[0].position = 1;
        assert!(SchemaAwareReader::validate_schema_completeness(&schema).is_err());
    }
}