use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
parser::header::{CassandraVersion, SSTableHeader},
platform::Platform,
schema::{ClusteringColumn, UdtRegistry},
types::Value,
Config, Result,
};
/// Tuning knobs for the schema discovery engine.
///
/// Several flags are declared here but not consulted by the discovery
/// pipeline in this module section; they are marked `NOTE(review)` below.
#[derive(Debug, Clone)]
pub struct SchemaDiscoveryConfig {
    /// Upper bound on the total number of rows sampled across all SSTables.
    pub max_sample_rows: usize,
    /// NOTE(review): not read by the discovery pipeline shown here.
    pub aggressive_inference: bool,
    /// Serve and store discovered schemas in the in-memory cache.
    pub enable_schema_cache: bool,
    /// Age (in seconds) after which a cached schema is no longer served.
    pub cache_ttl_seconds: u64,
    /// NOTE(review): not read by the discovery pipeline shown here.
    pub enable_versioning: bool,
    /// NOTE(review): not read by the discovery pipeline shown here.
    pub max_versions: usize,
    /// Run the UDT discovery pass.
    pub enable_udt_discovery: bool,
    /// Run the collection-type analysis pass.
    pub enable_collection_analysis: bool,
    /// Run the index discovery pass.
    pub enable_index_discovery: bool,
    /// NOTE(review): not read by the discovery pipeline shown here.
    pub enable_cross_file_validation: bool,
    /// NOTE(review): not read by the discovery pipeline shown here;
    /// presumably a cut-off for `ColumnDefinition::confidence` — confirm.
    pub min_confidence_threshold: f64,
}

impl Default for SchemaDiscoveryConfig {
    /// Enables every discovery pass and caching, samples up to 2 000 rows and
    /// keeps cached schemas for one hour.
    fn default() -> Self {
        SchemaDiscoveryConfig {
            // Sampling and inference.
            max_sample_rows: 2_000,
            aggressive_inference: true,
            min_confidence_threshold: 0.7,
            // Caching: one hour.
            enable_schema_cache: true,
            cache_ttl_seconds: 60 * 60,
            // Versioning.
            enable_versioning: true,
            max_versions: 10,
            // Optional discovery passes.
            enable_udt_discovery: true,
            enable_collection_analysis: true,
            enable_index_discovery: true,
            enable_cross_file_validation: true,
        }
    }
}
/// Schema description returned by `SchemaDiscoveryEngine::discover_schema`
/// and consumed by the CQL/JSON exporters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaInfo {
/// Keyspace name; rendered as the qualifier in `CREATE TABLE <keyspace>.<table>`.
pub keyspace: String,
/// Table name.
pub table: String,
/// Partition key columns; the CQL exporter emits them in this order.
pub partition_key: Vec<ColumnDefinition>,
/// Clustering columns in PRIMARY KEY order; each carries its sort order.
pub clustering_keys: Vec<ClusteringColumn>,
/// Columns emitted after partition, clustering and static columns.
pub regular_columns: Vec<ColumnDefinition>,
/// Columns the CQL exporter emits with `STATIC` when their `is_static` flag is set.
pub static_columns: Vec<ColumnDefinition>,
/// Collection type details keyed by column name.
pub collection_types: HashMap<String, CollectionType>,
/// User-defined types referenced by this table.
pub user_defined_types: Vec<UDTDefinition>,
/// Secondary/custom indexes discovered for this table.
pub indexes: Vec<IndexDefinition>,
/// Table-level `WITH` options (compaction, compression, TTLs, ...).
pub table_options: TableOptions,
/// Provenance and diagnostics for this discovery run.
pub metadata: SchemaMetadata,
}
/// One column of a discovered table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
/// Column name as emitted in CQL.
pub name: String,
/// Type as a plain string. NOTE(review): the CQL exporter renders
/// `type_info`, not this field — confirm the two are kept in sync.
pub data_type: String,
/// Structured type used by the CQL exporter to render the column type.
pub type_info: TypeInfo,
/// Whether the column may hold nulls.
pub nullable: bool,
/// Controls the `STATIC` modifier for entries in `SchemaInfo::static_columns`.
pub is_static: bool,
/// Optional default value.
pub default_value: Option<Value>,
/// Ordinal position of the column.
pub position: usize,
/// Presumably an inference confidence in `[0, 1]` — TODO confirm.
pub confidence: f64,
}
/// Structured (possibly nested) CQL type produced by type inference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeInfo {
/// Base type name, e.g. `int`, `text`, `list`, `map`, `tuple`, `udt`, `frozen`.
pub type_id: String,
/// `type_id`s of the immediate child types (element, key/value or tuple members).
pub type_params: Vec<String>,
/// Whether the type is `frozen<...>`.
pub is_frozen: bool,
/// Element type for `list`/`set` (and the wrapped type for `frozen`).
pub element_type: Option<Box<TypeInfo>>,
/// Key type for `map`.
pub key_type: Option<Box<TypeInfo>>,
/// Value type for `map`.
pub value_type: Option<Box<TypeInfo>>,
/// Fields for `udt`.
pub udt_fields: Option<Vec<UdtFieldInfo>>,
/// Member types for `tuple`, in positional order.
pub tuple_elements: Option<Vec<TypeInfo>>,
}
/// One field of an inferred UDT type inside a [`TypeInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtFieldInfo {
/// Field name.
pub name: String,
/// Inferred field type.
pub field_type: TypeInfo,
/// Whether the field may be null.
pub nullable: bool,
}
/// Flattened description of a collection column, with element/key/value types as strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionType {
/// Which collection this is.
pub kind: CollectionKind,
/// Element type for `List`/`Set`; `None` otherwise.
pub element_type: Option<String>,
/// Key type for `Map`; `None` otherwise.
pub key_type: Option<String>,
/// Value type for `Map`; `None` otherwise.
pub value_type: Option<String>,
/// Set when the declared type was wrapped in `frozen<...>`.
pub is_frozen: bool,
}
/// Collection kinds recognised by collection analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CollectionKind {
List,
Set,
Map,
Tuple,
}
/// A user-defined type discovered from headers or data samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UDTDefinition {
/// UDT name (for sample-derived UDTs this is `<column>_udt`).
pub name: String,
/// Keyspace the UDT belongs to.
pub keyspace: String,
/// Fields in positional order.
pub fields: Vec<UdtFieldDefinition>,
/// Definition version; discovery currently sets `Some(1)`.
pub version: Option<u32>,
}
/// One field of a [`UDTDefinition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtFieldDefinition {
/// Field name.
pub name: String,
/// Field type as a string.
pub field_type: String,
/// Zero-based position within the UDT.
pub position: usize,
/// Whether the field may be null.
pub nullable: bool,
}
/// An index on a table column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexDefinition {
/// Index name.
pub name: String,
/// Indexed column.
pub target_column: String,
/// Kind of index.
pub index_type: IndexType,
/// Free-form index options.
pub options: HashMap<String, String>,
}
/// Index kinds; `Custom` carries an implementation identifier
/// (presumably a class name — TODO confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
Secondary,
Composite,
Custom(String),
}
/// Table-level properties; `None` fields are omitted from generated CQL.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptions {
/// Compaction strategy (`compaction = {...}`).
pub compaction: Option<CompactionStrategy>,
/// Compression settings (`compression = {...}`).
pub compression: Option<CompressionOptions>,
/// Caching settings.
pub caching: Option<CachingOptions>,
/// Bloom filter false-positive chance.
pub bloom_filter_fp_chance: Option<f64>,
/// `gc_grace_seconds` table property.
pub gc_grace_seconds: Option<u32>,
/// `default_time_to_live` table property.
pub default_time_to_live: Option<u32>,
/// Memtable flush period in milliseconds.
pub memtable_flush_period_in_ms: Option<u32>,
/// Any other properties, as raw strings.
pub additional_properties: HashMap<String, String>,
}
/// Compaction strategy class plus its options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionStrategy {
/// Strategy class name.
pub class: String,
/// Strategy options.
pub options: HashMap<String, String>,
}
/// Compression settings for the table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionOptions {
/// Compressor identifier.
pub algorithm: String,
/// Chunk length in KiB, if known.
pub chunk_length_kb: Option<u32>,
/// CRC check probability, if known.
pub crc_check_chance: Option<f32>,
}
/// Caching settings (`keys` / `rows_per_partition`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachingOptions {
pub keys: String,
pub rows_per_partition: String,
}
/// Provenance and diagnostics attached to a discovered schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaMetadata {
/// When discovery ran.
pub discovered_at: SystemTime,
/// SSTable files the schema was derived from.
pub source_files: Vec<PathBuf>,
/// Number of rows sampled for type inference.
pub total_rows_sampled: usize,
/// Cassandra version from the first successfully parsed header, if any.
pub cassandra_version: Option<CassandraVersion>,
/// Which evidence the schema was derived from.
pub discovery_method: DiscoveryMethod,
/// Schema version number.
pub version: u32,
/// Validation outcome.
pub validation_results: ValidationResults,
/// Timing and memory figures for the discovery run.
pub performance_metrics: DiscoveryMetrics,
}
/// Source of the schema information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DiscoveryMethod {
HeaderMetadata,
DataSampling,
Hybrid,
External,
}
/// Aggregate outcome of schema validation.
/// NOTE(review): `SchemaValidator` is an empty stub in this file; confirm where
/// these values are actually populated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResults {
/// Overall status.
pub status: ValidationStatus,
/// Hard errors found.
pub errors: Vec<ValidationError>,
/// Non-fatal findings.
pub warnings: Vec<ValidationWarning>,
/// Cross-file consistency findings.
pub consistency_results: ConsistencyResults,
}
/// Overall validation status.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidationStatus {
Valid,
ValidWithWarnings,
Invalid,
ValidationFailed,
}
/// A single validation error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationError {
/// Error category.
pub error_type: ValidationErrorType,
/// Human-readable description.
pub message: String,
/// Schema component the error refers to, if any.
pub component: Option<String>,
/// File the error was observed in, if any.
pub source_file: Option<PathBuf>,
}
/// Validation error categories.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationErrorType {
TypeMismatch,
MissingComponent,
InvalidTypeDefinition,
ConstraintViolation,
UdtInconsistency,
IndexError,
}
/// A single validation warning.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationWarning {
/// Warning category.
pub warning_type: ValidationWarningType,
/// Human-readable description.
pub message: String,
/// Schema component the warning refers to, if any.
pub component: Option<String>,
}
/// Validation warning categories.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationWarningType {
LowConfidence,
DeprecatedFeature,
VersionCompatibility,
PerformanceRecommendation,
}
/// Findings from comparing schema evidence across several SSTable files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyResults {
/// Number of files compared.
pub files_analyzed: usize,
/// Number of schema mismatches found.
pub schema_mismatches: usize,
/// Columns whose type differs between files.
pub type_inconsistencies: Vec<TypeInconsistency>,
/// UDTs whose definition differs between files.
pub udt_conflicts: Vec<UdtConflict>,
}
/// A column observed with different types in different files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeInconsistency {
pub column_name: String,
pub conflicting_types: Vec<String>,
pub conflicting_files: Vec<PathBuf>,
}
/// A UDT observed with different field definitions in different files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtConflict {
pub udt_name: String,
pub field_conflicts: Vec<FieldConflict>,
pub conflicting_files: Vec<PathBuf>,
}
/// A UDT field observed with different types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldConflict {
pub field_name: String,
pub conflicting_types: Vec<String>,
}
/// Timing and memory figures for one discovery run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryMetrics {
/// Wall time of the whole run, in milliseconds.
pub total_time_ms: u64,
/// Time spent parsing SSTable headers, in milliseconds.
pub header_parsing_time_ms: u64,
/// Time spent sampling rows, in milliseconds.
pub data_sampling_time_ms: u64,
/// Time spent inferring types, in milliseconds.
pub type_inference_time_ms: u64,
/// Time spent validating, in milliseconds.
pub validation_time_ms: u64,
/// Peak memory usage in bytes (how it is measured is not visible here — TODO confirm).
pub peak_memory_usage_bytes: usize,
}
/// Discovers a table schema from SSTable headers and sampled row data, caches
/// the result and exports it as CQL/JSON.
#[derive(Debug)]
pub struct SchemaDiscoveryEngine {
/// Discovery tuning knobs.
config: SchemaDiscoveryConfig,
/// Passed to `SSTableReader::open`.
/// NOTE(review): this field is read, so `allow(dead_code)` looks stale.
#[allow(dead_code)]
platform: Arc<Platform>,
/// Passed to `SSTableReader::open`.
/// NOTE(review): this field is read, so `allow(dead_code)` looks stale.
#[allow(dead_code)]
core_config: Config,
/// Cached schemas keyed by `"keyspace.table"`, with their insertion time.
schema_cache: Arc<RwLock<HashMap<String, (SchemaInfo, SystemTime)>>>,
/// Not read by the code visible in this section.
#[allow(dead_code)]
udt_registry: Arc<RwLock<UdtRegistry>>,
/// Not read by the code visible in this section.
#[allow(dead_code)]
type_inference: Arc<TypeInferenceEngine>,
/// Not read by the code visible in this section.
#[allow(dead_code)]
validator: Arc<SchemaValidator>,
/// Renders schemas as CQL / JSON / comparison reports.
exporter: Arc<SchemaExporter>,
}
impl SchemaDiscoveryEngine {
    /// Maximum number of schemas kept in the cache. Once exceeded, the entry
    /// with the oldest insertion time is evicted.
    const MAX_CACHED_SCHEMAS: usize = 100;

