use crate::cql::{CqlCreateTable, CqlDataType};
use crate::error::{Error, Result};
use crate::parser::types::CqlTypeId;
use crate::schema::{ClusteringColumn, Column, KeyColumn, TableSchema};
use nom::{
branch::alt,
bytes::complete::{tag_no_case, take_while, take_while1},
character::complete::char,
combinator::{map, opt},
multi::{separated_list0, separated_list1},
sequence::{delimited, preceded, separated_pair, tuple},
IResult,
};
use serde_json;
use std::collections::HashMap;
/// Builds a parser that recognises the literal `s` at the start of its input,
/// comparing case-insensitively.
///
/// Only a prefix match is performed; nothing checks that the keyword is
/// followed by a word boundary.
fn keyword(s: &str) -> impl Fn(&str) -> IResult<&str, &str> + '_ {
    move |remaining| tag_no_case(s)(remaining)
}
/// Consumes zero or more leading whitespace characters and returns them.
///
/// Never fails: with no leading whitespace the matched slice is empty.
fn ws(input: &str) -> IResult<&str, &str> {
    take_while(char::is_whitespace)(input)
}
/// Consumes one or more leading whitespace characters and returns them.
///
/// Fails when the input does not start with whitespace.
fn ws1(input: &str) -> IResult<&str, &str> {
    take_while1(char::is_whitespace)(input)
}
fn identifier(input: &str) -> IResult<&str, String> {
let (input, name) = alt((
delimited(char('"'), take_while1(|c: char| c != '"'), char('"')),
take_while1(|c: char| c.is_alphanumeric() || c == '_'),
))(input)?;
Ok((input, name.to_string()))
}
fn qualified_table_name(input: &str) -> IResult<&str, (Option<String>, String)> {
let (input, first) = identifier(input)?;
let (input, second) = opt(preceded(char('.'), identifier))(input)?;
match second {
Some(table) => Ok((input, (Some(first), table))),
None => Ok((input, (None, first))),
}
}
fn cql_type(input: &str) -> IResult<&str, String> {
fn parse_type_inner(input: &str) -> IResult<&str, String> {
let (input, base) = alt((
map(
tuple((
alt((keyword("list"), keyword("set"))),
char('<'),
parse_type_inner,
char('>'),
)),
|(collection, _, inner, _)| format!("{}<{}>", collection, inner),
),
map(
tuple((
keyword("map"),
char('<'),
parse_type_inner,
char(','),
ws,
parse_type_inner,
char('>'),
)),
|(_, _, key_type, _, _, value_type, _)| {
format!("map<{}, {}>", key_type, value_type)
},
),
map(
tuple((
keyword("tuple"),
char('<'),
separated_list1(tuple((ws, char(','), ws)), parse_type_inner),
char('>'),
)),
|(_, _, types, _)| format!("tuple<{}>", types.join(", ")),
),
map(
tuple((keyword("frozen"), char('<'), parse_type_inner, char('>'))),
|(_, _, inner, _)| format!("frozen<{}>", inner),
),
map(identifier, |name| name),
))(input)?;
Ok((input, base))
}
let (input, _) = ws(input)?;
let (input, type_name) = parse_type_inner(input)?;
let (input, _) = ws(input)?;
Ok((input, type_name))
}
fn column_definition(input: &str) -> IResult<&str, (String, String, bool)> {
let (input, _) = ws(input)?;
let (input, name) = identifier(input)?;
let (input, _) = ws1(input)?;
let (input, data_type) = cql_type(input)?;
let (input, _) = ws(input)?;
let (input, is_static) = opt(keyword("static"))(input)?;
let is_static = is_static.is_some();
let (input, _) = ws(input)?;
let (input, _is_primary) = opt(tuple((keyword("primary"), ws1, keyword("key"))))(input)?;
Ok((input, (name, data_type, is_static)))
}
fn primary_key_spec(input: &str) -> IResult<&str, (Vec<String>, Vec<String>)> {
let (input, _) = ws(input)?;
let (input, _) = keyword("primary")(input)?;
let (input, _) = ws1(input)?;
let (input, _) = keyword("key")(input)?;
let (input, _) = ws(input)?;
let (input, _) = char('(')(input)?;
let (input, _) = ws(input)?;
let (input, partition_keys) = alt((
map(
tuple((
char('('),
ws,
separated_list1(tuple((ws, char(','), ws)), identifier),
ws,
char(')'),
)),
|(_, _, keys, _, _)| keys,
),
map(identifier, |key| vec![key]),
))(input)?;
let (input, _) = ws(input)?;
let (input, clustering_keys) = opt(preceded(
tuple((char(','), ws)),
separated_list1(tuple((ws, char(','), ws)), identifier),
))(input)?;
let (input, _) = ws(input)?;
let (input, _) = char(')')(input)?;
Ok((input, (partition_keys, clustering_keys.unwrap_or_default())))
}
fn table_options(input: &str) -> IResult<&str, HashMap<String, String>> {
let (input, _) = ws(input)?;
let (input, _) = keyword("with")(input)?;
let (input, _) = ws1(input)?;
let option_pair = map(
separated_pair(
identifier,
tuple((ws, char('='), ws)),
alt((
delimited(char('\''), take_while(|c: char| c != '\''), char('\'')),
take_while1(|c: char| c.is_alphanumeric() || c == '_' || c == '.'),
)),
),
|(key, value)| (key, value.to_string()),
);
let (input, options) = separated_list0(tuple((ws, keyword("and"), ws)), option_pair)(input)?;
Ok((input, options.into_iter().collect()))
}
/// Splits a CQL script into individual statements on `;`.
///
/// Semicolons inside single-quoted strings, `--` line comments and
/// `/* ... */` block comments do not split. Inside a string, a backslash
/// causes the following character to be copied verbatim. Each statement is
/// trimmed, passed through `strip_leading_trailing_comments`, and dropped if
/// nothing remains. The terminating `;` is not part of the returned text.
pub fn split_cql_statements(input: &str) -> Vec<String> {
    /// Lexical context of the scanner; the contexts are mutually exclusive.
    enum Mode {
        Code,
        Quoted,
        LineComment,
        BlockComment,
    }

    /// Moves the trimmed buffer into `out` (if non-empty) and resets it.
    fn flush(buffer: &mut String, out: &mut Vec<String>) {
        let body = buffer.trim();
        if !body.is_empty() {
            out.push(body.to_string());
        }
        buffer.clear();
    }

    let symbols: Vec<char> = input.chars().collect();
    let mut pieces: Vec<String> = Vec::new();
    let mut buffer = String::new();
    let mut mode = Mode::Code;
    let mut take_verbatim = false;
    let mut idx = 0;

    while let Some(&ch) = symbols.get(idx) {
        // Character following a backslash inside a string: copy and move on.
        if take_verbatim {
            buffer.push(ch);
            take_verbatim = false;
            idx += 1;
            continue;
        }

        let lookahead = symbols.get(idx + 1).copied();
        match mode {
            Mode::Code => match (ch, lookahead) {
                ('/', Some('*')) => {
                    mode = Mode::BlockComment;
                    buffer.push_str("/*");
                    idx += 2;
                }
                ('-', Some('-')) => {
                    mode = Mode::LineComment;
                    buffer.push_str("--");
                    idx += 2;
                }
                ('\'', _) => {
                    mode = Mode::Quoted;
                    buffer.push(ch);
                    idx += 1;
                }
                (';', _) => {
                    flush(&mut buffer, &mut pieces);
                    idx += 1;
                }
                _ => {
                    buffer.push(ch);
                    idx += 1;
                }
            },
            Mode::BlockComment => {
                if ch == '*' && lookahead == Some('/') {
                    mode = Mode::Code;
                    buffer.push_str("*/");
                    idx += 2;
                } else {
                    buffer.push(ch);
                    idx += 1;
                }
            }
            Mode::LineComment => {
                // A line comment runs up to and including the newline.
                if ch == '\n' {
                    mode = Mode::Code;
                }
                buffer.push(ch);
                idx += 1;
            }
            Mode::Quoted => {
                match ch {
                    '\'' => mode = Mode::Code,
                    '\\' => take_verbatim = true,
                    _ => {}
                }
                buffer.push(ch);
                idx += 1;
            }
        }
    }

    // Whatever follows the last `;` is a statement too.
    flush(&mut buffer, &mut pieces);

    pieces
        .into_iter()
        .map(|piece| strip_leading_trailing_comments(&piece))
        .filter(|piece| !piece.is_empty())
        .collect()
}
/// Removes blank lines and comment-only lines from the start and end of a
/// statement, leaving everything in between untouched.
///
/// A line is removable when, after trimming, it is empty, starts with `--`,
/// or consists solely of a `/* ... */` block comment or part of one that
/// spans several lines. Lines that mix code and comments are always kept, so
/// no code is ever dropped. Returns an empty string when the statement
/// contains nothing but comments and blank lines.
fn strip_leading_trailing_comments(stmt: &str) -> String {
    let lines: Vec<&str> = stmt.lines().collect();

    // Classify every line front to back so that multi-line block comments are
    // tracked correctly regardless of which end they sit at.
    let mut in_block_comment = false;
    let removable: Vec<bool> = lines
        .iter()
        .map(|line| {
            let text = line.trim();
            if in_block_comment {
                match text.find("*/") {
                    Some(close) => {
                        in_block_comment = false;
                        // Code after the closing marker makes the line significant.
                        text[close + 2..].trim().is_empty()
                    }
                    None => true,
                }
            } else if text.is_empty() || text.starts_with("--") {
                true
            } else if let Some(rest) = text.strip_prefix("/*") {
                match rest.find("*/") {
                    Some(close) => rest[close + 2..].trim().is_empty(),
                    None => {
                        in_block_comment = true;
                        true
                    }
                }
            } else {
                false
            }
        })
        .collect();

    let start = match removable.iter().position(|&skip| !skip) {
        Some(first) => first,
        // Nothing but comments and blank lines.
        None => return String::new(),
    };
    let end = removable
        .iter()
        .rposition(|&skip| !skip)
        .map_or(start + 1, |last| last + 1);

    lines[start..end].join("\n")
}
// Tests for the statement splitter (`split_cql_statements`).
#[cfg(test)]
mod tests_splitter {
use super::*;
// A script with a leading `--` comment and a multi-line block comment
// between two statements must yield exactly the two statements, and the
// line comment must not survive in the first one.
#[test]
fn test_split_with_comments() {
let cql = r#"
-- Comment
CREATE TYPE test.udt (field text);
/* Multi-line
comment */
CREATE TABLE test.tbl (id int PRIMARY KEY);
"#;
let stmts = split_cql_statements(cql);
assert_eq!(stmts.len(), 2);
assert!(stmts[0].contains("CREATE TYPE"));
assert!(!stmts[0].contains("--"));
assert!(stmts[1].contains("CREATE TABLE"));
}
}
/// Coarse classification of a CQL statement by its leading keywords.
#[derive(Debug, Clone, PartialEq)]
pub enum StatementType {
    /// `CREATE TABLE ...`, with or without `IF NOT EXISTS`.
    CreateTable,
    /// `CREATE TYPE ...`, with or without `IF NOT EXISTS`.
    CreateType,
    /// Any other statement; carries its first word in lower case, or
    /// `"unknown"` when the statement has no words at all.
    Other(String),
}

