1use std::collections::HashSet;
2
3use crate::ast::CreateTableQuery;
4use reddb_types::types::{DataType, SqlTypeName};
5
6#[derive(Debug, Clone)]
7pub enum AnalysisError {
8 DuplicateColumn(String),
9 UnsupportedType(String),
10}
11
12impl std::fmt::Display for AnalysisError {
13 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14 match self {
15 Self::DuplicateColumn(name) => write!(f, "duplicate column name: {name}"),
16 Self::UnsupportedType(name) => write!(f, "unsupported SQL type: {name}"),
17 }
18 }
19}
20
21impl std::error::Error for AnalysisError {}
22
23#[derive(Debug, Clone)]
24pub struct AnalyzedCreateTableQuery {
25 pub name: String,
26 pub columns: Vec<AnalyzedColumnDef>,
27 pub if_not_exists: bool,
28 pub default_ttl_ms: Option<u64>,
29 pub context_index_fields: Vec<String>,
30 pub timestamps: bool,
31}
32
33#[derive(Debug, Clone)]
34pub struct AnalyzedColumnDef {
35 pub name: String,
36 pub declared_type: SqlTypeName,
37 pub storage_type: DataType,
38 pub not_null: bool,
39 pub default: Option<String>,
40 pub primary_key: bool,
41 pub unique: bool,
42}
43
44pub fn analyze_create_table(
45 query: &CreateTableQuery,
46) -> Result<AnalyzedCreateTableQuery, AnalysisError> {
47 let mut seen = HashSet::new();
48 let mut columns = Vec::with_capacity(query.columns.len());
49
50 for column in &query.columns {
51 if !seen.insert(column.name.to_ascii_lowercase()) {
52 return Err(AnalysisError::DuplicateColumn(column.name.clone()));
53 }
54
55 columns.push(AnalyzedColumnDef {
56 name: column.name.clone(),
57 declared_type: column.sql_type.clone(),
58 storage_type: resolve_sql_type_name(&column.sql_type)?,
59 not_null: column.not_null,
60 default: column.default.clone(),
61 primary_key: column.primary_key,
62 unique: column.unique,
63 });
64 }
65
66 Ok(AnalyzedCreateTableQuery {
67 name: query.name.clone(),
68 columns,
69 if_not_exists: query.if_not_exists,
70 default_ttl_ms: query.default_ttl_ms,
71 context_index_fields: query.context_index_fields.clone(),
72 timestamps: query.timestamps,
73 })
74}
75
76pub fn resolve_declared_data_type(declared: &str) -> Result<DataType, AnalysisError> {
77 resolve_sql_type_name(&SqlTypeName::parse_declared(declared))
78}
79
80pub fn resolve_sql_type_name(sql_type: &SqlTypeName) -> Result<DataType, AnalysisError> {
81 DataType::from_sql_type_name(sql_type)
82 .ok_or_else(|| AnalysisError::UnsupportedType(sql_type.base_name()))
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use crate::ast::{CreateColumnDef, CreateTableQuery};
89 use reddb_types::catalog::CollectionModel;
90
91 fn column(name: &str, declared: &str) -> CreateColumnDef {
92 CreateColumnDef {
93 name: name.to_string(),
94 data_type: declared.to_string(),
95 sql_type: SqlTypeName::parse_declared(declared),
96 not_null: false,
97 default: None,
98 compress: None,
99 unique: false,
100 primary_key: false,
101 enum_variants: Vec::new(),
102 array_element: None,
103 decimal_precision: None,
104 }
105 }
106
107 fn create_table(columns: Vec<CreateColumnDef>) -> CreateTableQuery {
108 CreateTableQuery {
109 collection_model: CollectionModel::Table,
110 name: "orders".to_string(),
111 columns,
112 if_not_exists: true,
113 default_ttl_ms: Some(60_000),
114 metrics_rollup_policies: Vec::new(),
115 context_index_fields: vec!["description".to_string()],
116 context_index_enabled: true,
117 timestamps: true,
118 partition_by: None,
119 tenant_by: None,
120 append_only: false,
121 subscriptions: Vec::new(),
122 analytics_config: Vec::new(),
123 vault_own_master_key: false,
124 }
125 }
126
127 #[test]
128 fn analyze_create_table_resolves_columns_and_preserves_options() {
129 let mut id = column("id", "INTEGER");
130 id.primary_key = true;
131 id.not_null = true;
132
133 let mut description = column("description", "VARCHAR");
134 description.default = Some("'new'".to_string());
135 description.unique = true;
136
137 let query = create_table(vec![id, description]);
138 let analyzed = analyze_create_table(&query).unwrap();
139
140 assert_eq!(analyzed.name, "orders");
141 assert!(analyzed.if_not_exists);
142 assert_eq!(analyzed.default_ttl_ms, Some(60_000));
143 assert_eq!(analyzed.context_index_fields, ["description"]);
144 assert!(analyzed.timestamps);
145 assert_eq!(analyzed.columns.len(), 2);
146 assert_eq!(analyzed.columns[0].name, "id");
147 assert_eq!(analyzed.columns[0].storage_type, DataType::Integer);
148 assert!(analyzed.columns[0].not_null);
149 assert!(analyzed.columns[0].primary_key);
150 assert_eq!(analyzed.columns[1].declared_type.base_name(), "VARCHAR");
151 assert_eq!(analyzed.columns[1].storage_type, DataType::Text);
152 assert_eq!(analyzed.columns[1].default.as_deref(), Some("'new'"));
153 assert!(analyzed.columns[1].unique);
154 }
155
156 #[test]
157 fn duplicate_columns_are_case_insensitive() {
158 let query = create_table(vec![column("Id", "INT"), column("id", "INT")]);
159 let err = analyze_create_table(&query).unwrap_err();
160
161 assert!(matches!(err, AnalysisError::DuplicateColumn(ref name) if name == "id"));
162 assert_eq!(err.to_string(), "duplicate column name: id");
163 }
164
165 #[test]
166 fn unsupported_type_is_reported_with_normalized_name() {
167 let query = create_table(vec![column("mystery", "not_a_real_type")]);
168 let err = analyze_create_table(&query).unwrap_err();
169
170 assert!(
171 matches!(err, AnalysisError::UnsupportedType(ref name) if name == "NOT_A_REAL_TYPE")
172 );
173 assert_eq!(err.to_string(), "unsupported SQL type: NOT_A_REAL_TYPE");
174 }
175
176 #[test]
177 fn resolve_declared_data_type_accepts_sql_aliases() {
178 assert_eq!(
179 resolve_declared_data_type("varchar").unwrap(),
180 DataType::Text
181 );
182 assert_eq!(
183 resolve_declared_data_type("numeric(10)").unwrap(),
184 DataType::Decimal
185 );
186 assert_eq!(
187 resolve_declared_data_type("timestamptz").unwrap(),
188 DataType::TimestampMs
189 );
190 assert!(resolve_declared_data_type("definitely_not_sql").is_err());
191 }
192}