    /// Creates an engine with an empty schema cache and fresh helper components.
    ///
    /// # Errors
    /// Currently never fails; the `Result` is kept for API stability.
    pub async fn new(
        config: SchemaDiscoveryConfig,
        platform: Arc<Platform>,
        core_config: Config,
    ) -> Result<Self> {
        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
        let type_inference = Arc::new(TypeInferenceEngine::new());
        let validator = Arc::new(SchemaValidator::new());
        let exporter = Arc::new(SchemaExporter::new());
        Ok(Self {
            config,
            platform,
            core_config,
            schema_cache: Arc::new(RwLock::new(HashMap::new())),
            udt_registry,
            type_inference,
            validator,
            exporter,
        })
    }

    /// Discovers the schema of `keyspace.table` from `sstable_files`.
    ///
    /// Runs header extraction and data sampling. It then runs the optional UDT,
    /// collection and index passes enabled in the config, and builds the final
    /// [`SchemaInfo`] with timing metrics attached. When caching is enabled, a
    /// fresh cached schema is returned directly and new results are stored.
    ///
    /// NOTE(review): the cache key is only `keyspace.table`. A later call with a
    /// different `sstable_files` set may therefore be served the schema cached
    /// from an earlier set.
    ///
    /// # Errors
    /// Propagates errors from any discovery pass or from `build_schema_info`.
    pub async fn discover_schema(
        &self,
        keyspace: &str,
        table: &str,
        sstable_files: &[PathBuf],
    ) -> Result<SchemaInfo> {
        let cache_key = format!("{}.{}", keyspace, table);
        // Monotonic clock: `SystemTime::elapsed` fails if the wall clock steps
        // backwards, which previously recorded a discovery time of zero.
        let start_time = std::time::Instant::now();
        if self.config.enable_schema_cache {
            if let Some(cached_schema) = self.get_cached_schema(&cache_key).await {
                return Ok(cached_schema);
            }
        }
        let mut discovery_context = DiscoveryContext::new(keyspace, table, sstable_files);
        self.extract_header_metadata(&mut discovery_context).await?;
        self.sample_data_for_inference(&mut discovery_context).await?;
        if self.config.enable_udt_discovery {
            self.discover_udts(&mut discovery_context).await?;
        }
        if self.config.enable_collection_analysis {
            self.analyze_collection_types(&mut discovery_context).await?;
        }
        if self.config.enable_index_discovery {
            self.discover_indexes(&mut discovery_context).await?;
        }
        // TODO(review): the schema is not run through `self.validator`, and
        // `enable_cross_file_validation` / `min_confidence_threshold` are not
        // consulted here; the built schema is returned unvalidated.
        let schema_info = self.build_schema_info(&mut discovery_context).await?;
        let discovery_time = start_time.elapsed();
        let final_schema =
            self.add_performance_metrics(schema_info, discovery_time, &discovery_context);
        if self.config.enable_schema_cache {
            self.cache_schema(cache_key, final_schema.clone()).await;
        }
        Ok(final_schema)
    }

    /// Renders `schema` as a `CREATE TABLE` CQL statement.
    pub async fn generate_cql(&self, schema: &SchemaInfo) -> Result<String> {
        self.exporter.generate_cql(schema).await
    }

    /// Serialises `schema` to JSON with the exporter's default configuration.
    #[cfg(feature = "experimental")]
    pub async fn export_json(&self, schema: &SchemaInfo) -> Result<String> {
        self.exporter.export_json(schema).await
    }

    /// Always fails: JSON export is only compiled with the `experimental` feature.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_json(&self, _schema: &SchemaInfo) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }

    /// Serialises `schema` to JSON using `config`.
    #[cfg(feature = "experimental")]
    pub async fn export_json_with_config(
        &self,
        schema: &SchemaInfo,
        config: &crate::schema::json_exporter::JsonExportConfig,
    ) -> Result<String> {
        self.exporter.export_json_with_config(schema, config).await
    }

