nodedb_sql/planner/index_ddl/
create.rs1use sqlparser::ast;
6
7use crate::SqlPlan;
8use crate::error::{Result, SqlError};
9use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
10
11pub fn plan_create_index(ci: &ast::CreateIndex) -> Result<SqlPlan> {
21 let collection =
22 normalize_object_name_checked(&ci.table_name).map_err(|_| SqlError::Parse {
23 detail: "CREATE INDEX: missing or schema-qualified table name".into(),
24 })?;
25
26 let index_name = match ci.name.as_ref() {
27 Some(n) => Some(normalize_object_name_checked(n)?),
28 None => None,
29 };
30
31 if ci.columns.is_empty() {
32 return Err(SqlError::Parse {
33 detail: "CREATE INDEX: at least one column is required".into(),
34 });
35 }
36 if ci.columns.len() > 1 {
37 return Err(SqlError::Unsupported {
38 detail: "CREATE INDEX: multi-column indexes are not supported".into(),
39 });
40 }
41
42 let col = &ci.columns[0];
43 let (field_expr, case_insensitive) = strip_collate(&col.column.expr);
44 let field = match field_expr {
45 ast::Expr::Identifier(ident) => normalize_ident(ident),
46 ast::Expr::CompoundIdentifier(parts) if parts.len() == 1 => normalize_ident(&parts[0]),
47 other => {
48 return Err(SqlError::Unsupported {
49 detail: format!("CREATE INDEX: expression indexes are not supported: {other}"),
50 });
51 }
52 };
53
54 if ci.predicate.is_some() {
55 return Err(SqlError::Unsupported {
56 detail: "CREATE INDEX: partial (WHERE) indexes are not supported".into(),
57 });
58 }
59
60 Ok(SqlPlan::CreateIndex {
61 index_name,
62 collection,
63 field,
64 unique: ci.unique,
65 if_not_exists: ci.if_not_exists,
66 case_insensitive,
67 })
68}
69
70fn strip_collate(expr: &ast::Expr) -> (&ast::Expr, bool) {
74 if let ast::Expr::Collate { expr, collation } = expr {
75 let ci = collation
76 .0
77 .iter()
78 .filter_map(|part| match part {
79 ast::ObjectNamePart::Identifier(ident) => Some(ident.value.as_str()),
80 _ => None,
81 })
82 .any(is_case_insensitive_collation);
83 return (expr.as_ref(), ci);
84 }
85 (expr, false)
86}
87
88fn is_case_insensitive_collation(name: &str) -> bool {
89 name.eq_ignore_ascii_case("NOCASE")
90 || name.eq_ignore_ascii_case("CI")
91 || name.eq_ignore_ascii_case("CASE_INSENSITIVE")
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::parser::statement::parse_sql;
98
99 fn plan(sql: &str) -> Result<SqlPlan> {
100 let stmts = parse_sql(sql).expect("parse");
101 let ast::Statement::CreateIndex(ci) = &stmts[0] else {
102 panic!("expected CREATE INDEX");
103 };
104 plan_create_index(ci)
105 }
106
107 #[test]
108 fn basic_index() {
109 let SqlPlan::CreateIndex {
110 index_name,
111 collection,
112 field,
113 unique,
114 if_not_exists,
115 case_insensitive,
116 } = plan("CREATE INDEX idx_users_email ON users (email)").unwrap()
117 else {
118 panic!("expected CreateIndex");
119 };
120 assert_eq!(index_name.as_deref(), Some("idx_users_email"));
121 assert_eq!(collection, "users");
122 assert_eq!(field, "email");
123 assert!(!unique);
124 assert!(!if_not_exists);
125 assert!(!case_insensitive);
126 }
127
128 #[test]
129 fn anonymous_index_name() {
130 let SqlPlan::CreateIndex { index_name, .. } =
131 plan("CREATE INDEX ON users (email)").unwrap()
132 else {
133 panic!("expected CreateIndex");
134 };
135 assert!(index_name.is_none());
136 }
137
138 #[test]
139 fn unique_and_if_not_exists() {
140 let SqlPlan::CreateIndex {
141 unique,
142 if_not_exists,
143 ..
144 } = plan("CREATE UNIQUE INDEX IF NOT EXISTS u ON users (email)").unwrap()
145 else {
146 panic!("expected CreateIndex");
147 };
148 assert!(unique);
149 assert!(if_not_exists);
150 }
151
152 #[test]
153 fn collate_nocase_detected() {
154 for sql in [
155 "CREATE INDEX i ON users (email COLLATE NOCASE)",
156 "CREATE INDEX i ON users (email COLLATE \"NOCASE\")",
157 "CREATE INDEX i ON users (email COLLATE ci)",
158 "CREATE INDEX i ON users (email COLLATE case_insensitive)",
159 ] {
160 let SqlPlan::CreateIndex {
161 case_insensitive, ..
162 } = plan(sql).unwrap()
163 else {
164 panic!("expected CreateIndex for {sql}");
165 };
166 assert!(case_insensitive, "expected case_insensitive for: {sql}");
167 }
168 }
169
170 #[test]
171 fn collate_other_not_case_insensitive() {
172 let SqlPlan::CreateIndex {
173 case_insensitive, ..
174 } = plan("CREATE INDEX i ON users (email COLLATE \"en_US\")").unwrap()
175 else {
176 panic!("expected CreateIndex");
177 };
178 assert!(!case_insensitive);
179 }
180
181 #[test]
182 fn multi_column_rejected() {
183 let err = plan("CREATE INDEX i ON users (a, b)").unwrap_err();
184 assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
185 }
186
187 #[test]
188 fn partial_index_rejected() {
189 let err = plan("CREATE INDEX i ON users (email) WHERE email IS NOT NULL").unwrap_err();
190 assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
191 }
192
193 #[test]
194 fn expression_index_rejected() {
195 let err = plan("CREATE INDEX i ON users (lower(email))").unwrap_err();
196 assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
197 }
198}