/// Classifies `statement` by its first two words.
///
/// Matching is case-insensitive and tolerant of any amount of whitespace
/// (including newlines) between the keywords. Text from `--` to the end of a
/// line is ignored. The second word must be exactly `table` or `type`, so
/// `create tablefoo` is reported as `Other("create")`.
pub fn classify_statement(statement: &str) -> StatementType {
    let lowered = statement.to_lowercase();
    let mut words = lowered
        .lines()
        // Drop `--` line comments before tokenising.
        .map(|line| line.find("--").map_or(line, |pos| &line[..pos]))
        .flat_map(str::split_whitespace);

    match (words.next(), words.next()) {
        (Some("create"), Some("table")) => StatementType::CreateTable,
        (Some("create"), Some("type")) => StatementType::CreateType,
        (first, _) => StatementType::Other(first.unwrap_or("unknown").to_string()),
    }
}
#[allow(clippy::type_complexity)]
pub fn parse_create_type(
input: &str,
) -> IResult<&str, (String, Option<String>, Vec<(String, String)>)> {
let (input, _) = ws(input)?;
let (input, _) = keyword("create")(input)?;
let (input, _) = ws1(input)?;
let (input, _) = keyword("type")(input)?;
let (input, _) = ws1(input)?;
let (input, _) = opt(tuple((
keyword("if"),
ws1,
keyword("not"),
ws1,
keyword("exists"),
ws1,
)))(input)?;
let (input, (keyspace, type_name)) = qualified_table_name(input)?;
let (input, _) = ws(input)?;
let (input, _) = char('(')(input)?;
let (input, _) = ws(input)?;
let (input, fields) = separated_list1(
tuple((ws, char(','), ws)),
map(
tuple((identifier, ws1, cql_type)),
|(name, _, field_type)| (name, field_type),
),
)(input)?;
let (input, _) = ws(input)?;
let (input, _) = char(')')(input)?;
Ok((input, (type_name, keyspace, fields)))
}
pub fn parse_create_table(input: &str) -> IResult<&str, TableSchema> {
let (input, _) = ws(input)?;
let (input, _) = keyword("create")(input)?;
let (input, _) = ws1(input)?;
let (input, _) = keyword("table")(input)?;
let (input, _) = ws1(input)?;
let (input, _) = opt(tuple((
keyword("if"),
ws1,
keyword("not"),
ws1,
keyword("exists"),
ws1,
)))(input)?;
let (input, (keyspace, table_name)) = qualified_table_name(input)?;
let (input, _) = ws(input)?;
let (input, _) = char('(')(input)?;
let (input, _) = ws(input)?;
let mut columns: Vec<(String, String, bool)> = Vec::new();
let mut partition_keys = Vec::new();
let mut clustering_keys = Vec::new();
let mut primary_key_found = false;
let (input, items) = separated_list1(
tuple((ws, char(','), ws)),
alt((
map(primary_key_spec, |keys| {
(
"PRIMARY_KEY".to_string(),
serde_json::to_string(&keys).unwrap_or_default(),
false, )
}),
column_definition,
)),
)(input)?;
for (name, value, is_static) in items {
if name == "PRIMARY_KEY" {
if let Ok(keys_tuple) = serde_json::from_str::<(Vec<String>, Vec<String>)>(&value) {
partition_keys = keys_tuple.0;
clustering_keys = keys_tuple.1;
primary_key_found = true;
}
continue;
}
columns.push((name, value, is_static));
}
let (input, _) = ws(input)?;
let (input, _) = char(')')(input)?;
let (input, _options) = opt(table_options)(input)?;
if !primary_key_found && !columns.is_empty() {
let mut found_inline = false;
for (col_name, col_type, _is_static) in &columns {
if col_type.to_lowercase().contains("primary key") {
partition_keys.push(col_name.clone());
found_inline = true;
break;
}
}
if !found_inline {
partition_keys.push(columns[0].0.clone());
}
}
let schema = TableSchema {
keyspace: keyspace.unwrap_or_else(|| "default".to_string()),
table: table_name,
partition_keys: partition_keys
.into_iter()
.enumerate()
.map(|(pos, name)| {
let data_type = columns
.iter()
.find(|(col_name, _, _)| col_name == &name)
.map(|(_, dt, _)| dt.clone())
.unwrap_or_else(|| "text".to_string());
KeyColumn {
name,
data_type,
position: pos,
}
})
.collect(),
clustering_keys: clustering_keys
.into_iter()
.enumerate()
.map(|(pos, name)| {
let data_type = columns
.iter()
.find(|(col_name, _, _)| col_name == &name)
.map(|(_, dt, _)| dt.clone())
.unwrap_or_else(|| "text".to_string());
ClusteringColumn {
name,
data_type,
position: pos,
order: crate::schema::ClusteringOrder::Asc,
}
})
.collect(),
columns: columns
.into_iter()
.map(|(name, data_type_with_constraints, is_static)| {
let data_type = if data_type_with_constraints
.to_lowercase()
.contains("primary key")
{
data_type_with_constraints
.to_lowercase()
.replace("primary key", "")
.trim()
.to_string()
} else {
data_type_with_constraints
};
Column {
name,
data_type,
nullable: true,
default: None,
is_static,
}
})
.collect(),
comments: HashMap::new(),
};
Ok((input, schema))
}
/// Maps a CQL type string to its [`CqlTypeId`].
///
/// The input is trimmed and compared case-insensitively. Collections and
/// tuples are identified by their `name<` prefix alone; `frozen<T>` resolves
/// to the id of `T`. Any name that is not a recognised primitive (or alias
/// such as `long`, `bool`, `integer`) is reported as a user-defined type, so
/// this function currently always returns `Ok`.
pub fn cql_type_to_type_id(cql_type: &str) -> Result<CqlTypeId> {
    let lowered = cql_type.trim().to_lowercase();

    // Generic forms are decided by the text in front of the first `<`.
    if let Some((head, rest)) = lowered.split_once('<') {
        match head {
            "list" => return Ok(CqlTypeId::List),
            "set" => return Ok(CqlTypeId::Set),
            "map" => return Ok(CqlTypeId::Map),
            "tuple" => return Ok(CqlTypeId::Tuple),
            "frozen" => {
                // Unwrap up to the last `>`; without one, fall through to
                // the name lookup below.
                if let Some(close) = rest.rfind('>') {
                    return cql_type_to_type_id(&rest[..close]);
                }
            }
            _ => {}
        }
    }

    let type_id = match lowered.as_str() {
        "ascii" => CqlTypeId::Ascii,
        "bigint" | "long" => CqlTypeId::BigInt,
        "blob" => CqlTypeId::Blob,
        "boolean" | "bool" => CqlTypeId::Boolean,
        "counter" => CqlTypeId::Counter,
        "decimal" => CqlTypeId::Decimal,
        "double" => CqlTypeId::Double,
        "float" => CqlTypeId::Float,
        "int" | "integer" => CqlTypeId::Int,
        "timestamp" => CqlTypeId::Timestamp,
        "uuid" => CqlTypeId::Uuid,
        "varchar" | "text" => CqlTypeId::Varchar,
        "varint" => CqlTypeId::Varint,
        "timeuuid" => CqlTypeId::Timeuuid,
        "inet" => CqlTypeId::Inet,
        "date" => CqlTypeId::Date,
        "time" => CqlTypeId::Time,
        "smallint" => CqlTypeId::Smallint,
        "tinyint" => CqlTypeId::Tinyint,
        "duration" => CqlTypeId::Duration,
        // Everything else is assumed to name a user-defined type.
        _ => CqlTypeId::Udt,
    };
    Ok(type_id)
}
pub fn extract_table_name(cql: &str) -> Result<(Option<String>, String)> {
match parse_create_table(cql) {
Ok((_, schema)) => {
let keyspace = if schema.keyspace == "default" {
None
} else {
Some(schema.keyspace)
};
Ok((keyspace, schema.table))
}
Err(_) => {
let cql_lower = cql.to_lowercase();
if let Some(table_start) = cql_lower.find("create table") {
let after_table = &cql[table_start + 12..];
if let Some(if_not_exists) = after_table.find("if not exists") {
let after_if = &after_table[if_not_exists + 13..];
return extract_simple_table_name(after_if);
}
return extract_simple_table_name(after_table);
}
Err(Error::schema(
"Failed to extract table name from CQL".to_string(),
))
}
}
}
/// Reads a possibly keyspace-qualified table name from the start of `input`.
///
/// The name ends at the first whitespace character, `(` or `;`, so both
/// `ks.tbl (id int)` and `ks.tbl(id int)` yield `ks` / `tbl`. The text is
/// split on the first `.` into keyspace and table.
///
/// # Errors
///
/// Returns a schema error when no name precedes the first delimiter.
fn extract_simple_table_name(input: &str) -> Result<(Option<String>, String)> {
    let name = input
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
        .next()
        .unwrap_or("");
    if name.is_empty() {
        return Err(Error::schema("No table name found".to_string()));
    }

    match name.split_once('.') {
        Some((keyspace, table)) => Ok((Some(keyspace.to_string()), table.to_string())),
        None => Ok((None, name.to_string())),
    }
}
/// Reports whether a schema's table identity satisfies a lookup target.
///
/// Table names must be equal. The keyspace is compared only when the target
/// specifies one; a target without a keyspace matches a table in any
/// keyspace.
pub fn table_name_matches(
    schema_keyspace: &Option<String>,
    schema_table: &str,
    target_keyspace: &Option<String>,
    target_table: &str,
) -> bool {
    schema_table == target_table
        && (target_keyspace.is_none() || schema_keyspace == target_keyspace)
}
pub fn parse_cql_schema(cql: &str) -> Result<TableSchema> {
match parse_create_table(cql) {
Ok((_, schema)) => {
schema.validate()?;
Ok(schema)
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(Error::schema(format!(
"Failed to parse CQL schema: {:?}",
e
))),
Err(nom::Err::Incomplete(_)) => Err(Error::schema("Incomplete CQL schema".to_string())),
}
}
/// Parses a `CREATE TABLE` statement and rebuilds the schema through the
/// visitor pipeline.
///
/// The statement is parsed and validated with `parse_cql_schema`, converted
/// to a `CqlCreateTable` AST, wrapped in `CqlStatement::CreateTable`, and
/// handed to `SchemaBuilderVisitor`, whose result is returned.
///
/// # Errors
///
/// Propagates errors from parsing, AST conversion and the visitor.
pub fn parse_cql_schema_with_visitor(cql: &str) -> Result<TableSchema> {
    use crate::cql::traits::CqlVisitor;
    use crate::cql::visitor::SchemaBuilderVisitor;
    use crate::cql::CqlStatement;

    let parsed = parse_cql_schema(cql)?;
    let statement = CqlStatement::CreateTable(table_schema_to_ast(&parsed)?);

    let mut builder = SchemaBuilderVisitor;
    builder.visit_statement(&statement)
}
/// Converts a [`TableSchema`] into a `CqlCreateTable` AST node.
///
/// A keyspace of `"default"` produces an unqualified table reference. The
/// resulting node always has `if_not_exists` set to `false` and an empty
/// option map, since the schema carries neither.
///
/// # Errors
///
/// Propagates any error from converting a column's type string.
fn table_schema_to_ast(schema: &TableSchema) -> Result<CqlCreateTable> {
    use crate::cql::{
        CqlColumnDef, CqlCreateTable, CqlIdentifier, CqlPrimaryKey, CqlTable, CqlTableOptions,
    };

    let table = if schema.keyspace != "default" {
        CqlTable::with_keyspace(&schema.keyspace, &schema.table)
    } else {
        CqlTable::new(&schema.table)
    };

    // Stop at the first column whose type cannot be converted.
    let mut columns: Vec<CqlColumnDef> = Vec::with_capacity(schema.columns.len());
    for column in &schema.columns {
        columns.push(CqlColumnDef {
            name: CqlIdentifier::new(&column.name),
            data_type: string_to_cql_data_type(&column.data_type)?,
            is_static: column.is_static,
        });
    }

    let partition_key: Vec<CqlIdentifier> = schema
        .partition_keys
        .iter()
        .map(|key| CqlIdentifier::new(&key.name))
        .collect();
    let clustering_key: Vec<CqlIdentifier> = schema
        .clustering_keys
        .iter()
        .map(|key| CqlIdentifier::new(&key.name))
        .collect();

    let primary_key = CqlPrimaryKey {
        partition_key,
        clustering_key,
    };
    let options = CqlTableOptions {
        options: HashMap::new(),
    };

    Ok(CqlCreateTable {
        if_not_exists: false,
        table,
        columns,
        primary_key,
        options,
    })
}
/// Converts a CQL type string into a [`CqlDataType`].
///
/// `list<T>`, `set<T>`, `map<K, V>` and `frozen<T>` are recognised
/// case-insensitively and converted recursively. The key and value of a map
/// are separated at the first comma that is not nested inside angle
/// brackets, so `map<frozen<map<text, int>>, text>` is split correctly.
/// Primitive names (and the aliases `bool`, `long`, `integer`) are matched
/// case-insensitively. Anything else, including `tuple<...>` and a `map`
/// without a top-level comma, becomes a user-defined type whose name is the
/// trimmed input with its case preserved, at every nesting level.
///
/// This function currently always returns `Ok`.
fn string_to_cql_data_type(type_str: &str) -> Result<CqlDataType> {
    use crate::cql::{CqlDataType, CqlIdentifier};

    /// Returns the text between `name<` and the final `>` when `text` has
    /// that shape (`name` compared ASCII case-insensitively).
    fn generic_argument<'a>(text: &'a str, name: &str) -> Option<&'a str> {
        let head = text.get(..name.len())?;
        if !head.eq_ignore_ascii_case(name) {
            return None;
        }
        text[name.len()..].strip_prefix('<')?.strip_suffix('>')
    }

    /// Byte index of the first comma that is not inside angle brackets.
    fn top_level_comma(text: &str) -> Option<usize> {
        let mut depth = 0usize;
        for (idx, ch) in text.char_indices() {
            match ch {
                '<' => depth += 1,
                '>' => depth = depth.saturating_sub(1),
                ',' if depth == 0 => return Some(idx),
                _ => {}
            }
        }
        None
    }

    let trimmed = type_str.trim();

    if let Some(inner) = generic_argument(trimmed, "list") {
        return Ok(CqlDataType::List(Box::new(string_to_cql_data_type(inner)?)));
    }
    if let Some(inner) = generic_argument(trimmed, "set") {
        return Ok(CqlDataType::Set(Box::new(string_to_cql_data_type(inner)?)));
    }
    if let Some(inner) = generic_argument(trimmed, "map") {
        if let Some(comma) = top_level_comma(inner) {
            let key_type = string_to_cql_data_type(&inner[..comma])?;
            let value_type = string_to_cql_data_type(&inner[comma + 1..])?;
            return Ok(CqlDataType::Map(Box::new(key_type), Box::new(value_type)));
        }
        // No top-level comma: fall through to the name lookup below.
    }
    if let Some(inner) = generic_argument(trimmed, "frozen") {
        return Ok(CqlDataType::Frozen(Box::new(string_to_cql_data_type(
            inner,
        )?)));
    }

    let data_type = match trimmed.to_lowercase().as_str() {
        "boolean" | "bool" => CqlDataType::Boolean,
        "tinyint" => CqlDataType::TinyInt,
        "smallint" => CqlDataType::SmallInt,
        // `integer` is accepted for consistency with `cql_type_to_type_id`.
        "int" | "integer" => CqlDataType::Int,
        "bigint" | "long" => CqlDataType::BigInt,
        "varint" => CqlDataType::Varint,
        "decimal" => CqlDataType::Decimal,
        "float" => CqlDataType::Float,
        "double" => CqlDataType::Double,
        "text" | "varchar" => CqlDataType::Text,
        "ascii" => CqlDataType::Ascii,
        "blob" => CqlDataType::Blob,
        "timestamp" => CqlDataType::Timestamp,
        "date" => CqlDataType::Date,
        "time" => CqlDataType::Time,
        "uuid" => CqlDataType::Uuid,
        "timeuuid" => CqlDataType::TimeUuid,
        "inet" => CqlDataType::Inet,
        "duration" => CqlDataType::Duration,
        "counter" => CqlDataType::Counter,
        // Unknown names are treated as user-defined types.
        _ => CqlDataType::Udt(CqlIdentifier::new(trimmed)),
    };
    Ok(data_type)
}
// Unit tests for CREATE TABLE parsing, type-name mapping and table-name
// helpers.
#[cfg(test)]
mod tests {
use super::*;
// Inline `PRIMARY KEY` on the first column becomes the partition key.
#[test]
fn test_simple_table_parsing() {
let cql = r#"
CREATE TABLE users (
id uuid PRIMARY KEY,
name text,
email text
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.table, "users");
assert_eq!(schema.columns.len(), 3);
assert_eq!(schema.partition_keys.len(), 1);
assert_eq!(schema.partition_keys[0].name, "id");
}
// `keyspace.table` is split into its two parts.
#[test]
fn test_qualified_table_name() {
let cql = r#"
CREATE TABLE myapp.users (
id bigint PRIMARY KEY,
name text
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.keyspace, "myapp");
assert_eq!(schema.table, "users");
}
// Collection types are kept as normalised type strings.
#[test]
fn test_complex_types() {
let cql = r#"
CREATE TABLE complex_table (
id uuid PRIMARY KEY,
tags set<text>,
metadata map<text, text>,
coordinates list<double>
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.columns.len(), 4);
let tags_col = schema.columns.iter().find(|c| c.name == "tags").unwrap();
assert_eq!(tags_col.data_type, "set<text>");
let metadata_col = schema
.columns
.iter()
.find(|c| c.name == "metadata")
.unwrap();
assert_eq!(metadata_col.data_type, "map<text, text>");
}
// `IF NOT EXISTS` must not be mistaken for the table name.
#[test]
fn test_table_name_extraction() {
let cql = "CREATE TABLE IF NOT EXISTS myapp.users (id uuid PRIMARY KEY)";
let (keyspace, table) = extract_table_name(cql).unwrap();
assert_eq!(keyspace, Some("myapp".to_string()));
assert_eq!(table, "users");
}
// Spot checks of the type-string to type-id mapping; `frozen<...>` maps to
// the id of the wrapped type.
#[test]
fn test_cql_type_conversion() {
assert_eq!(cql_type_to_type_id("text").unwrap(), CqlTypeId::Varchar);
assert_eq!(cql_type_to_type_id("bigint").unwrap(), CqlTypeId::BigInt);
assert_eq!(cql_type_to_type_id("list<text>").unwrap(), CqlTypeId::List);
assert_eq!(
cql_type_to_type_id("frozen<set<uuid>>").unwrap(),
CqlTypeId::Set
);
}
// The keyspace is compared only when the target specifies one.
#[test]
fn test_table_name_matching() {
assert!(table_name_matches(
&Some("ks".to_string()),
"users",
&Some("ks".to_string()),
"users"
));
assert!(table_name_matches(
&Some("ks".to_string()),
"users",
&None,
"users"
));
assert!(!table_name_matches(
&Some("ks".to_string()),
"users",
&Some("ks".to_string()),
"orders"
));
assert!(!table_name_matches(
&Some("ks1".to_string()),
"users",
&Some("ks2".to_string()),
"users"
));
}
// Table-level `PRIMARY KEY (pk, ck)`: first entry is the partition key, the
// rest are clustering columns.
#[test]
fn test_composite_primary_key() {
let cql = r#"
CREATE TABLE time_series (
partition_key text,
clustering_key timestamp,
value double,
PRIMARY KEY (partition_key, clustering_key)
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.partition_keys.len(), 1);
assert_eq!(schema.clustering_keys.len(), 1);
assert_eq!(schema.partition_keys[0].name, "partition_key");
assert_eq!(schema.clustering_keys[0].name, "clustering_key");
}
// `frozen<...>` wrappers, including nested ones, round-trip unchanged.
#[test]
fn test_frozen_collections() {
let cql = r#"
CREATE TABLE frozen_test (
id uuid PRIMARY KEY,
frozen_set frozen<set<text>>,
frozen_map frozen<map<text, bigint>>,
nested_frozen frozen<list<frozen<set<uuid>>>>
)
"#;
let schema = parse_cql_schema(cql).unwrap();
let frozen_set = schema
.columns
.iter()
.find(|c| c.name == "frozen_set")
.unwrap();
assert_eq!(frozen_set.data_type, "frozen<set<text>>");
let frozen_map = schema
.columns
.iter()
.find(|c| c.name == "frozen_map")
.unwrap();
assert_eq!(frozen_map.data_type, "frozen<map<text, bigint>>");
let nested = schema
.columns
.iter()
.find(|c| c.name == "nested_frozen")
.unwrap();
assert_eq!(nested.data_type, "frozen<list<frozen<set<uuid>>>>");
}
// Unknown type names are accepted as user-defined types, bare or frozen.
#[test]
fn test_udt_columns() {
let cql = r#"
CREATE TABLE user_profiles (
user_id uuid PRIMARY KEY,
address address_type,
preferences frozen<user_prefs>
)
"#;
let schema = parse_cql_schema(cql).unwrap();
let address_col = schema.columns.iter().find(|c| c.name == "address").unwrap();
assert_eq!(address_col.data_type, "address_type");
let prefs_col = schema
.columns
.iter()
.find(|c| c.name == "preferences")
.unwrap();
assert_eq!(prefs_col.data_type, "frozen<user_prefs>");
}
// Tuple element lists are rendered with ", " separators.
#[test]
fn test_tuple_types() {
let cql = r#"
CREATE TABLE tuple_test (
id uuid PRIMARY KEY,
coordinates tuple<double, double>,
person_info tuple<text, int, boolean>
)
"#;
let schema = parse_cql_schema(cql).unwrap();
let coords = schema
.columns
.iter()
.find(|c| c.name == "coordinates")
.unwrap();
assert_eq!(coords.data_type, "tuple<double, double>");
let person = schema
.columns
.iter()
.find(|c| c.name == "person_info")
.unwrap();
assert_eq!(person.data_type, "tuple<text, int, boolean>");
}
// Keywords match in any case; identifier case is preserved as written.
#[test]
fn test_case_insensitive_keywords() {
let cql = r#"
create table Users (
ID UUID primary key,
Name TEXT,
Email VARCHAR
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.table, "Users");
assert_eq!(schema.columns.len(), 3);
}
// Double-quoted identifiers may contain spaces and are stored without the
// quotes.
#[test]
fn test_quoted_identifiers() {
let cql = r#"
CREATE TABLE "CaseSensitive" (
"Id" uuid PRIMARY KEY,
"Name With Spaces" text
)
"#;
let schema = parse_cql_schema(cql).unwrap();
assert_eq!(schema.table, "CaseSensitive");
let space_col = schema.columns.iter().find(|c| c.name == "Name With Spaces");
assert!(space_col.is_some());
}
// NOTE(review): despite the name, this statement parses successfully with
// `parse_create_table`, so the string-based fallback inside
// `extract_table_name` does not appear to be exercised here.
#[test]
fn test_fallback_table_extraction() {
let cql = "CREATE TABLE myapp.orders (id bigint PRIMARY KEY)";
let (keyspace, table) = extract_table_name(cql).unwrap();
assert_eq!(keyspace, Some("myapp".to_string()));
assert_eq!(table, "orders");
}
// Every primitive type name maps to its dedicated id; `text` and `varchar`
// share `Varchar`.
#[test]
fn test_all_primitive_types() {
let type_mappings = vec![
("ascii", CqlTypeId::Ascii),
("bigint", CqlTypeId::BigInt),
("blob", CqlTypeId::Blob),
("boolean", CqlTypeId::Boolean),
("counter", CqlTypeId::Counter),
("decimal", CqlTypeId::Decimal),
("double", CqlTypeId::Double),
("float", CqlTypeId::Float),
("int", CqlTypeId::Int),
("timestamp", CqlTypeId::Timestamp),
("uuid", CqlTypeId::Uuid),
("varchar", CqlTypeId::Varchar),
("text", CqlTypeId::Varchar),
("varint", CqlTypeId::Varint),
("timeuuid", CqlTypeId::Timeuuid),
("inet", CqlTypeId::Inet),
("date", CqlTypeId::Date),
("time", CqlTypeId::Time),
("smallint", CqlTypeId::Smallint),
("tinyint", CqlTypeId::Tinyint),
("duration", CqlTypeId::Duration),
];
for (cql_type, expected_id) in type_mappings {
assert_eq!(
cql_type_to_type_id(cql_type).unwrap(),
expected_id,
"Failed for type: {}",
cql_type
);
}
}
}