    /// Always fails: JSON export is only compiled with the `experimental` feature.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_json_with_config<T>(
        &self,
        _schema: &SchemaInfo,
        _config: &T,
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }

    /// Produces a textual comparison of two schemas (currently a stub in the exporter).
    pub async fn compare_schemas(
        &self,
        schema1: &SchemaInfo,
        schema2: &SchemaInfo,
    ) -> Result<String> {
        self.exporter
            .generate_comparison_report(schema1, schema2)
            .await
    }

    /// Returns a clone of the cached schema for `cache_key` if it was stored
    /// less than `cache_ttl_seconds` ago. An entry whose age cannot be computed
    /// (the clock moved backwards) is treated as expired.
    async fn get_cached_schema(&self, cache_key: &str) -> Option<SchemaInfo> {
        let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
        let cache = self.schema_cache.read().await;
        let (schema, cached_at) = cache.get(cache_key)?;
        if cached_at.elapsed().unwrap_or(Duration::MAX) < ttl {
            Some(schema.clone())
        } else {
            None
        }
    }

    /// Stores `schema` under `cache_key`.
    ///
    /// Expired entries can never be served again, so they are purged first
    /// instead of occupying capacity. If the cache still exceeds
    /// [`Self::MAX_CACHED_SCHEMAS`], the oldest entry is evicted.
    async fn cache_schema(&self, cache_key: String, schema: SchemaInfo) {
        let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
        let mut cache = self.schema_cache.write().await;
        // Same expiry rule as `get_cached_schema`: unknown age counts as expired.
        cache.retain(|_, (_, cached_at)| matches!(cached_at.elapsed(), Ok(age) if age < ttl));
        cache.insert(cache_key, (schema, SystemTime::now()));
        if cache.len() > Self::MAX_CACHED_SCHEMAS {
            let oldest_key = cache
                .iter()
                .min_by_key(|(_, (_, time))| *time)
                .map(|(key, _)| key.clone());
            if let Some(key) = oldest_key {
                cache.remove(&key);
            }
        }
    }
}
/// Mutable scratch state threaded through the passes of one `discover_schema` run.
/// NOTE(review): most of these fields are read or written by the passes below,
/// so the `allow(dead_code)` attributes look stale.
#[derive(Debug)]
struct DiscoveryContext {
/// Keyspace being discovered; copied into discovered UDT definitions.
#[allow(dead_code)]
keyspace: String,
/// Table being discovered.
#[allow(dead_code)]
table: String,
/// SSTable files to read headers and samples from.
#[allow(dead_code)]
source_files: Vec<PathBuf>,
/// Headers that parsed successfully, in `source_files` order.
#[allow(dead_code)]
headers: Vec<SSTableHeader>,
/// Sampled values per column name.
#[allow(dead_code)]
column_samples: HashMap<String, Vec<Value>>,
/// UDTs found so far, keyed by UDT name.
#[allow(dead_code)]
discovered_udts: HashMap<String, UDTDefinition>,
/// Collection types found so far, keyed by column name.
#[allow(dead_code)]
collection_types: HashMap<String, CollectionType>,
/// Indexes found so far.
#[allow(dead_code)]
indexes: Vec<IndexDefinition>,
/// Table options collected so far (starts with every option unset).
#[allow(dead_code)]
table_options: TableOptions,
/// Total rows sampled across all files.
#[allow(dead_code)]
total_rows_sampled: usize,
/// Cassandra version of the first successfully parsed header.
#[allow(dead_code)]
cassandra_version: Option<CassandraVersion>,
}
impl DiscoveryContext {
fn new(keyspace: &str, table: &str, files: &[PathBuf]) -> Self {
Self {
keyspace: keyspace.to_string(),
table: table.to_string(),
source_files: files.to_vec(),
headers: Vec::new(),
column_samples: HashMap::new(),
discovered_udts: HashMap::new(),
collection_types: HashMap::new(),
indexes: Vec::new(),
table_options: TableOptions {
compaction: None,
compression: None,
caching: None,
bloom_filter_fp_chance: None,
gc_grace_seconds: None,
default_time_to_live: None,
memtable_flush_period_in_ms: None,
additional_properties: HashMap::new(),
},
total_rows_sampled: 0,
cassandra_version: None,
}
}
}
/// Stateless helper that infers CQL column types from sampled values.
#[derive(Debug)]
pub struct TypeInferenceEngine {}
impl TypeInferenceEngine {
    fn new() -> Self {
        Self {}
    }

    /// Returns a `TypeInfo` for `type_id` with no parameters or nested types.
    fn plain_type_info(type_id: &str) -> TypeInfo {
        TypeInfo {
            type_id: type_id.to_string(),
            type_params: vec![],
            is_frozen: false,
            element_type: None,
            key_type: None,
            value_type: None,
            udt_fields: None,
            tuple_elements: None,
        }
    }

    /// Infers the CQL type of a column from its sampled values.
    ///
    /// `Null` and `Tombstone` samples carry no type information and are skipped.
    /// Otherwise a mostly-null column would infer the invalid CQL type `null`.
    /// The most frequent remaining type wins. Ties are broken by type name, so
    /// the result does not depend on `HashMap` iteration order. When no typed
    /// samples exist the column defaults to `text`. If any sample is a list,
    /// set, map, tuple or UDT, the winner is refined by `infer_complex_type`.
    ///
    /// Returns a boxed future because the collection helpers recurse back into
    /// this function.
    #[allow(dead_code)]
    fn infer_column_type<'a>(
        &'a self,
        samples: &'a [Value],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TypeInfo>> + 'a>> {
        Box::pin(async move {
            if samples.is_empty() {
                return Ok(Self::plain_type_info("text"));
            }
            let mut type_counts: HashMap<String, usize> = HashMap::new();
            let mut has_complex_types = false;
            for sample in samples {
                if matches!(sample, Value::Null | Value::Tombstone(_)) {
                    continue;
                }
                *type_counts
                    .entry(self.get_value_type_name(sample))
                    .or_insert(0) += 1;
                if matches!(
                    sample,
                    Value::List(_)
                        | Value::Set(_)
                        | Value::Map(_)
                        | Value::Tuple(_)
                        | Value::Udt(_)
                ) {
                    has_complex_types = true;
                }
            }
            let most_common_type = type_counts
                .into_iter()
                .max_by(|(name_a, count_a), (name_b, count_b)| {
                    // Higher count wins; on equal counts the alphabetically first name wins.
                    count_a.cmp(count_b).then_with(|| name_b.cmp(name_a))
                })
                .map(|(type_name, _)| type_name)
                .unwrap_or_else(|| "text".to_string());
            if has_complex_types {
                return self.infer_complex_type(samples, &most_common_type).await;
            }
            Ok(Self::plain_type_info(
                &self.normalize_type_name(&most_common_type),
            ))
        })
    }

    /// Maps a runtime value to the CQL type name used for counting.
    fn get_value_type_name(&self, value: &Value) -> String {
        match value {
            Value::Null => "null".to_string(),
            Value::Text(_) => "text".to_string(),
            Value::Integer(_) => "int".to_string(),
            Value::BigInt(_) => "bigint".to_string(),
            Value::Counter(_) => "counter".to_string(),
            Value::Float(_) => "double".to_string(),
            Value::Boolean(_) => "boolean".to_string(),
            Value::Uuid(_) => "uuid".to_string(),
            Value::Timestamp(_) => "timestamp".to_string(),
            Value::Date(_) => "date".to_string(),
            Value::Time(_) => "time".to_string(),
            Value::Inet(_) => "inet".to_string(),
            Value::Blob(_) => "blob".to_string(),
            Value::List(_) => "list".to_string(),
            Value::Set(_) => "set".to_string(),
            Value::Map(_) => "map".to_string(),
            Value::Json(_) => "text".to_string(),
            Value::TinyInt(_) => "tinyint".to_string(),
            Value::SmallInt(_) => "smallint".to_string(),
            Value::Float32(_) => "float".to_string(),
            Value::Tuple(_) => "tuple".to_string(),
            Value::Udt(_) => "udt".to_string(),
            Value::Frozen(_) => "frozen".to_string(),
            Value::Varint(_) => "varint".to_string(),
            Value::Decimal { .. } => "decimal".to_string(),
            Value::Duration { .. } => "duration".to_string(),
            Value::Tombstone(_) => "tombstone".to_string(),
        }
    }

    /// Canonicalises common type-name aliases to CQL names. Unknown names are
    /// returned unchanged, in their original case.
    fn normalize_type_name(&self, type_name: &str) -> String {
        match type_name.to_lowercase().as_str() {
            "int" | "integer" => "int".to_string(),
            "bigint" | "biginteger" => "bigint".to_string(),
            "double" | "float64" => "double".to_string(),
            "float" | "float32" => "float".to_string(),
            "text" | "varchar" | "string" => "text".to_string(),
            "bool" | "boolean" => "boolean".to_string(),
            "timestamp" | "datetime" => "timestamp".to_string(),
            "blob" | "bytes" => "blob".to_string(),
            "uuid" => "uuid".to_string(),
            "decimal" => "decimal".to_string(),
            "varint" => "varint".to_string(),
            "tinyint" => "tinyint".to_string(),
            "smallint" => "smallint".to_string(),
            "duration" => "duration".to_string(),
            _ => type_name.to_string(),
        }
    }

