Skip to main content

nodedb_sql/planner/index_ddl/
create.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Plan a `CREATE [UNIQUE] INDEX` statement parsed by sqlparser.
4
5use sqlparser::ast;
6
7use crate::SqlPlan;
8use crate::error::{Result, SqlError};
9use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
10
11/// Plan a `CREATE INDEX` statement.
12///
13/// Supports:
14/// - `CREATE [UNIQUE] INDEX [IF NOT EXISTS] [name] ON table (col [COLLATE coll])`
15/// - `COLLATE NOCASE` / `COLLATE CI` / `COLLATE CASE_INSENSITIVE` on the
16///   indexed column → `case_insensitive = true`.
17///
18/// Multi-column indexes, expression indexes, and predicate (`WHERE`) indexes
19/// are rejected with a typed error rather than silently dropped.
20pub fn plan_create_index(ci: &ast::CreateIndex) -> Result<SqlPlan> {
21    let collection =
22        normalize_object_name_checked(&ci.table_name).map_err(|_| SqlError::Parse {
23            detail: "CREATE INDEX: missing or schema-qualified table name".into(),
24        })?;
25
26    let index_name = match ci.name.as_ref() {
27        Some(n) => Some(normalize_object_name_checked(n)?),
28        None => None,
29    };
30
31    if ci.columns.is_empty() {
32        return Err(SqlError::Parse {
33            detail: "CREATE INDEX: at least one column is required".into(),
34        });
35    }
36    if ci.columns.len() > 1 {
37        return Err(SqlError::Unsupported {
38            detail: "CREATE INDEX: multi-column indexes are not supported".into(),
39        });
40    }
41
42    let col = &ci.columns[0];
43    let (field_expr, case_insensitive) = strip_collate(&col.column.expr);
44    let field = match field_expr {
45        ast::Expr::Identifier(ident) => normalize_ident(ident),
46        ast::Expr::CompoundIdentifier(parts) if parts.len() == 1 => normalize_ident(&parts[0]),
47        other => {
48            return Err(SqlError::Unsupported {
49                detail: format!("CREATE INDEX: expression indexes are not supported: {other}"),
50            });
51        }
52    };
53
54    if ci.predicate.is_some() {
55        return Err(SqlError::Unsupported {
56            detail: "CREATE INDEX: partial (WHERE) indexes are not supported".into(),
57        });
58    }
59
60    Ok(SqlPlan::CreateIndex {
61        index_name,
62        collection,
63        field,
64        unique: ci.unique,
65        if_not_exists: ci.if_not_exists,
66        case_insensitive,
67    })
68}
69
70/// Strip a `COLLATE <name>` wrapper from `expr` and return the inner
71/// expression together with whether the collation is a recognised
72/// case-insensitive collation.
73fn strip_collate(expr: &ast::Expr) -> (&ast::Expr, bool) {
74    if let ast::Expr::Collate { expr, collation } = expr {
75        let ci = collation
76            .0
77            .iter()
78            .filter_map(|part| match part {
79                ast::ObjectNamePart::Identifier(ident) => Some(ident.value.as_str()),
80                _ => None,
81            })
82            .any(is_case_insensitive_collation);
83        return (expr.as_ref(), ci);
84    }
85    (expr, false)
86}
87
88fn is_case_insensitive_collation(name: &str) -> bool {
89    name.eq_ignore_ascii_case("NOCASE")
90        || name.eq_ignore_ascii_case("CI")
91        || name.eq_ignore_ascii_case("CASE_INSENSITIVE")
92}
93
94#[cfg(test)]
95mod tests {
96    use super::*;
97    use crate::parser::statement::parse_sql;
98
99    fn plan(sql: &str) -> Result<SqlPlan> {
100        let stmts = parse_sql(sql).expect("parse");
101        let ast::Statement::CreateIndex(ci) = &stmts[0] else {
102            panic!("expected CREATE INDEX");
103        };
104        plan_create_index(ci)
105    }
106
107    #[test]
108    fn basic_index() {
109        let SqlPlan::CreateIndex {
110            index_name,
111            collection,
112            field,
113            unique,
114            if_not_exists,
115            case_insensitive,
116        } = plan("CREATE INDEX idx_users_email ON users (email)").unwrap()
117        else {
118            panic!("expected CreateIndex");
119        };
120        assert_eq!(index_name.as_deref(), Some("idx_users_email"));
121        assert_eq!(collection, "users");
122        assert_eq!(field, "email");
123        assert!(!unique);
124        assert!(!if_not_exists);
125        assert!(!case_insensitive);
126    }
127
128    #[test]
129    fn anonymous_index_name() {
130        let SqlPlan::CreateIndex { index_name, .. } =
131            plan("CREATE INDEX ON users (email)").unwrap()
132        else {
133            panic!("expected CreateIndex");
134        };
135        assert!(index_name.is_none());
136    }
137
138    #[test]
139    fn unique_and_if_not_exists() {
140        let SqlPlan::CreateIndex {
141            unique,
142            if_not_exists,
143            ..
144        } = plan("CREATE UNIQUE INDEX IF NOT EXISTS u ON users (email)").unwrap()
145        else {
146            panic!("expected CreateIndex");
147        };
148        assert!(unique);
149        assert!(if_not_exists);
150    }
151
152    #[test]
153    fn collate_nocase_detected() {
154        for sql in [
155            "CREATE INDEX i ON users (email COLLATE NOCASE)",
156            "CREATE INDEX i ON users (email COLLATE \"NOCASE\")",
157            "CREATE INDEX i ON users (email COLLATE ci)",
158            "CREATE INDEX i ON users (email COLLATE case_insensitive)",
159        ] {
160            let SqlPlan::CreateIndex {
161                case_insensitive, ..
162            } = plan(sql).unwrap()
163            else {
164                panic!("expected CreateIndex for {sql}");
165            };
166            assert!(case_insensitive, "expected case_insensitive for: {sql}");
167        }
168    }
169
170    #[test]
171    fn collate_other_not_case_insensitive() {
172        let SqlPlan::CreateIndex {
173            case_insensitive, ..
174        } = plan("CREATE INDEX i ON users (email COLLATE \"en_US\")").unwrap()
175        else {
176            panic!("expected CreateIndex");
177        };
178        assert!(!case_insensitive);
179    }
180
181    #[test]
182    fn multi_column_rejected() {
183        let err = plan("CREATE INDEX i ON users (a, b)").unwrap_err();
184        assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
185    }
186
187    #[test]
188    fn partial_index_rejected() {
189        let err = plan("CREATE INDEX i ON users (email) WHERE email IS NOT NULL").unwrap_err();
190        assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
191    }
192
193    #[test]
194    fn expression_index_rejected() {
195        let err = plan("CREATE INDEX i ON users (lower(email))").unwrap_err();
196        assert!(matches!(err, SqlError::Unsupported { .. }), "{err:?}");
197    }
198}