use crate::cli_types::SchemaCommands;
use anyhow::{Context, Result};
use cqlite_core::{
schema::{
parse_cql_schema, AggregatorConfig, ClusteringColumn, ClusteringOrder, Column, KeyColumn,
SchemaAggregator, TableSchema,
},
Database,
};
use serde_json;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
#[cfg(feature = "state_machine")]
pub async fn handle_schema_command(database: &Database, command: SchemaCommands) -> Result<()> {
match command {
SchemaCommands::List => list_tables(database).await,
SchemaCommands::Describe { table } => describe_table(database, &table).await,
SchemaCommands::Create { schema } => create_table_from_file(database, &schema).await,
SchemaCommands::Drop { table, force } => drop_table(database, &table, force).await,
SchemaCommands::Load { paths } => load_schemas(database, &paths).await,
}
}
#[cfg(not(feature = "state_machine"))]
pub async fn handle_schema_command(_database: &Database, _command: SchemaCommands) -> Result<()> {
Err(anyhow::anyhow!(
"Schema commands requiring query execution are not available in M1.\n\
Build with --features state_machine or use SSTableReader directly.\n\
See CLAUDE.md for M1 API examples."
))
}
#[cfg(feature = "state_machine")]
#[allow(dead_code)]
async fn list_tables(_database: &Database) -> Result<()> {
println!("Tables in database:");
println!("- users");
println!("- orders");
println!("- products");
println!("\nNote: Table listing not yet implemented");
Ok(())
}
#[cfg(feature = "state_machine")]
#[allow(dead_code)]
async fn describe_table(_database: &Database, table: &str) -> Result<()> {
println!("Describing table '{}'", table);
println!("Columns:");
println!("- id: UUID (primary key)");
println!("- name: TEXT");
println!("- created_at: TIMESTAMP");
println!("\nNote: Table description not yet implemented");
Ok(())
}
#[cfg(feature = "state_machine")]
async fn create_table_from_file(database: &Database, file: &Path) -> Result<()> {
println!("Creating table from DDL file: {}", file.display());
let ddl_content = std::fs::read_to_string(file)
.with_context(|| format!("Failed to read DDL file: {}", file.display()))?;
match database.execute(&ddl_content).await {
Ok(result) => {
println!("Table created successfully");
if result.rows_affected > 0 {
println!("Rows affected: {}", result.rows_affected);
}
}
Err(e) => {
println!("Failed to create table: {}", e);
return Err(anyhow::anyhow!("Table creation failed: {}", e));
}
}
Ok(())
}
#[cfg(feature = "state_machine")]
async fn drop_table(database: &Database, table: &str, force: bool) -> Result<()> {
if !force {
println!("Are you sure you want to drop table '{}'? [y/N]", table);
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
if !input.trim().to_lowercase().starts_with('y') {
println!("Table drop cancelled");
return Ok(());
}
} else {
println!("Force dropping table '{}'", table);
}
let drop_sql = format!("DROP TABLE {}", table);
match database.execute(&drop_sql).await {
Ok(result) => {
println!("Table '{}' dropped successfully", table);
if result.rows_affected > 0 {
println!("Rows affected: {}", result.rows_affected);
}
}
Err(e) => {
println!("Failed to drop table: {}", e);
return Err(anyhow::anyhow!("Table drop failed: {}", e));
}
}
Ok(())
}
#[cfg(feature = "state_machine")]
async fn load_schemas(_database: &Database, paths: &[PathBuf]) -> Result<()> {
use cqlite_core::{
platform::Platform,
schema::{
registry::{SchemaRegistry, SchemaRegistryConfig},
UdtRegistry,
},
Config,
};
use std::sync::Arc;
use tokio::sync::RwLock;
println!("Loading schemas from {} paths...", paths.len());
let config = Config::default();
let platform = Arc::new(
Platform::new(&config)
.await
.context("Failed to initialize platform")?,
);
let registry_config = SchemaRegistryConfig::default();
let schema_registry = Arc::new(RwLock::new(
SchemaRegistry::new(registry_config, platform, config.clone())
.await
.context("Failed to create schema registry")?,
));
let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
let aggregator_config = AggregatorConfig {
graceful_degradation: true,
validate_udt_dependencies: true,
};
let mut aggregator = SchemaAggregator::new(
schema_registry.clone(),
udt_registry.clone(),
aggregator_config,
);
let result = aggregator
.load_from_paths(paths)
.await
.context("Failed to load schemas")?;
if !result.errors.is_empty() {
eprintln!("\nErrors encountered during schema loading:");
for error in &result.errors {
if let Some(path) = &error.file_path {
eprintln!(" Error in file {}: {}", path.display(), error.message);
} else {
eprintln!(" Error: {}", error.message);
}
}
eprintln!(
"\nSchema loading failed with {} errors. Please fix the schemas and retry.",
result.errors.len()
);
std::process::exit(3);
}
if !result.warnings.is_empty() {
println!("\nWarnings:");
for warning in &result.warnings {
if let Some(path) = &warning.file_path {
println!(" Warning in {}: {}", path.display(), warning.message);
} else {
println!(" Warning: {}", warning.message);
}
}
}
if result.schemas_loaded > 0 || result.udts_loaded > 0 {
println!(
"\nSuccessfully loaded {} schemas and {} UDTs",
result.schemas_loaded, result.udts_loaded
);
}
let registry_read = schema_registry.read().await;
let registered_schemas = registry_read.list_schemas(None).await?;
if !registered_schemas.is_empty() {
println!("\nRegistered schemas:");
for schema in ®istered_schemas {
println!(
" {}.{} ({} columns)",
schema.keyspace,
schema.table,
schema.columns.len()
);
}
}
let udt_read = udt_registry.read().await;
let total_udts = udt_read.total_udts();
if total_udts > 0 {
println!("\nRegistered {} UDTs", total_udts);
}
println!("\nSchema loading completed successfully!");
Ok(())
}
#[allow(dead_code)]
async fn validate_schema(file_path: &Path) -> Result<()> {
println!("Validating schema: {}", file_path.display());
let extension = file_path
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("");
match extension.to_lowercase().as_str() {
"json" => validate_json_schema(file_path).await,
"cql" | "sql" => validate_cql_schema(file_path).await,
_ => {
let content = std::fs::read_to_string(file_path)
.with_context(|| format!("Failed to read schema file: {}", file_path.display()))?;
if content.trim_start().starts_with('{') {
println!("📝 Auto-detected JSON format");
validate_json_schema(file_path).await
} else if content.to_uppercase().contains("CREATE TABLE") {
println!("📝 Auto-detected CQL DDL format");
validate_cql_schema(file_path).await
} else {
println!("❌ Unable to determine file format. Supported formats:");
println!(" - .json files: JSON schema format");
println!(" - .cql/.sql files: CQL DDL format");
println!("\nExample JSON schema:");
println!(
"{{\n \"keyspace\": \"example\",\n \"table\": \"users\",\n \"partition_keys\": [{{\"name\": \"id\", \"type\": \"uuid\", \"position\": 0}}],\n \"clustering_keys\": [],\n \"columns\": [{{\"name\": \"id\", \"type\": \"uuid\", \"nullable\": false}}]\n}}"
);
println!("\nExample CQL DDL:");
println!(
"CREATE TABLE example.users (\n id uuid PRIMARY KEY,\n name text,\n email text\n);"
);
Err(anyhow::anyhow!("Unsupported file format"))
}
}
}
}
#[allow(dead_code)]
async fn validate_json_schema(json_path: &Path) -> Result<()> {
let schema_content = std::fs::read_to_string(json_path)
.with_context(|| format!("Failed to read JSON schema file: {}", json_path.display()))?;
match serde_json::from_str::<TableSchema>(&schema_content) {
Ok(schema) => {
println!("✅ JSON Schema validation successful!");
print_schema_details(&schema);
}
Err(e) => {
println!("❌ JSON Schema validation failed!");
println!("Error: {}", e);
if e.to_string().contains("missing field") {
println!("\n💡 Hint: Make sure all required fields are present:");
println!("- keyspace (string)");
println!("- table (string)");
println!("- partition_keys (array)");
println!("- clustering_keys (array)");
println!("- columns (array)");
} else if e.to_string().contains("unknown variant") {
println!("\n💡 Hint: Check that all data types are valid CQL types");
println!("Valid types: text, bigint, int, uuid, timestamp, etc.");
}
return Err(e.into());
}
}
Ok(())
}
#[allow(dead_code)]
async fn validate_cql_schema(cql_path: &Path) -> Result<()> {
let cql_content = std::fs::read_to_string(cql_path)
.with_context(|| format!("Failed to read CQL schema file: {}", cql_path.display()))?;
match parse_cql_schema(&cql_content) {
Ok(schema) => {
println!("✅ CQL DDL validation successful!");
print_schema_details(&schema);
}
Err(e) => {
println!("❌ CQL DDL validation failed!");
println!("Error: {}", e);
println!("\n💡 Hints for CQL DDL:");
println!("- Use CREATE TABLE keyspace.table_name syntax");
println!("- Define PRIMARY KEY explicitly");
println!("- Use valid CQL data types");
println!("\nExample:");
println!("CREATE TABLE example.users (");
println!(" id uuid PRIMARY KEY,");
println!(" name text,");
println!(" created_at timestamp");
println!(");");
return Err(e.into());
}
}
Ok(())
}
#[allow(dead_code)]
fn print_schema_details(schema: &TableSchema) {
println!("📋 Table: {}.{}", schema.keyspace, schema.table);
println!("📊 Columns: {}", schema.columns.len());
for (i, column) in schema.columns.iter().enumerate() {
let nullable_str = if column.nullable {
"nullable"
} else {
"not null"
};
println!(
" {}. {} ({}, {})",
i + 1,
column.name,
column.data_type,
nullable_str
);
}
if !schema.partition_keys.is_empty() {
let key_names: Vec<String> = schema
.partition_keys
.iter()
.map(|k| k.name.clone())
.collect();
println!("🔑 Partition keys: {}", key_names.join(", "));
}
if !schema.clustering_keys.is_empty() {
let clustering_names: Vec<String> = schema
.clustering_keys
.iter()
.map(|k| k.name.clone())
.collect();
println!("🔗 Clustering keys: {}", clustering_names.join(", "));
}
}
#[allow(dead_code)]
fn parse_cql_ddl(cql_content: &str) -> Result<TableSchema> {
let cql_content = cql_content.trim().to_uppercase();
let create_table_start = cql_content
.find("CREATE TABLE")
.ok_or_else(|| anyhow::anyhow!("No CREATE TABLE statement found"))?;
let table_part = &cql_content[create_table_start + 12..].trim();
let paren_start = table_part
.find('(')
.ok_or_else(|| anyhow::anyhow!("Missing opening parenthesis in CREATE TABLE"))?;
let table_name_part = &table_part[..paren_start].trim();
let (keyspace, table_name) = if let Some(dot_pos) = table_name_part.find('.') {
let keyspace = table_name_part[..dot_pos].trim().to_lowercase();
let table = table_name_part[dot_pos + 1..].trim().to_lowercase();
(keyspace, table)
} else {
("default".to_string(), table_name_part.trim().to_lowercase())
};
let mut paren_depth = 0;
let mut column_end = paren_start;
let table_chars: Vec<char> = table_part.chars().collect();
for (i, &ch) in table_chars.iter().enumerate().skip(paren_start) {
match ch {
'(' => paren_depth += 1,
')' => {
paren_depth -= 1;
if paren_depth == 0 {
column_end = i;
break;
}
}
_ => {}
}
}
if paren_depth != 0 {
return Err(anyhow::anyhow!("Unmatched parentheses in CREATE TABLE"));
}
let column_definitions = &table_part[paren_start + 1..column_end];
let (columns, partition_keys, clustering_keys) = parse_column_definitions(column_definitions)?;
let schema = TableSchema {
keyspace,
table: table_name,
partition_keys,
clustering_keys,
columns,
comments: HashMap::new(),
};
schema
.validate()
.with_context(|| "Generated schema validation failed")?;
Ok(schema)
}
#[allow(dead_code)]
fn parse_column_definitions(
definitions: &str,
) -> Result<(Vec<Column>, Vec<KeyColumn>, Vec<ClusteringColumn>)> {
let mut columns = Vec::new();
let mut partition_keys = Vec::new();
let mut clustering_keys = Vec::new();
let mut primary_key_found = false;
let column_parts = split_column_definitions(definitions)?;
for part in column_parts {
let part = part.trim();
if part.to_uppercase().starts_with("PRIMARY KEY") {
parse_primary_key_constraint(
part,
&columns,
&mut partition_keys,
&mut clustering_keys,
)?;
primary_key_found = true;
} else {
let column_parts: Vec<&str> = part.split_whitespace().collect();
if column_parts.len() < 2 {
return Err(anyhow::anyhow!("Invalid column definition: {}", part));
}
let column_name = column_parts[0].to_string();
let column_type = column_parts[1].to_string();
let is_primary_key = part.to_uppercase().contains("PRIMARY KEY");
let column = Column {
name: column_name.clone(),
data_type: column_type.clone(),
nullable: !is_primary_key, default: None,
is_static: false, };
columns.push(column);
if is_primary_key && !primary_key_found {
partition_keys.push(KeyColumn {
name: column_name,
data_type: column_type,
position: partition_keys.len(),
});
}
}
}
if partition_keys.is_empty() && !columns.is_empty() {
let first_col = &columns[0];
partition_keys.push(KeyColumn {
name: first_col.name.clone(),
data_type: first_col.data_type.clone(),
position: 0,
});
if let Some(col) = columns.get_mut(0) {
col.nullable = false;
}
}
Ok((columns, partition_keys, clustering_keys))
}
#[allow(dead_code)]
fn split_column_definitions(definitions: &str) -> Result<Vec<String>> {
let mut parts = Vec::new();
let mut current_part = String::new();
let mut paren_depth = 0;
let mut angle_depth = 0;
for ch in definitions.chars() {
match ch {
'(' => paren_depth += 1,
')' => paren_depth -= 1,
'<' => angle_depth += 1,
'>' => angle_depth -= 1,
',' if paren_depth == 0 && angle_depth == 0 => {
if !current_part.trim().is_empty() {
parts.push(current_part.trim().to_string());
}
current_part.clear();
continue;
}
_ => {}
}
current_part.push(ch);
}
if !current_part.trim().is_empty() {
parts.push(current_part.trim().to_string());
}
Ok(parts)
}
#[allow(dead_code)]
fn parse_primary_key_constraint(
constraint: &str,
columns: &[Column],
partition_keys: &mut Vec<KeyColumn>,
clustering_keys: &mut Vec<ClusteringColumn>,
) -> Result<()> {
let paren_start = constraint
.find('(')
.ok_or_else(|| anyhow::anyhow!("Missing opening parenthesis in PRIMARY KEY"))?;
let mut paren_depth = 0;
let mut paren_end = paren_start;
let constraint_chars: Vec<char> = constraint.chars().collect();
for (i, &ch) in constraint_chars.iter().enumerate().skip(paren_start) {
match ch {
'(' => paren_depth += 1,
')' => {
paren_depth -= 1;
if paren_depth == 0 {
paren_end = i;
break;
}
}
_ => {}
}
}
if paren_depth != 0 {
return Err(anyhow::anyhow!("Unmatched parentheses in PRIMARY KEY"));
}
let key_spec = &constraint[paren_start + 1..paren_end].trim();
if key_spec.trim_start().starts_with('(') && key_spec.contains("),") {
parse_composite_primary_key(key_spec, columns, partition_keys, clustering_keys)
} else {
let key_names: Vec<&str> = key_spec.split(',').map(|s| s.trim()).collect();
for (position, key_name) in key_names.iter().enumerate() {
let column = columns
.iter()
.find(|c| c.name == *key_name)
.ok_or_else(|| {
anyhow::anyhow!(
"Primary key column '{}' not found in column definitions",
key_name
)
})?;
partition_keys.push(KeyColumn {
name: column.name.clone(),
data_type: column.data_type.clone(),
position,
});
}
Ok(())
}
}
#[allow(dead_code)]
fn parse_composite_primary_key(
key_spec: &str,
columns: &[Column],
partition_keys: &mut Vec<KeyColumn>,
clustering_keys: &mut Vec<ClusteringColumn>,
) -> Result<()> {
let mut paren_depth = 0;
let mut partition_end = 0;
for (i, ch) in key_spec.char_indices() {
match ch {
'(' => paren_depth += 1,
')' => {
paren_depth -= 1;
if paren_depth == 0 {
partition_end = i;
break;
}
}
_ => {}
}
}
if partition_end == 0 {
return Err(anyhow::anyhow!("Invalid composite primary key format"));
}
let partition_spec = &key_spec[1..partition_end]; let partition_names: Vec<&str> = partition_spec.split(',').map(|s| s.trim()).collect();
for (position, key_name) in partition_names.iter().enumerate() {
let column = columns
.iter()
.find(|c| c.name == *key_name)
.ok_or_else(|| anyhow::anyhow!("Partition key column '{}' not found", key_name))?;
partition_keys.push(KeyColumn {
name: column.name.clone(),
data_type: column.data_type.clone(),
position,
});
}
let remaining = &key_spec[partition_end + 1..].trim();
if remaining.starts_with(',') {
let clustering_spec = &remaining[1..].trim(); let clustering_names: Vec<&str> = clustering_spec.split(',').map(|s| s.trim()).collect();
for (position, key_name) in clustering_names.iter().enumerate() {
if key_name.is_empty() {
continue;
}
let column = columns
.iter()
.find(|c| c.name == *key_name)
.ok_or_else(|| anyhow::anyhow!("Clustering key column '{}' not found", key_name))?;
clustering_keys.push(ClusteringColumn {
name: column.name.clone(),
data_type: column.data_type.clone(),
position,
order: ClusteringOrder::Asc, });
}
}
Ok(())
}