    /// Dispatches to the structured inference for `base_type`. Falls back to a
    /// plain (normalised) type for anything that is not a collection, tuple or UDT.
    async fn infer_complex_type(&self, samples: &[Value], base_type: &str) -> Result<TypeInfo> {
        match base_type {
            "list" => self.infer_list_type(samples).await,
            "set" => self.infer_set_type(samples).await,
            "map" => self.infer_map_type(samples).await,
            "tuple" => self.infer_tuple_type(samples).await,
            "udt" => self.infer_udt_type(samples).await,
            _ => Ok(Self::plain_type_info(&self.normalize_type_name(base_type))),
        }
    }

    /// Infers `list<T>` by pooling the elements of all list samples.
    /// `infer_column_type` already yields `text` when no elements exist.
    async fn infer_list_type(&self, samples: &[Value]) -> Result<TypeInfo> {
        let mut element_samples = Vec::new();
        for sample in samples {
            if let Value::List(elements) = sample {
                element_samples.extend(elements.iter().cloned());
            }
        }
        let element_type = Box::new(self.infer_column_type(&element_samples).await?);
        Ok(TypeInfo {
            type_params: vec![element_type.type_id.clone()],
            element_type: Some(element_type),
            ..Self::plain_type_info("list")
        })
    }

    /// Infers `set<T>` by pooling the elements of all set samples.
    async fn infer_set_type(&self, samples: &[Value]) -> Result<TypeInfo> {
        let mut element_samples = Vec::new();
        for sample in samples {
            if let Value::Set(elements) = sample {
                element_samples.extend(elements.iter().cloned());
            }
        }
        let element_type = Box::new(self.infer_column_type(&element_samples).await?);
        Ok(TypeInfo {
            type_params: vec![element_type.type_id.clone()],
            element_type: Some(element_type),
            ..Self::plain_type_info("set")
        })
    }

    /// Infers `map<K, V>` by pooling the keys and values of all map samples separately.
    async fn infer_map_type(&self, samples: &[Value]) -> Result<TypeInfo> {
        let mut key_samples = Vec::new();
        let mut value_samples = Vec::new();
        for sample in samples {
            if let Value::Map(map) = sample {
                for (key, value) in map {
                    key_samples.push(key.clone());
                    value_samples.push(value.clone());
                }
            }
        }
        let key_type = Box::new(self.infer_column_type(&key_samples).await?);
        let value_type = Box::new(self.infer_column_type(&value_samples).await?);
        Ok(TypeInfo {
            type_params: vec![key_type.type_id.clone(), value_type.type_id.clone()],
            key_type: Some(key_type),
            value_type: Some(value_type),
            ..Self::plain_type_info("map")
        })
    }

    /// Infers `tuple<...>` position by position. A tuple sample shorter than
    /// the longest one only contributes to its own leading positions.
    async fn infer_tuple_type(&self, samples: &[Value]) -> Result<TypeInfo> {
        let mut element_positions: Vec<Vec<Value>> = Vec::new();
        for sample in samples {
            if let Value::Tuple(elements) = sample {
                if element_positions.len() < elements.len() {
                    element_positions.resize_with(elements.len(), Vec::new);
                }
                for (i, element) in elements.iter().enumerate() {
                    element_positions[i].push(element.clone());
                }
            }
        }
        // Every position holds at least one sample, because positions are only
        // created for elements that exist.
        let mut tuple_elements = Vec::with_capacity(element_positions.len());
        for position_samples in &element_positions {
            tuple_elements.push(self.infer_column_type(position_samples).await?);
        }
        let type_params: Vec<String> = tuple_elements.iter().map(|t| t.type_id.clone()).collect();
        Ok(TypeInfo {
            type_params,
            tuple_elements: Some(tuple_elements),
            ..Self::plain_type_info("tuple")
        })
    }

    /// Infers a UDT's fields from UDT samples.
    ///
    /// Fields keep the order in which they are first seen, because UDT field
    /// order is significant. A field whose value is null in every sample is
    /// still reported; its type defaults to `text`.
    async fn infer_udt_type(&self, samples: &[Value]) -> Result<TypeInfo> {
        let mut ordered_fields: Vec<(String, Vec<Value>)> = Vec::new();
        let mut field_slots: HashMap<String, usize> = HashMap::new();
        for sample in samples {
            if let Value::Udt(udt_value) = sample {
                for field in &udt_value.fields {
                    let slot = *field_slots.entry(field.name.clone()).or_insert_with(|| {
                        ordered_fields.push((field.name.clone(), Vec::new()));
                        ordered_fields.len() - 1
                    });
                    if let Some(field_value) = &field.value {
                        ordered_fields[slot].1.push(field_value.clone());
                    }
                }
            }
        }
        let mut udt_fields = Vec::with_capacity(ordered_fields.len());
        for (field_name, field_samples) in ordered_fields {
            let field_type = self.infer_column_type(&field_samples).await?;
            udt_fields.push(UdtFieldInfo {
                name: field_name,
                field_type,
                nullable: true,
            });
        }
        Ok(TypeInfo {
            udt_fields: Some(udt_fields),
            ..Self::plain_type_info("udt")
        })
    }
}
/// Placeholder schema validator; it holds no state and defines no checks yet.
#[derive(Debug)]
pub struct SchemaValidator {}

