1use vibesql_ast::CreateIndexStmt;
4use vibesql_storage::{
5 index::{extract_mbr_from_sql_value, SpatialIndex, SpatialIndexEntry},
6 Database, SpatialIndexMetadata,
7};
8
9use crate::{errors::ExecutorError, privilege_checker::PrivilegeChecker};
10
11pub struct CreateIndexExecutor;
13
14impl CreateIndexExecutor {
15 pub fn execute(
53 stmt: &CreateIndexStmt,
54 database: &mut Database,
55 ) -> Result<String, ExecutorError> {
56 let (schema_name, table_name) =
58 if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
59 (schema_part.to_string(), table_part.to_string())
60 } else {
61 (database.catalog.get_current_schema().to_string(), stmt.table_name.clone())
62 };
63
64 PrivilegeChecker::check_create(database, &schema_name)?;
66
67 let qualified_table_name = format!("{}.{}", schema_name, table_name);
69
70 if !database.catalog.table_exists(&qualified_table_name) {
72 return Err(ExecutorError::TableNotFound(qualified_table_name.clone()));
73 }
74
75 let table_schema = database
77 .catalog
78 .get_table(&qualified_table_name)
79 .ok_or_else(|| ExecutorError::TableNotFound(qualified_table_name.clone()))?;
80
81 for index_col in &stmt.columns {
83 if table_schema.get_column(&index_col.column_name).is_none() {
84 let available_columns =
85 table_schema.columns.iter().map(|c| c.name.clone()).collect();
86 return Err(ExecutorError::ColumnNotFound {
87 column_name: index_col.column_name.clone(),
88 table_name: qualified_table_name.clone(),
89 searched_tables: vec![qualified_table_name.clone()],
90 available_columns,
91 });
92 }
93 }
94
95 for index_col in &stmt.columns {
97 if let Some(prefix_len) = index_col.prefix_length {
98 if prefix_len == 0 {
100 return Err(ExecutorError::InvalidIndexDefinition(
101 format!("Prefix length must be greater than 0 for column '{}'", index_col.column_name),
102 ));
103 }
104
105 let column = table_schema.get_column(&index_col.column_name).unwrap(); match column.data_type {
108 vibesql_types::DataType::Varchar { .. }
109 | vibesql_types::DataType::Character { .. } => {
110 }
112 _ => {
113 return Err(ExecutorError::InvalidIndexDefinition(
114 format!(
115 "Prefix length can only be specified for string columns, but column '{}' has type {:?}",
116 index_col.column_name, column.data_type
117 ),
118 ));
119 }
120 }
121
122 const MAX_PREFIX_LENGTH: u64 = 65536;
125 if prefix_len > MAX_PREFIX_LENGTH {
126 return Err(ExecutorError::InvalidIndexDefinition(
127 format!(
128 "Prefix length {} is too large for column '{}' (maximum: {})",
129 prefix_len, index_col.column_name, MAX_PREFIX_LENGTH
130 ),
131 ));
132 }
133 }
134 }
135
136 let index_name = &stmt.index_name;
138 let index_exists = database.index_exists(index_name) || database.spatial_index_exists(index_name);
139
140 if index_exists {
141 if stmt.if_not_exists {
142 return Ok(format!("Index '{}' already exists (skipped)", index_name));
144 } else {
145 return Err(ExecutorError::IndexAlreadyExists(index_name.clone()));
146 }
147 }
148
149 match &stmt.index_type {
151 vibesql_ast::IndexType::BTree { unique } => {
152 let index_metadata = vibesql_catalog::IndexMetadata::new(
154 index_name.clone(),
155 table_name.clone(),
156 vibesql_catalog::IndexType::BTree,
157 stmt.columns
158 .iter()
159 .map(|col| vibesql_catalog::IndexedColumn {
160 column_name: col.column_name.clone(),
161 order: match col.direction {
162 vibesql_ast::OrderDirection::Asc => vibesql_catalog::SortOrder::Ascending,
163 vibesql_ast::OrderDirection::Desc => vibesql_catalog::SortOrder::Descending,
164 },
165 prefix_length: col.prefix_length,
166 })
167 .collect(),
168 *unique,
169 );
170 database.catalog.add_index(index_metadata)?;
171
172 database.create_index(
174 index_name.clone(),
175 table_name.clone(),
176 *unique,
177 stmt.columns.clone(),
178 )?;
179
180 Ok(format!(
181 "Index '{}' created successfully on table '{}'",
182 index_name, qualified_table_name
183 ))
184 }
185 vibesql_ast::IndexType::Fulltext => {
186 Err(ExecutorError::UnsupportedFeature(
187 "FULLTEXT indexes are not yet implemented".to_string(),
188 ))
189 }
190 vibesql_ast::IndexType::Spatial => {
191 if stmt.columns.len() != 1 {
193 return Err(ExecutorError::InvalidIndexDefinition(
194 "SPATIAL indexes must be defined on exactly one column".to_string(),
195 ));
196 }
197
198 let column_name = &stmt.columns[0].column_name;
199
200 let col_idx = table_schema
202 .get_column_index(column_name)
203 .ok_or_else(|| ExecutorError::ColumnNotFound {
204 column_name: column_name.clone(),
205 table_name: qualified_table_name.clone(),
206 searched_tables: vec![qualified_table_name.clone()],
207 available_columns: table_schema
208 .columns
209 .iter()
210 .map(|c| c.name.clone())
211 .collect(),
212 })?;
213
214 let table = database
216 .get_table(&table_name)
217 .ok_or_else(|| ExecutorError::TableNotFound(qualified_table_name.clone()))?;
218
219 let mut entries = Vec::new();
220 for (row_idx, row) in table.scan().iter().enumerate() {
221 let geom_value = &row.values[col_idx];
222
223 if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
225 entries.push(SpatialIndexEntry { row_id: row_idx, mbr });
226 }
227 }
228
229 let spatial_index = SpatialIndex::bulk_load(column_name.clone(), entries);
231
232 let index_metadata = vibesql_catalog::IndexMetadata::new(
234 index_name.clone(),
235 table_name.clone(),
236 vibesql_catalog::IndexType::RTree,
237 vec![vibesql_catalog::IndexedColumn {
238 column_name: column_name.clone(),
239 order: vibesql_catalog::SortOrder::Ascending,
240 prefix_length: None, }],
242 false,
243 );
244 database.catalog.add_index(index_metadata)?;
245
246 let metadata = SpatialIndexMetadata {
248 index_name: index_name.clone(),
249 table_name: table_name.clone(),
250 column_name: column_name.clone(),
251 created_at: Some(chrono::Utc::now()),
252 };
253
254 database.create_spatial_index(metadata, spatial_index)?;
255
256 Ok(format!(
257 "Spatial index '{}' created successfully on table '{}'",
258 index_name, qualified_table_name
259 ))
260 }
261 }
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use vibesql_ast::{ColumnDef, CreateTableStmt, IndexColumn, OrderDirection};
268 use vibesql_types::DataType;
269
270 use super::*;
271 use crate::CreateTableExecutor;
272
273 fn create_test_table(db: &mut Database) {
274 let stmt = CreateTableStmt {
275 table_name: "users".to_string(),
276 columns: vec![
277 ColumnDef {
278 name: "id".to_string(),
279 data_type: DataType::Integer,
280 nullable: false,
281 constraints: vec![],
282 default_value: None,
283 comment: None,
284 },
285 ColumnDef {
286 name: "email".to_string(),
287 data_type: DataType::Varchar { max_length: Some(255) },
288 nullable: false,
289 constraints: vec![],
290 default_value: None,
291 comment: None,
292 },
293 ColumnDef {
294 name: "name".to_string(),
295 data_type: DataType::Varchar { max_length: Some(100) },
296 nullable: true,
297 constraints: vec![],
298 default_value: None,
299 comment: None,
300 },
301 ],
302 table_constraints: vec![],
303 table_options: vec![],
304 };
305
306 CreateTableExecutor::execute(&stmt, db).unwrap();
307 }
308
309 #[test]
310 fn test_create_simple_index() {
311 let mut db = Database::new();
312 create_test_table(&mut db);
313
314 let stmt = CreateIndexStmt {
315 index_name: "idx_users_email".to_string(),
316 if_not_exists: false,
317 table_name: "users".to_string(),
318 index_type: vibesql_ast::IndexType::BTree { unique: false },
319 columns: vec![IndexColumn {
320 column_name: "email".to_string(),
321 direction: OrderDirection::Asc,
322 prefix_length: None,
323 }],
324 };
325
326 let result = CreateIndexExecutor::execute(&stmt, &mut db);
327 assert!(result.is_ok());
328 assert_eq!(
329 result.unwrap(),
330 "Index 'idx_users_email' created successfully on table 'public.users'"
331 );
332
333 assert!(db.index_exists("idx_users_email"));
335 }
336
337 #[test]
338 fn test_create_unique_index() {
339 let mut db = Database::new();
340 create_test_table(&mut db);
341
342 let stmt = CreateIndexStmt {
343 index_name: "idx_users_email_unique".to_string(),
344 if_not_exists: false,
345 table_name: "users".to_string(),
346 index_type: vibesql_ast::IndexType::BTree { unique: true },
347 columns: vec![IndexColumn {
348 column_name: "email".to_string(),
349 direction: OrderDirection::Asc,
350 prefix_length: None,
351 }],
352 };
353
354 let result = CreateIndexExecutor::execute(&stmt, &mut db);
355 assert!(result.is_ok());
356 assert!(db.index_exists("idx_users_email_unique"));
357 }
358
359 #[test]
360 fn test_create_multi_column_index() {
361 let mut db = Database::new();
362 create_test_table(&mut db);
363
364 let stmt = CreateIndexStmt {
365 index_name: "idx_users_email_name".to_string(),
366 if_not_exists: false,
367 table_name: "users".to_string(),
368 index_type: vibesql_ast::IndexType::BTree { unique: false },
369 columns: vec![
370 IndexColumn { column_name: "email".to_string(), direction: OrderDirection::Asc, prefix_length: None },
371 IndexColumn { column_name: "name".to_string(), direction: OrderDirection::Desc, prefix_length: None },
372 ],
373 };
374
375 let result = CreateIndexExecutor::execute(&stmt, &mut db);
376 assert!(result.is_ok());
377 }
378
379 #[test]
380 fn test_create_index_duplicate_name() {
381 let mut db = Database::new();
382 create_test_table(&mut db);
383
384 let stmt = CreateIndexStmt {
385 index_name: "idx_users_email".to_string(),
386 if_not_exists: false,
387 table_name: "users".to_string(),
388 index_type: vibesql_ast::IndexType::BTree { unique: false },
389 columns: vec![IndexColumn {
390 column_name: "email".to_string(),
391 direction: OrderDirection::Asc,
392 prefix_length: None,
393 }],
394 };
395
396 let result = CreateIndexExecutor::execute(&stmt, &mut db);
398 assert!(result.is_ok());
399
400 let result = CreateIndexExecutor::execute(&stmt, &mut db);
402 assert!(result.is_err());
403 assert!(matches!(result, Err(ExecutorError::IndexAlreadyExists(_))));
404 }
405
406 #[test]
407 fn test_create_index_on_nonexistent_table() {
408 let mut db = Database::new();
409
410 let stmt = CreateIndexStmt {
411 index_name: "idx_nonexistent".to_string(),
412 if_not_exists: false,
413 table_name: "nonexistent_table".to_string(),
414 index_type: vibesql_ast::IndexType::BTree { unique: false },
415 columns: vec![IndexColumn {
416 column_name: "id".to_string(),
417 direction: OrderDirection::Asc,
418 prefix_length: None,
419 }],
420 };
421
422 let result = CreateIndexExecutor::execute(&stmt, &mut db);
423 assert!(result.is_err());
424 assert!(matches!(result, Err(ExecutorError::TableNotFound(_))));
425 }
426
427 #[test]
428 fn test_create_index_on_nonexistent_column() {
429 let mut db = Database::new();
430 create_test_table(&mut db);
431
432 let stmt = CreateIndexStmt {
433 index_name: "idx_users_nonexistent".to_string(),
434 if_not_exists: false,
435 table_name: "users".to_string(),
436 index_type: vibesql_ast::IndexType::BTree { unique: false },
437 columns: vec![IndexColumn {
438 column_name: "nonexistent_column".to_string(),
439 direction: OrderDirection::Asc,
440 prefix_length: None,
441 }],
442 };
443
444 let result = CreateIndexExecutor::execute(&stmt, &mut db);
445 assert!(result.is_err());
446 assert!(matches!(result, Err(ExecutorError::ColumnNotFound { .. })));
447 }
448
449 #[test]
450 fn test_create_index_if_not_exists_when_not_exists() {
451 let mut db = Database::new();
452 create_test_table(&mut db);
453
454 let stmt = CreateIndexStmt {
455 index_name: "idx_users_email".to_string(),
456 if_not_exists: true,
457 table_name: "users".to_string(),
458 index_type: vibesql_ast::IndexType::BTree { unique: false },
459 columns: vec![IndexColumn {
460 column_name: "email".to_string(),
461 direction: OrderDirection::Asc,
462 prefix_length: None,
463 }],
464 };
465
466 let result = CreateIndexExecutor::execute(&stmt, &mut db);
467 assert!(result.is_ok());
468 assert_eq!(
469 result.unwrap(),
470 "Index 'idx_users_email' created successfully on table 'public.users'"
471 );
472 assert!(db.index_exists("idx_users_email"));
473 }
474
475 #[test]
476 fn test_create_index_if_not_exists_when_exists() {
477 let mut db = Database::new();
478 create_test_table(&mut db);
479
480 let stmt = CreateIndexStmt {
482 index_name: "idx_users_email".to_string(),
483 if_not_exists: false,
484 table_name: "users".to_string(),
485 index_type: vibesql_ast::IndexType::BTree { unique: false },
486 columns: vec![IndexColumn {
487 column_name: "email".to_string(),
488 direction: OrderDirection::Asc,
489 prefix_length: None,
490 }],
491 };
492 CreateIndexExecutor::execute(&stmt, &mut db).unwrap();
493
494 let stmt_with_if_not_exists = CreateIndexStmt {
496 index_name: "idx_users_email".to_string(),
497 if_not_exists: true,
498 table_name: "users".to_string(),
499 index_type: vibesql_ast::IndexType::BTree { unique: false },
500 columns: vec![IndexColumn {
501 column_name: "email".to_string(),
502 direction: OrderDirection::Asc,
503 prefix_length: None,
504 }],
505 };
506 let result = CreateIndexExecutor::execute(&stmt_with_if_not_exists, &mut db);
507 assert!(result.is_ok());
508 assert!(db.index_exists("idx_users_email"));
509 }
510
511 #[test]
512 fn test_create_index_with_schema_qualified_table() {
513 let mut db = Database::new();
514 create_test_table(&mut db);
515
516 let index_stmt = CreateIndexStmt {
518 index_name: "idx_users_email_qualified".to_string(),
519 if_not_exists: false,
520 table_name: "public.users".to_string(), index_type: vibesql_ast::IndexType::BTree { unique: false },
522 columns: vec![IndexColumn {
523 column_name: "email".to_string(),
524 direction: OrderDirection::Asc,
525 prefix_length: None,
526 }],
527 };
528
529 let result = CreateIndexExecutor::execute(&index_stmt, &mut db);
530 assert!(result.is_ok());
531 assert_eq!(
532 result.unwrap(),
533 "Index 'idx_users_email_qualified' created successfully on table 'public.users'"
534 );
535
536 assert!(db.index_exists("idx_users_email_qualified"));
538 }
539
540 #[test]
541 fn test_create_index_on_nonexistent_schema_qualified_table() {
542 let mut db = Database::new();
543
544 db.catalog.create_schema("test_schema".to_string()).unwrap();
546
547 let index_stmt = CreateIndexStmt {
549 index_name: "idx_nonexistent".to_string(),
550 if_not_exists: false,
551 table_name: "test_schema.nonexistent_table".to_string(),
552 index_type: vibesql_ast::IndexType::BTree { unique: false },
553 columns: vec![IndexColumn {
554 column_name: "id".to_string(),
555 direction: OrderDirection::Asc,
556 prefix_length: None,
557 }],
558 };
559
560 let result = CreateIndexExecutor::execute(&index_stmt, &mut db);
561 assert!(result.is_err());
562 assert!(matches!(result, Err(ExecutorError::TableNotFound(_))));
563 }
564}