impl SchemaValidator {
    /// Creates the (stateless) validator.
    fn new() -> Self {
        SchemaValidator {}
    }
}
/// Stateless renderer of [`SchemaInfo`] as CQL, JSON or comparison reports.
#[derive(Debug)]
pub struct SchemaExporter {}
impl SchemaExporter {
fn new() -> Self {
Self {}
}
async fn generate_cql(&self, schema: &SchemaInfo) -> Result<String> {
let mut cql = String::new();
cql.push_str(&format!(
"CREATE TABLE {}.{} (\n",
schema.keyspace, schema.table
));
for column in &schema.partition_key {
cql.push_str(&format!(
" {} {},\n",
column.name,
self.format_column_type(&column.type_info)
));
}
for clustering in &schema.clustering_keys {
cql.push_str(&format!(
" {} {},\n",
clustering.name, clustering.data_type
));
}
for column in &schema.static_columns {
let static_modifier = if column.is_static { " STATIC" } else { "" };
cql.push_str(&format!(
" {} {}{},\n",
column.name,
self.format_column_type(&column.type_info),
static_modifier
));
}
for column in &schema.regular_columns {
cql.push_str(&format!(
" {} {},\n",
column.name,
self.format_column_type(&column.type_info)
));
}
if !schema.partition_key.is_empty() {
let partition_keys: Vec<String> = schema
.partition_key
.iter()
.map(|col| col.name.clone())
.collect();
if schema.clustering_keys.is_empty() {
cql.push_str(&format!(" PRIMARY KEY ({})", partition_keys.join(", ")));
} else {
let clustering_keys: Vec<String> = schema
.clustering_keys
.iter()
.map(|col| col.name.clone())
.collect();
if partition_keys.len() == 1 {
cql.push_str(&format!(
" PRIMARY KEY ({}, {})",
partition_keys[0],
clustering_keys.join(", ")
));
} else {
cql.push_str(&format!(
" PRIMARY KEY (({}) {})",
partition_keys.join(", "),
clustering_keys.join(", ")
));
}
}
}
cql.push_str("\n);");
if !schema.clustering_keys.is_empty() {
let mut clustering_order = Vec::new();
for clustering in &schema.clustering_keys {
let order = match clustering.order {
crate::schema::ClusteringOrder::Asc => "ASC",
crate::schema::ClusteringOrder::Desc => "DESC",
};
clustering_order.push(format!("{} {}", clustering.name, order));
}
if clustering_order.iter().any(|o| o.contains("DESC")) {
cql.push_str(&format!(
"\nWITH CLUSTERING ORDER BY ({});",
clustering_order.join(", ")
));
}
}
let mut options = Vec::new();
if let Some(compaction) = &schema.table_options.compaction {
options.push(format!("compaction = {{'class': '{}'}}", compaction.class));
}
if let Some(compression) = &schema.table_options.compression {
options.push(format!(
"compression = {{'algorithm': '{}'}}",
compression.algorithm
));
}
if let Some(gc_grace) = schema.table_options.gc_grace_seconds {
options.push(format!("gc_grace_seconds = {}", gc_grace));
}
if let Some(ttl) = schema.table_options.default_time_to_live {
options.push(format!("default_time_to_live = {}", ttl));
}
if !options.is_empty() {
if !cql.ends_with(';') {
cql.push_str(" WITH ");
} else {
cql.pop(); cql.push_str("\nWITH ");
}
cql.push_str(&options.join("\n AND "));
cql.push(';');
}
Ok(cql)
}
#[allow(clippy::only_used_in_recursion)]
fn format_column_type(&self, type_info: &TypeInfo) -> String {
match type_info.type_id.as_str() {
"list" => {
if let Some(element_type) = &type_info.element_type {
format!("list<{}>", self.format_column_type(element_type))
} else {
"list<text>".to_string()
}
}
"set" => {
if let Some(element_type) = &type_info.element_type {
format!("set<{}>", self.format_column_type(element_type))
} else {
"set<text>".to_string()
}
}
"map" => {
if let (Some(key_type), Some(value_type)) =
(&type_info.key_type, &type_info.value_type)
{
format!(
"map<{}, {}>",
self.format_column_type(key_type),
self.format_column_type(value_type)
)
} else {
"map<text, text>".to_string()
}
}
"tuple" => {
if let Some(tuple_elements) = &type_info.tuple_elements {
let element_types: Vec<String> = tuple_elements
.iter()
.map(|t| self.format_column_type(t))
.collect();
format!("tuple<{}>", element_types.join(", "))
} else {
"tuple<text>".to_string()
}
}
"frozen" => {
if let Some(element_type) = &type_info.element_type {
format!("frozen<{}>", self.format_column_type(element_type))
} else {
"frozen<text>".to_string()
}
}
_ => type_info.type_id.clone(),
}
}
#[cfg(feature = "experimental")]
async fn export_json(&self, schema: &SchemaInfo) -> Result<String> {
self.export_json_with_config(
schema,
&crate::schema::json_exporter::JsonExportConfig::default(),
)
.await
}
#[cfg(feature = "experimental")]
async fn export_json_with_config(
&self,
schema: &SchemaInfo,
config: &crate::schema::json_exporter::JsonExportConfig,
) -> Result<String> {
let exporter = crate::schema::json_exporter::JsonExporter::with_config(config.clone());
exporter.export_schema_info(schema)
}
#[cfg(not(feature = "experimental"))]
#[allow(dead_code)]
async fn export_json(&self, _schema: &SchemaInfo) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(not(feature = "experimental"))]
#[allow(dead_code)]
async fn export_json_with_config<T>(
&self,
_schema: &SchemaInfo,
_config: &T, ) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[allow(dead_code)]
#[cfg(feature = "experimental")]
async fn export_json_compact(&self, schema: &SchemaInfo) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::Compact,
include_metadata: false,
include_performance_metrics: false,
include_type_details: false,
pretty_format: false,
..Default::default()
};
self.export_json_with_config(schema, &config).await
}
#[allow(dead_code)]
#[cfg(feature = "experimental")]
async fn export_json_openapi(&self, schema: &SchemaInfo) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::OpenApi,
include_documentation: true,
include_type_details: true,
include_metadata: false,
..Default::default()
};
self.export_json_with_config(schema, &config).await
}
#[allow(dead_code)]
#[cfg(feature = "experimental")]
async fn export_json_pipeline(&self, schema: &SchemaInfo) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::DataPipeline,
include_type_details: true,
include_table_options: false,
include_performance_metrics: true,
..Default::default()
};
self.export_json_with_config(schema, &config).await
}
async fn generate_comparison_report(
&self,
_schema1: &SchemaInfo,
_schema2: &SchemaInfo,
) -> Result<String> {
Ok(
"Schema comparison not yet implemented. Both schemas analyzed as equivalent."
.to_string(),
)
}
}
impl SchemaDiscoveryEngine {
/// Parses the SSTable header of every source file into `context.headers`.
///
/// Records the Cassandra version of the first header that parses. Files whose
/// header fails to parse are logged and skipped, so this never returns `Err`.
async fn extract_header_metadata(&self, context: &mut DiscoveryContext) -> Result<()> {
    // The file list is cloned so `context` can be mutated inside the loop.
    for file_path in &context.source_files.clone() {
        match self.parse_sstable_header(file_path).await {
            Ok(header) => {
                if context.cassandra_version.is_none() {
                    context.cassandra_version = Some(header.cassandra_version.clone());
                }
                // Move the header in; only the version needs copying.
                context.headers.push(header);
            }
            Err(e) => {
                log::warn!("Failed to parse header from {:?}: {}", file_path, e);
            }
        }
    }
    Ok(())
}
/// Opens `file_path` with `SSTableReader` and returns a copy of its header.
///
/// # Errors
/// Propagates any error from `SSTableReader::open`.
async fn parse_sstable_header(&self, file_path: &std::path::Path) -> Result<SSTableHeader> {
    use crate::storage::sstable::reader::SSTableReader;
    let platform = self.platform.clone();
    let opened = SSTableReader::open(file_path, &self.core_config, platform).await?;
    let header_copy = opened.header().clone();
    Ok(header_copy)
}
/// Samples rows from the source files, in order, until `config.max_sample_rows`
/// is reached. Values are appended to `context.column_samples` per column.
///
/// Files that fail to sample are logged and skipped. The number of rows
/// actually collected is stored in `context.total_rows_sampled`.
async fn sample_data_for_inference(&self, context: &mut DiscoveryContext) -> Result<()> {
    let row_budget = self.config.max_sample_rows;
    let files = context.source_files.clone();
    let mut rows_collected = 0;
    for file_path in files.iter() {
        if rows_collected >= row_budget {
            break;
        }
        let remaining = row_budget - rows_collected;
        match self.sample_sstable_data(file_path, remaining).await {
            Err(e) => log::warn!("Failed to sample data from {:?}: {}", file_path, e),
            Ok(rows) => {
                rows_collected += rows.len();
                for (column_name, value) in rows.into_iter().flatten() {
                    context
                        .column_samples
                        .entry(column_name)
                        .or_default()
                        .push(value);
                }
            }
        }
    }
    context.total_rows_sampled = rows_collected;
    Ok(())
}
/// Reads up to `max_rows` entries from `file_path` as single-column sample rows.
///
/// Returns an empty sample set without reading any entries when the header
/// declares no columns, because there is nothing to attribute values to.
///
/// NOTE(review): each entry's whole value is attributed to the FIRST header
/// column. Confirm whether entries hold a single column's value or a full row.
///
/// # Errors
/// Propagates errors from `SSTableReader::open` and `get_all_entries`.
async fn sample_sstable_data(
    &self,
    file_path: &std::path::Path,
    max_rows: usize,
) -> Result<Vec<HashMap<String, Value>>> {
    use crate::storage::sstable::reader::SSTableReader;
    let reader =
        SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?;
    let first_column = match reader.header().columns.first() {
        Some(column) => column.name.clone(),
        None => return Ok(Vec::new()),
    };
    // NOTE(review): this loads every entry before truncating to `max_rows`.
    let all_entries = reader.get_all_entries().await?;
    let samples = all_entries
        .into_iter()
        .take(max_rows)
        .map(|(_table_id, _row_key, value)| {
            let mut row_data = HashMap::with_capacity(1);
            row_data.insert(first_column.clone(), value);
            row_data
        })
        .collect();
    Ok(samples)
}
/// Populates `context.discovered_udts` from two sources.
///
/// 1. Header column types that `is_udt_type` accepts, named by `extract_udt_name`.
/// 2. Sampled `Value::Udt` values, registered as `<column>_udt` with every
///    field typed `text`. Only the first UDT sample per column shapes the
///    definition.
///
/// An existing definition with the same name is never replaced.
async fn discover_udts(&self, context: &mut DiscoveryContext) -> Result<()> {
    let keyspace = &context.keyspace;
    for column_def in context.headers.iter().flat_map(|header| header.columns.iter()) {
        if !self.is_udt_type(&column_def.column_type) {
            continue;
        }
        let udt_name = self.extract_udt_name(&column_def.column_type);
        context
            .discovered_udts
            .entry(udt_name)
            .or_insert_with_key(|name| UDTDefinition {
                name: name.clone(),
                keyspace: keyspace.clone(),
                fields: self.parse_udt_fields(&column_def.column_type),
                version: Some(1),
            });
    }
    for (column_name, samples) in &context.column_samples {
        let udt_name = format!("{}_udt", column_name);
        if context.discovered_udts.contains_key(&udt_name) {
            continue;
        }
        let first_udt = samples.iter().find_map(|sample| match sample {
            Value::Udt(udt_map) => Some(udt_map),
            _ => None,
        });
        if let Some(udt_map) = first_udt {
            let fields = udt_map
                .fields
                .iter()
                .enumerate()
                .map(|(position, field)| UdtFieldDefinition {
                    name: field.name.clone(),
                    field_type: "text".to_string(),
                    position,
                    nullable: true,
                })
                .collect();
            let definition = UDTDefinition {
                name: udt_name.clone(),
                keyspace: keyspace.clone(),
                fields,
                version: Some(1),
            };
            context.discovered_udts.insert(udt_name, definition);
        }
    }
    Ok(())
}
/// Heuristically decides whether `type_str` names a user-defined type.
///
/// A type counts as a UDT when it is neither a recognised native CQL type nor
/// a parameterised `list<` / `set<` / `map<` / `tuple<` / `frozen<` type. The
/// whole check is case-insensitive and ignores surrounding whitespace. Blank
/// strings are never UDTs.
///
/// NOTE(review): a frozen UDT such as `frozen<address>` is reported as not a
/// UDT. If header column types are Cassandra marshal class names rather than
/// CQL names, they would all be classified as UDTs — confirm the header format.
fn is_udt_type(&self, type_str: &str) -> bool {
    const PARAMETERISED_PREFIXES: [&str; 5] = ["list<", "set<", "map<", "tuple<", "frozen<"];
    let normalized = type_str.trim().to_lowercase();
    if normalized.is_empty() {
        return false;
    }
    let is_native = matches!(
        normalized.as_str(),
        "text"
            | "varchar"
            | "ascii"
            | "int"
            | "bigint"
            | "smallint"
            | "tinyint"
            | "float"
            | "double"
            | "boolean"
            | "timestamp"
            | "date"
            | "time"
            | "uuid"
            | "timeuuid"
            | "blob"
            | "varint"
            | "decimal"
            | "duration"
            | "inet"
            | "counter"
    );
    // Prefix checks use the same normalised string as the native-name check,
    // so `LIST<int>` is not mistaken for a UDT.
    !is_native
        && !PARAMETERISED_PREFIXES
            .iter()
            .any(|prefix| normalized.starts_with(prefix))
}
/// Returns the part of `type_str` before the first `<`, or the whole string
/// when it contains no `<`.
fn extract_udt_name(&self, type_str: &str) -> String {
    let base_name = match type_str.find('<') {
        Some(angle) => &type_str[..angle],
        None => type_str,
    };
    base_name.to_owned()
}
/// Placeholder field parser: ignores `_type_str` and always returns a single
/// nullable `text` field named `field` at position 0.
fn parse_udt_fields(&self, _type_str: &str) -> Vec<UdtFieldDefinition> {
    let placeholder = UdtFieldDefinition {
        position: 0,
        name: "field".to_string(),
        nullable: true,
        field_type: "text".to_string(),
    };
    vec![placeholder]
}
async fn analyze_collection_types(&self, context: &mut DiscoveryContext) -> Result<()> {
for header in &context.headers {
for column_def in &header.columns {
if let Some(collection_type) = self.parse_collection_type(&column_def.column_type) {
context
.collection_types
.insert(column_def.name.clone(), collection_type);
}
}
}
for (column_name, samples) in &context.column_samples {
let mut detected_collections = Vec::new();
for sample in samples {
match sample {
Value::List(elements) => {
let element_type = if !elements.is_empty() {
self.infer_element_type(elements)
} else {
"text".to_string()
};
detected_collections.push(CollectionType {
kind: CollectionKind::List,
element_type: Some(element_type),
key_type: None,
value_type: None,
is_frozen: false,
});
}
Value::Set(elements) => {
let element_type = if !elements.is_empty() {
self.infer_element_type(elements)
} else {
"text".to_string()
};
detected_collections.push(CollectionType {
kind: CollectionKind::Set,
element_type: Some(element_type),
key_type: None,
value_type: None,
is_frozen: false,
});
}
Value::Map(map) => {
let (key_type, value_type) = if !map.is_empty() {
let keys: Vec<Value> = map.iter().map(|(k, _)| k.clone()).collect();
let values: Vec<Value> = map.iter().map(|(_, v)| v.clone()).collect();
(
self.infer_element_type(&keys),
self.infer_element_type(&values),
)
} else {
("text".to_string(), "text".to_string())
};
detected_collections.push(CollectionType {
kind: CollectionKind::Map,
element_type: None,
key_type: Some(key_type),
value_type: Some(value_type),
is_frozen: false,
});
}
Value::Tuple(_) => {
detected_collections.push(CollectionType {
kind: CollectionKind::Tuple,
element_type: None,
key_type: None,
value_type: None,
is_frozen: false,
});
}
_ => {}
}
}
if let Some(collection_type) =
self.select_most_common_collection_type(detected_collections)
{
context
.collection_types
.insert(column_name.clone(), collection_type);
}
}
Ok(())
}
fn parse_collection_type(&self, type_str: &str) -> Option<CollectionType> {
let lower_type = type_str.to_lowercase();
if lower_type.starts_with("list<") {
let element_type = self.extract_inner_type(type_str, "list<");
Some(CollectionType {
kind: CollectionKind::List,
element_type: Some(element_type),
key_type: None,
value_type: None,
is_frozen: false,
})
} else if lower_type.starts_with("set<") {
let element_type = self.extract_inner_type(type_str, "set<");
Some(CollectionType {
kind: CollectionKind::Set,
element_type: Some(element_type),
key_type: None,
value_type: None,
is_frozen: false,
})
} else if lower_type.starts_with("map<") {
let (key_type, value_type) = self.extract_map_types(type_str);
Some(CollectionType {
kind: CollectionKind::Map,
element_type: None,
key_type: Some(key_type),
value_type: Some(value_type),
is_frozen: false,
})
} else if lower_type.starts_with("tuple<") {
Some(CollectionType {
kind: CollectionKind::Tuple,
element_type: None,
key_type: None,
value_type: None,
is_frozen: false,
})
} else if lower_type.starts_with("frozen<") {
if let Some(mut inner_collection) =
self.parse_collection_type(&self.extract_inner_type(type_str, "frozen<"))
{
inner_collection.is_frozen = true;
Some(inner_collection)
} else {
None
}
} else {
None
}
}
/// Returns the trimmed text between the first `<` and the last `>` of
/// `type_str` (e.g. `"frozen<list<int>>"` -> `"list<int>"`).
///
/// Falls back to `"text"` when either bracket is missing, or when the last
/// `>` comes before the first `<`. The original code panicked on a reversed
/// slice range in that case. `_prefix` is currently unused.
fn extract_inner_type(&self, type_str: &str, _prefix: &str) -> String {
    match (type_str.find('<'), type_str.rfind('>')) {
        // Both brackets are ASCII, so these byte offsets are char boundaries.
        (Some(open), Some(close)) if open < close => {
            type_str[open + 1..close].trim().to_string()
        }
        _ => "text".to_string(),
    }
}
/// Splits a `map<K, V>` type string into trimmed `(K, V)`.
///
/// The split happens at the first comma that is not nested inside another
/// `<...>`. This means `map<frozen<tuple<int,int>>, text>` yields
/// `("frozen<tuple<int,int>>", "text")`.
///
/// Returns `("text", "text")` in three cases:
/// - a bracket is missing;
/// - the brackets are reversed, which previously panicked;
/// - there is no top-level comma.
fn extract_map_types(&self, type_str: &str) -> (String, String) {
    let fallback = || ("text".to_string(), "text".to_string());
    let inner = match (type_str.find('<'), type_str.rfind('>')) {
        (Some(open), Some(close)) if open < close => &type_str[open + 1..close],
        _ => return fallback(),
    };

    // Track angle-bracket nesting so commas inside nested types are skipped.
    let mut depth = 0usize;
    for (offset, ch) in inner.char_indices() {
        match ch {
            '<' => depth += 1,
            '>' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                let key_type = inner[..offset].trim().to_string();
                let value_type = inner[offset + 1..].trim().to_string();
                return (key_type, value_type);
            }
            _ => {}
        }
    }
    fallback()
}
/// Returns the most frequent CQL type name among `elements`.
///
/// Each value is mapped to a type name. `Float` maps to `double`, and any
/// unrecognised variant maps to `text`. The name with the highest count wins.
/// Ties go to the name that appeared first in `elements`, so the result is
/// deterministic. The original resolved ties by `HashMap` iteration order,
/// which is randomised per process. An empty slice yields `"text"`.
fn infer_element_type(&self, elements: &[Value]) -> String {
    // (type name, occurrences) in first-seen order.
    let mut tallies: Vec<(&'static str, usize)> = Vec::new();
    for element in elements {
        let type_name = match element {
            Value::Text(_) => "text",
            Value::Integer(_) => "int",
            Value::BigInt(_) => "bigint",
            Value::Float(_) => "double",
            Value::Boolean(_) => "boolean",
            Value::Uuid(_) => "uuid",
            Value::Timestamp(_) => "timestamp",
            Value::Blob(_) => "blob",
            _ => "text",
        };
        // At most a handful of distinct names, so a linear scan is fine.
        if let Some(slot) = tallies.iter_mut().find(|slot| slot.0 == type_name) {
            slot.1 += 1;
        } else {
            tallies.push((type_name, 1));
        }
    }

    let top_count = tallies.iter().map(|&(_, count)| count).max().unwrap_or(0);
    tallies
        .iter()
        .find(|&&(_, count)| count == top_count)
        .map_or_else(|| "text".to_string(), |&(type_name, _)| type_name.to_string())
}
/// Picks a representative collection type from per-sample detections.
///
/// Detections are grouped by `kind`, and the kind seen most often wins. On a
/// tie, the kind whose `Debug` name sorts first wins; this matches the
/// previous alphabetical behaviour in the tie case. The returned value is the
/// first detection of the winning kind. Returns `None` for an empty input.
///
/// The original simply returned the alphabetically-first kind, so a single
/// stray `List` sample beat any number of `Set`/`Map` samples.
fn select_most_common_collection_type(
    &self,
    types: Vec<CollectionType>,
) -> Option<CollectionType> {
    // (kind label, occurrences, index of first occurrence)
    let mut tallies: Vec<(String, usize, usize)> = Vec::new();
    for (index, candidate) in types.iter().enumerate() {
        let label = format!("{:?}", candidate.kind);
        if let Some(slot) = tallies.iter_mut().find(|slot| slot.0 == label) {
            slot.1 += 1;
        } else {
            tallies.push((label, 1, index));
        }
    }

    // Highest count first; among equal counts prefer the smaller label.
    let (_, _, first_index) = tallies
        .into_iter()
        .max_by(|a, b| a.1.cmp(&b.1).then_with(|| b.0.cmp(&a.0)))?;
    types.into_iter().nth(first_index)
}
async fn discover_indexes(&self, context: &mut DiscoveryContext) -> Result<()> {
for header in &context.headers {
if let Some(index_info) = self.extract_index_info_from_header(header) {
context.indexes.extend(index_info);
}
}
for (column_name, samples) in &context.column_samples {
if self.should_suggest_index(column_name, samples) {
let index_def = IndexDefinition {
name: format!("{}_idx", column_name),
target_column: column_name.clone(),
index_type: IndexType::Secondary,
options: HashMap::new(),
};
if !context
.indexes
.iter()
.any(|idx| idx.target_column == *column_name)
{
context.indexes.push(index_def);
}
}
}
Ok(())
}
/// Placeholder for reading index metadata out of an SSTable header.
///
/// Header-level index information is not parsed yet, so this always reports
/// that nothing was found.
fn extract_index_info_from_header(
    &self,
    _header: &SSTableHeader,
) -> Option<Vec<IndexDefinition>> {
    Option::None
}
/// Heuristically decides whether a secondary index should be suggested for
/// `column_name`.
///
/// Returns `true` when any of the following holds:
/// - the name looks identifier-like: exactly `id`, or ending in `_id` or
///   `_ref`, or containing `email` or `username`. The `id` rule matches
///   `is_partition_key_column`. The previous `contains("id")` also matched
///   `width`, `video` or `provider`, and made the `_id` check dead;
/// - more than 10 samples exist and over 70% of them are distinct;
/// - any sample is a UUID.
fn should_suggest_index(&self, column_name: &str, samples: &[Value]) -> bool {
    let lowered = column_name.to_lowercase();
    let identifier_like = lowered == "id"
        || lowered.ends_with("_id")
        || lowered.ends_with("_ref")
        || lowered.contains("email")
        || lowered.contains("username");
    if identifier_like {
        return true;
    }

    // Cardinality is only judged on a reasonably sized sample. This also
    // avoids computing a 0/0 ratio when there are no samples.
    if samples.len() > 10 {
        let distinct: std::collections::HashSet<&Value> = samples.iter().collect();
        if distinct.len() as f64 / samples.len() as f64 > 0.7 {
            return true;
        }
    }

    samples.iter().any(|v| matches!(v, Value::Uuid(_)))
}
/// Assembles a [`SchemaInfo`] from everything gathered in `context`.
///
/// Steps:
/// 1. Walk the columns of every SSTable header in order. A column name that
///    appears in several headers (e.g. multiple SSTables of one table) is
///    emitted only once, at the position of its first occurrence.
/// 2. Infer each column's type from its samples via `self.type_inference`.
///    On failure, fall back to the header-declared type string.
/// 3. Classify each column with the name/position heuristics in
///    `is_partition_key_column` / `is_clustering_column`.
/// 4. If no column ended up in the partition key, promote the first regular
///    column.
///
/// Validation status and warnings are derived from the partition key and
/// regular columns. All performance metrics are zero placeholders here;
/// `add_performance_metrics` can fill in timing later.
///
/// # Errors
/// Currently always returns `Ok`. Type-inference failures are absorbed by
/// the fallback in step 2.
async fn build_schema_info(&self, context: &mut DiscoveryContext) -> Result<SchemaInfo> {
    let mut partition_key = Vec::new();
    let mut clustering_keys = Vec::new();
    let mut regular_columns = Vec::new();
    let mut static_columns = Vec::new();
    // Headers from several SSTables of the same table repeat their columns;
    // without this, every column would be emitted once per header.
    let mut seen_columns = std::collections::HashSet::new();
    let mut position = 0;
    for header in &context.headers {
        for column_def in &header.columns {
            if !seen_columns.insert(column_def.name.as_str()) {
                continue;
            }
            let samples = context
                .column_samples
                .get(&column_def.name)
                .cloned()
                .unwrap_or_default();
            let type_info = self
                .type_inference
                .infer_column_type(&samples)
                .await
                .unwrap_or_else(|_| TypeInfo {
                    type_id: column_def.column_type.clone(),
                    type_params: vec![],
                    is_frozen: false,
                    element_type: None,
                    key_type: None,
                    value_type: None,
                    udt_fields: None,
                    tuple_elements: None,
                });
            let confidence = self.calculate_type_confidence(&samples, &type_info);
            let column = ColumnDefinition {
                name: column_def.name.clone(),
                data_type: column_def.column_type.clone(),
                type_info,
                nullable: true,
                // NOTE(review): static-ness is never detected here, so the
                // `static_columns` branch below is never taken.
                is_static: false,
                default_value: None,
                position,
                confidence,
            };
            if self.is_partition_key_column(&column_def.name, position) {
                partition_key.push(column);
            } else if self.is_clustering_column(&column_def.name, position) {
                clustering_keys.push(ClusteringColumn {
                    name: column_def.name.clone(),
                    data_type: column_def.column_type.clone(),
                    position: clustering_keys.len(),
                    order: crate::schema::ClusteringOrder::Asc,
                });
            } else if column.is_static {
                static_columns.push(column);
            } else {
                regular_columns.push(column);
            }
            position += 1;
        }
    }

    // Every table needs a partition key; promote the first regular column.
    if partition_key.is_empty() && !regular_columns.is_empty() {
        let first_column = regular_columns.remove(0);
        partition_key.push(first_column);
    }

    let validation_results = ValidationResults {
        status: self.determine_validation_status(&partition_key, &regular_columns),
        errors: Vec::new(),
        warnings: self.generate_validation_warnings(&partition_key, &regular_columns),
        consistency_results: ConsistencyResults {
            files_analyzed: context.source_files.len(),
            schema_mismatches: 0,
            type_inconsistencies: Vec::new(),
            udt_conflicts: Vec::new(),
        },
    };

    Ok(SchemaInfo {
        keyspace: context.keyspace.clone(),
        table: context.table.clone(),
        partition_key,
        clustering_keys,
        regular_columns,
        static_columns,
        collection_types: context.collection_types.clone(),
        user_defined_types: context.discovered_udts.values().cloned().collect(),
        indexes: context.indexes.clone(),
        table_options: context.table_options.clone(),
        metadata: SchemaMetadata {
            discovered_at: std::time::SystemTime::now(),
            source_files: context.source_files.clone(),
            total_rows_sampled: context.total_rows_sampled,
            cassandra_version: context.cassandra_version,
            discovery_method: DiscoveryMethod::Hybrid,
            version: 1,
            validation_results,
            performance_metrics: DiscoveryMetrics {
                total_time_ms: 0,
                header_parsing_time_ms: 0,
                data_sampling_time_ms: 0,
                type_inference_time_ms: 0,
                validation_time_ms: 0,
                peak_memory_usage_bytes: 0,
            },
        },
    })
}
/// Fraction of `samples` whose runtime variant agrees with `type_info`,
/// as judged by `value_matches_type`.
///
/// Returns `0.0` when there are no samples.
fn calculate_type_confidence(&self, samples: &[Value], type_info: &TypeInfo) -> f64 {
    let total = samples.len();
    if total == 0 {
        return 0.0;
    }
    let hits = samples.iter().fold(0usize, |acc, sample| {
        if self.value_matches_type(sample, type_info) {
            acc + 1
        } else {
            acc
        }
    });
    hits as f64 / total as f64
}
/// Returns `true` when `value`'s variant is compatible with
/// `type_info.type_id`.
///
/// The type id is compared case-insensitively, after trimming, because CQL
/// type names are case-insensitive. `Text` values also accept the CQL text
/// types `varchar` (an alias of `text`) and `ascii`. Any variant not listed
/// below never matches.
fn value_matches_type(&self, value: &Value, type_info: &TypeInfo) -> bool {
    let type_id = type_info.type_id.trim();
    let is = |name: &str| type_id.eq_ignore_ascii_case(name);
    match value {
        Value::Text(_) => is("text") || is("varchar") || is("ascii"),
        Value::Integer(_) => is("int"),
        Value::BigInt(_) => is("bigint"),
        Value::Float(_) => is("double"),
        Value::Boolean(_) => is("boolean"),
        Value::Uuid(_) => is("uuid"),
        Value::Timestamp(_) => is("timestamp"),
        Value::Blob(_) => is("blob"),
        Value::List(_) => is("list"),
        Value::Set(_) => is("set"),
        Value::Map(_) => is("map"),
        Value::Tuple(_) => is("tuple"),
        Value::Udt(_) => is("udt"),
        _ => false,
    }
}
/// Heuristic partition-key test.
///
/// The first column (position 0) always qualifies. Any other column
/// qualifies when its lowercased name is `id`, ends with `_id`, or contains
/// `key`.
fn is_partition_key_column(&self, column_name: &str, position: usize) -> bool {
    if position == 0 {
        return true;
    }
    let name = column_name.to_lowercase();
    name == "id" || name.ends_with("_id") || name.contains("key")
}
/// Heuristic clustering-column test.
///
/// A column qualifies in either of two cases:
/// - its lowercased name contains `time`, `date` or `order`;
/// - it sits at position 1 and is not itself a partition-key candidate.
fn is_clustering_column(&self, column_name: &str, position: usize) -> bool {
    let name = column_name.to_lowercase();
    let has_ordering_hint = ["time", "date", "order"]
        .iter()
        .any(|&hint| name.contains(hint));
    has_ordering_hint || (position == 1 && !self.is_partition_key_column(column_name, position))
}
/// Summarises schema quality from column confidences.
///
/// Returns a status as follows:
/// - `Invalid` if there is no partition key, or if more than half of the
///   partition-key and regular columns have confidence below 0.7;
/// - `ValidWithWarnings` if at least one such column has confidence below
///   0.7;
/// - `Valid` otherwise.
fn determine_validation_status(
    &self,
    partition_key: &[ColumnDefinition],
    regular_columns: &[ColumnDefinition],
) -> ValidationStatus {
    if partition_key.is_empty() {
        return ValidationStatus::Invalid;
    }
    let total = partition_key.len() + regular_columns.len();
    let weak = partition_key
        .iter()
        .chain(regular_columns)
        .filter(|column| column.confidence < 0.7)
        .count();
    match weak {
        0 => ValidationStatus::Valid,
        n if n > total / 2 => ValidationStatus::Invalid,
        _ => ValidationStatus::ValidWithWarnings,
    }
}
/// Emits one `LowConfidence` warning per column whose inferred-type
/// confidence is below 0.7.
///
/// Both partition-key and regular columns are checked, in that order. This
/// is the same column set `determine_validation_status` counts, so a
/// `ValidWithWarnings` status always comes with at least one warning.
/// Previously, partition-key columns were skipped.
fn generate_validation_warnings(
    &self,
    partition_key: &[ColumnDefinition],
    regular_columns: &[ColumnDefinition],
) -> Vec<ValidationWarning> {
    partition_key
        .iter()
        .chain(regular_columns)
        .filter(|column| column.confidence < 0.7)
        .map(|column| ValidationWarning {
            warning_type: ValidationWarningType::LowConfidence,
            message: format!(
                "Low confidence type inference for column '{}': {:.2}",
                column.name, column.confidence
            ),
            component: Some(column.name.clone()),
        })
        .collect()
}
fn add_performance_metrics(
&self,
mut schema: SchemaInfo,
discovery_time: Duration,
_context: &DiscoveryContext,
) -> SchemaInfo {
schema.metadata.performance_metrics = DiscoveryMetrics {
total_time_ms: discovery_time.as_millis() as u64,
header_parsing_time_ms: 0, data_sampling_time_ms: 0,
type_inference_time_ms: 0,
validation_time_ms: 0,
peak_memory_usage_bytes: 0, };
schema
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SchemaDiscoveryEngine` from default discovery and core
    /// configs. This is a macro so the engine's concrete return type need
    /// not be named; it must be expanded inside an async test.
    macro_rules! default_engine {
        () => {{
            let discovery_config = SchemaDiscoveryConfig::default();
            let core_config = Config::default();
            let platform = Arc::new(Platform::new(&core_config).await.unwrap());
            SchemaDiscoveryEngine::new(discovery_config, platform, core_config)
                .await
                .unwrap()
        }};
    }

    /// `TableOptions` with every option unset.
    fn unset_table_options() -> TableOptions {
        TableOptions {
            compaction: None,
            compression: None,
            caching: None,
            bloom_filter_fp_chance: None,
            gc_grace_seconds: None,
            default_time_to_live: None,
            memtable_flush_period_in_ms: None,
            additional_properties: HashMap::new(),
        }
    }

    /// Discovery metrics with every counter at zero.
    fn zeroed_metrics() -> DiscoveryMetrics {
        DiscoveryMetrics {
            total_time_ms: 0,
            header_parsing_time_ms: 0,
            data_sampling_time_ms: 0,
            type_inference_time_ms: 0,
            validation_time_ms: 0,
            peak_memory_usage_bytes: 0,
        }
    }

    /// An empty `test.users` schema with valid, header-derived metadata.
    fn empty_users_schema() -> SchemaInfo {
        SchemaInfo {
            keyspace: "test".to_string(),
            table: "users".to_string(),
            partition_key: Vec::new(),
            clustering_keys: Vec::new(),
            regular_columns: Vec::new(),
            static_columns: Vec::new(),
            collection_types: HashMap::new(),
            user_defined_types: Vec::new(),
            indexes: Vec::new(),
            table_options: unset_table_options(),
            metadata: SchemaMetadata {
                discovered_at: std::time::UNIX_EPOCH,
                source_files: Vec::new(),
                total_rows_sampled: 0,
                cassandra_version: None,
                discovery_method: DiscoveryMethod::HeaderMetadata,
                version: 1,
                validation_results: ValidationResults {
                    status: ValidationStatus::Valid,
                    errors: Vec::new(),
                    warnings: Vec::new(),
                    consistency_results: ConsistencyResults {
                        files_analyzed: 0,
                        schema_mismatches: 0,
                        type_inconsistencies: Vec::new(),
                        udt_conflicts: Vec::new(),
                    },
                },
                performance_metrics: zeroed_metrics(),
            },
        }
    }

    #[tokio::test]
    async fn test_schema_discovery_engine_creation() {
        let engine = default_engine!();
        assert!(engine.schema_cache.read().await.is_empty());
    }

    #[test]
    fn test_discovery_context_creation() {
        let files = vec![PathBuf::from("test.sst")];
        let context = DiscoveryContext::new("test_ks", "test_table", &files);
        assert_eq!(context.keyspace, "test_ks");
        assert_eq!(context.table, "test_table");
        assert_eq!(context.source_files.len(), 1);
    }

    #[test]
    fn test_schema_info_serialization() {
        let original = empty_users_schema();
        let json = serde_json::to_string(&original).unwrap();
        let restored: SchemaInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.keyspace, "test");
        assert_eq!(restored.table, "users");
    }

    #[tokio::test]
    async fn test_extract_header_metadata_stub() {
        let engine = default_engine!();
        let mut context = DiscoveryContext::new("test_ks", "test_table", &[]);
        let outcome = engine.extract_header_metadata(&mut context).await;
        assert!(
            outcome.is_ok(),
            "extract_header_metadata stub should return Ok(())"
        );
    }

    #[tokio::test]
    async fn test_sample_data_for_inference_stub() {
        let engine = default_engine!();
        let mut context = DiscoveryContext::new("test_ks", "test_table", &[]);
        let outcome = engine.sample_data_for_inference(&mut context).await;
        assert!(
            outcome.is_ok(),
            "sample_data_for_inference stub should return Ok(())"
        );
    }

    #[tokio::test]
    async fn test_generate_comparison_report_stub() {
        let exporter = SchemaExporter::new();
        let schema_info = empty_users_schema();
        let outcome = exporter
            .generate_comparison_report(&schema_info, &schema_info)
            .await;
        assert!(
            outcome.is_ok(),
            "generate_comparison_report stub should return Ok"
        );
        let report = outcome.unwrap();
        assert!(
            report.contains("Schema comparison not yet implemented"),
            "Report should contain stub message"
        );
    }
}