1use std::collections::HashMap;
6
7use vibesql_ast::IndexColumn;
8
9use super::indexes::IndexManager;
10use crate::{
11 index::{extract_mbr_from_sql_value, SpatialIndex},
12 progress::ProgressTracker,
13 Row, StorageError, Table,
14};
15
16#[derive(Debug, Clone)]
18pub struct SpatialIndexMetadata {
19 pub index_name: String,
20 pub table_name: String,
21 pub column_name: String,
22 pub created_at: Option<chrono::DateTime<chrono::Utc>>,
23}
24
25#[derive(Debug, Clone)]
27pub struct Operations {
28 index_manager: IndexManager,
30 spatial_indexes: HashMap<String, (SpatialIndexMetadata, SpatialIndex)>,
34}
35
36impl Operations {
37 pub fn new() -> Self {
39 Operations { index_manager: IndexManager::new(), spatial_indexes: HashMap::new() }
40 }
41
42 pub fn set_database_path(&mut self, path: std::path::PathBuf) {
44 self.index_manager.set_database_path(path);
45 }
46
47 pub fn set_config(&mut self, config: super::DatabaseConfig) {
49 self.index_manager.set_config(config);
50 }
51
52 #[cfg(target_arch = "wasm32")]
57 pub async fn init_opfs_async(&mut self) -> Result<(), crate::StorageError> {
58 self.index_manager.init_opfs_async().await
59 }
60
61 pub fn create_table(
67 &mut self,
68 catalog: &mut vibesql_catalog::Catalog,
69 schema: vibesql_catalog::TableSchema,
70 ) -> Result<(), StorageError> {
71 let _table_name = schema.name.clone();
72
73 catalog
75 .create_table(schema.clone())
76 .map_err(|e| StorageError::CatalogError(e.to_string()))?;
77
78 Ok(())
79 }
80
81 fn find_table_mut<'a>(
91 catalog: &vibesql_catalog::Catalog,
92 tables: &'a mut HashMap<String, Table>,
93 table_name: &str,
94 ) -> Result<&'a mut Table, StorageError> {
95 let resolved_name = if let Some((schema_part, table_part)) = table_name.split_once('.') {
98 if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
99 Some(format!("{}.{}", catalog.temp_schema_name(), table_part))
100 } else {
101 None
102 }
103 } else {
104 None
105 };
106 let table_name = resolved_name.as_deref().unwrap_or(table_name);
107
108 if tables.contains_key(table_name) {
110 return Ok(tables.get_mut(table_name).unwrap());
111 }
112
113 let normalized_name = if catalog.is_case_sensitive_identifiers() {
114 table_name.to_string()
115 } else {
116 table_name.to_lowercase()
117 };
118
119 if normalized_name != table_name && tables.contains_key(&normalized_name) {
121 return Ok(tables.get_mut(&normalized_name).unwrap());
122 }
123
124 if !table_name.contains('.') {
126 let temp_qualified = format!("{}.{}", catalog.temp_schema_name(), normalized_name);
128 if tables.contains_key(&temp_qualified) {
129 return Ok(tables.get_mut(&temp_qualified).unwrap());
130 }
131
132 let current_schema = catalog.get_current_schema();
133
134 let qualified_original = format!("{}.{}", current_schema, table_name);
136 if tables.contains_key(&qualified_original) {
137 return Ok(tables.get_mut(&qualified_original).unwrap());
138 }
139
140 if normalized_name != table_name {
142 let qualified_normalized = format!("{}.{}", current_schema, normalized_name);
143 if tables.contains_key(&qualified_normalized) {
144 return Ok(tables.get_mut(&qualified_normalized).unwrap());
145 }
146 }
147 }
148
149 Err(StorageError::TableNotFound(table_name.to_string()))
150 }
151
152 pub fn drop_table(
157 &mut self,
158 catalog: &mut vibesql_catalog::Catalog,
159 tables: &mut HashMap<String, Table>,
160 name: &str,
161 ) -> Result<(), StorageError> {
162 let normalized_name = if catalog.is_case_sensitive_identifiers() {
164 name.to_string()
165 } else {
166 name.to_lowercase()
167 };
168
169 let resolved_name = if let Some((schema_part, table_part)) = normalized_name.split_once('.') {
171 if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
172 format!("{}.{}", catalog.temp_schema_name(), table_part)
173 } else {
174 normalized_name.clone()
175 }
176 } else {
177 normalized_name.clone()
178 };
179
180 let qualified_name = if resolved_name.contains('.') {
182 resolved_name.clone()
183 } else {
184 let current_schema = catalog.get_current_schema();
185 format!("{}.{}", current_schema, resolved_name)
186 };
187
188 self.index_manager.drop_indexes_for_table(&qualified_name);
190
191 self.drop_spatial_indexes_for_table(&qualified_name);
193
194 catalog.drop_table(name).map_err(|e| StorageError::CatalogError(e.to_string()))?;
196
197 if tables.remove(&resolved_name).is_none() {
199 tables.remove(&qualified_name);
200 }
201
202 Ok(())
203 }
204
205 pub fn insert_row(
207 &mut self,
208 catalog: &vibesql_catalog::Catalog,
209 tables: &mut HashMap<String, Table>,
210 table_name: &str,
211 row: Row,
212 ) -> Result<usize, StorageError> {
213 let table = Self::find_table_mut(catalog, tables, table_name)?;
215
216 let row_index = table.row_count();
217
218 if let Some(table_schema) = catalog.get_table(table_name) {
220 self.index_manager.check_unique_constraints_for_insert(
221 table_name,
222 table_schema,
223 &row,
224 )?;
225 }
226
227 table.insert(row.clone())?;
229
230 if let Some(table_schema) = catalog.get_table(table_name) {
232 self.index_manager.add_to_indexes_for_insert(table_name, table_schema, &row, row_index);
233 }
234
235 self.update_spatial_indexes_for_insert(catalog, table_name, &row, row_index);
237
238 Ok(row_index)
239 }
240
241 pub fn insert_rows_batch(
255 &mut self,
256 catalog: &vibesql_catalog::Catalog,
257 tables: &mut HashMap<String, Table>,
258 table_name: &str,
259 rows: Vec<Row>,
260 ) -> Result<Vec<usize>, StorageError> {
261 if rows.is_empty() {
262 return Ok(Vec::new());
263 }
264
265 let table = Self::find_table_mut(catalog, tables, table_name)?;
267
268 let table_schema = catalog.get_table(table_name);
270
271 if let Some(schema) = table_schema {
274 for row in &rows {
275 self.index_manager.check_unique_constraints_for_insert(table_name, schema, row)?;
276 }
277 }
278
279 let start_index = table.row_count();
281
282 let has_btree_indexes = self.index_manager.has_indexes_for_table(table_name);
285 let has_spatial_indexes = self.has_spatial_indexes_for_table(table_name);
286 let needs_index_updates = has_btree_indexes || has_spatial_indexes;
287
288 let rows_for_indexes = if needs_index_updates { Some(rows.clone()) } else { None };
291
292 let count = table.insert_batch(rows)?;
294
295 let row_indices: Vec<usize> = (start_index..start_index + count).collect();
297
298 if let Some(rows_ref) = rows_for_indexes {
301 let rows_to_insert: Vec<(usize, &Row)> =
302 rows_ref.iter().enumerate().map(|(i, row)| (start_index + i, row)).collect();
303 self.batch_add_to_indexes_for_insert(catalog, table_name, &rows_to_insert);
304 }
305
306 Ok(row_indices)
307 }
308
309 #[allow(dead_code)] pub fn insert_rows_iter<I>(
332 &mut self,
333 catalog: &vibesql_catalog::Catalog,
334 tables: &mut HashMap<String, Table>,
335 table_name: &str,
336 rows: I,
337 batch_size: usize,
338 ) -> Result<usize, StorageError>
339 where
340 I: Iterator<Item = Row>,
341 {
342 let batch_size = if batch_size == 0 { 1000 } else { batch_size };
343 let mut total_inserted = 0;
344 let mut batch = Vec::with_capacity(batch_size);
345
346 for row in rows {
347 batch.push(row);
348
349 if batch.len() >= batch_size {
350 let indices = self.insert_rows_batch(
351 catalog,
352 tables,
353 table_name,
354 std::mem::take(&mut batch),
355 )?;
356 total_inserted += indices.len();
357 batch = Vec::with_capacity(batch_size);
358 }
359 }
360
361 if !batch.is_empty() {
363 let indices = self.insert_rows_batch(catalog, tables, table_name, batch)?;
364 total_inserted += indices.len();
365 }
366
367 Ok(total_inserted)
368 }
369
370 fn validate_prefix_lengths(
380 table_schema: &vibesql_catalog::TableSchema,
381 columns: &[IndexColumn],
382 ) -> Result<(), StorageError> {
383 use vibesql_types::DataType;
384
385 for index_col in columns {
386 if let Some(prefix_length) = index_col.prefix_length() {
387 let column_schema = table_schema
389 .columns
390 .iter()
391 .find(|col| col.name == index_col.expect_column_name())
392 .ok_or_else(|| StorageError::ColumnNotFound {
393 column_name: index_col.expect_column_name().to_string(),
394 table_name: table_schema.name.clone(),
395 })?;
396
397 match &column_schema.data_type {
399 DataType::Character { length } => {
401 if prefix_length as usize > *length {
403 eprintln!(
404 "Warning: Key part '{}' prefix length ({}) exceeds column width ({})",
405 index_col.expect_column_name(), prefix_length, length
406 );
407 }
408 }
409 DataType::Varchar { max_length } => {
410 if let Some(max_len) = max_length {
412 if prefix_length as usize > *max_len {
413 eprintln!(
414 "Warning: Key part '{}' prefix length ({}) exceeds column width ({})",
415 index_col.expect_column_name(), prefix_length, max_len
416 );
417 }
418 }
419 }
420 DataType::CharacterLargeObject | DataType::Name => {
421 }
423 DataType::BinaryLargeObject => {
424 }
426 _ => {
428 return Err(StorageError::InvalidIndexColumn(format!(
429 "Incorrect prefix key; the used key part '{}' isn't a string or binary type (type: {:?})",
430 index_col.expect_column_name(), column_schema.data_type
431 )));
432 }
433 }
434 }
435 }
436
437 Ok(())
438 }
439
440 pub fn create_index(
442 &mut self,
443 catalog: &vibesql_catalog::Catalog,
444 tables: &HashMap<String, Table>,
445 index_name: String,
446 table_name: String,
447 unique: bool,
448 columns: Vec<IndexColumn>,
449 ) -> Result<(), StorageError> {
450 let normalized_name = if catalog.is_case_sensitive_identifiers() {
452 table_name.clone()
453 } else {
454 table_name.to_lowercase()
455 };
456
457 let table = if let Some(tbl) = tables.get(&normalized_name) {
459 tbl
460 } else if !table_name.contains('.') {
461 let current_schema = catalog.get_current_schema();
463 let qualified_name = format!("{}.{}", current_schema, normalized_name);
464 tables
465 .get(&qualified_name)
466 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
467 } else {
468 return Err(StorageError::TableNotFound(table_name.clone()));
469 };
470
471 let table_schema = catalog
472 .get_table(&table_name)
473 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
474
475 Self::validate_prefix_lengths(table_schema, &columns)?;
477
478 self.index_manager.create_index(
481 index_name,
482 table_name,
483 table_schema,
484 table.scan(),
485 unique,
486 columns,
487 )
488 }
489
490 pub fn create_index_with_keys(
496 &mut self,
497 catalog: &vibesql_catalog::Catalog,
498 index_name: String,
499 table_name: String,
500 unique: bool,
501 columns: Vec<vibesql_ast::IndexColumn>,
502 keys: Vec<(Vec<vibesql_types::SqlValue>, usize)>,
503 ) -> Result<(), StorageError> {
504 let table_schema = catalog
506 .get_table(&table_name)
507 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
508
509 self.index_manager.create_index_with_keys(
510 index_name,
511 table_name,
512 table_schema,
513 unique,
514 columns,
515 keys,
516 )
517 }
518
519 pub fn index_exists(&self, index_name: &str) -> bool {
521 self.index_manager.index_exists(index_name)
522 }
523
524 pub fn get_index(&self, index_name: &str) -> Option<&super::indexes::IndexMetadata> {
526 self.index_manager.get_index(index_name)
527 }
528
529 pub fn get_index_data(&self, index_name: &str) -> Option<&super::indexes::IndexData> {
531 self.index_manager.get_index_data(index_name)
532 }
533
534 pub fn update_indexes_for_update(
545 &mut self,
546 catalog: &vibesql_catalog::Catalog,
547 table_name: &str,
548 old_row: &Row,
549 new_row: &Row,
550 row_index: usize,
551 changed_columns: Option<&std::collections::HashSet<usize>>,
552 ) {
553 if let Some(table_schema) = catalog.get_table(table_name) {
554 self.index_manager.update_indexes_for_update(
555 table_name,
556 table_schema,
557 old_row,
558 new_row,
559 row_index,
560 changed_columns,
561 );
562 }
563
564 self.update_spatial_indexes_for_update(catalog, table_name, old_row, new_row, row_index);
565 }
566
567 pub fn update_indexes_for_delete(
569 &mut self,
570 catalog: &vibesql_catalog::Catalog,
571 table_name: &str,
572 row: &Row,
573 row_index: usize,
574 ) {
575 self.update_indexes_for_delete_with_values(catalog, table_name, &row.values, row_index);
576 }
577
578 pub fn update_indexes_for_delete_with_values(
584 &mut self,
585 catalog: &vibesql_catalog::Catalog,
586 table_name: &str,
587 values: &[vibesql_types::SqlValue],
588 row_index: usize,
589 ) {
590 if let Some(table_schema) = catalog.get_table(table_name) {
591 self.index_manager.update_indexes_for_delete_with_values(
592 table_name,
593 table_schema,
594 values,
595 row_index,
596 );
597 }
598
599 self.update_spatial_indexes_for_delete_with_values(catalog, table_name, values, row_index);
600 }
601
602 pub fn batch_update_indexes_for_delete(
607 &mut self,
608 catalog: &vibesql_catalog::Catalog,
609 table_name: &str,
610 rows_to_delete: &[(usize, &Row)],
611 ) {
612 if let Some(table_schema) = catalog.get_table(table_name) {
613 self.index_manager.batch_update_indexes_for_delete(
614 table_name,
615 table_schema,
616 rows_to_delete,
617 );
618 }
619
620 self.batch_update_spatial_indexes_for_delete(catalog, table_name, rows_to_delete);
622 }
623
624 pub fn batch_add_to_indexes_for_insert(
629 &mut self,
630 catalog: &vibesql_catalog::Catalog,
631 table_name: &str,
632 rows_to_insert: &[(usize, &Row)],
633 ) {
634 if let Some(table_schema) = catalog.get_table(table_name) {
635 self.index_manager.batch_add_to_indexes_for_insert(
636 table_name,
637 table_schema,
638 rows_to_insert,
639 );
640 }
641
642 self.batch_update_spatial_indexes_for_insert(catalog, table_name, rows_to_insert);
644 }
645
646 pub fn add_to_expression_indexes_for_insert(
655 &mut self,
656 table_name: &str,
657 row_index: usize,
658 expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
659 ) {
660 self.index_manager.add_to_expression_indexes_for_insert(
661 table_name,
662 row_index,
663 expression_keys,
664 );
665 }
666
667 pub fn update_expression_indexes_for_update(
669 &mut self,
670 table_name: &str,
671 row_index: usize,
672 old_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
673 new_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
674 ) {
675 self.index_manager.update_expression_indexes_for_update(
676 table_name,
677 row_index,
678 old_expression_keys,
679 new_expression_keys,
680 );
681 }
682
683 pub fn update_expression_indexes_for_delete(
685 &mut self,
686 table_name: &str,
687 row_index: usize,
688 expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
689 ) {
690 self.index_manager.update_expression_indexes_for_delete(
691 table_name,
692 row_index,
693 expression_keys,
694 );
695 }
696
697 pub fn get_expression_indexes_for_table(
702 &self,
703 table_name: &str,
704 ) -> Vec<(String, &super::indexes::IndexMetadata)> {
705 self.index_manager.get_expression_indexes_for_table(table_name)
706 }
707
708 pub fn has_expression_indexes(&self, table_name: &str) -> bool {
710 self.index_manager.has_expression_indexes(table_name)
711 }
712
713 pub fn clear_expression_index_data(&mut self, table_name: &str) {
715 self.index_manager.clear_expression_index_data(table_name);
716 }
717
718 pub fn rebuild_indexes(
720 &mut self,
721 catalog: &vibesql_catalog::Catalog,
722 tables: &HashMap<String, Table>,
723 table_name: &str,
724 ) {
725 let normalized_name = if catalog.is_case_sensitive_identifiers() {
727 table_name.to_string()
728 } else {
729 table_name.to_lowercase()
730 };
731
732 let table_rows: Vec<Row> = if let Some(table) = tables.get(&normalized_name) {
734 table.scan().to_vec()
735 } else if !table_name.contains('.') {
736 let current_schema = catalog.get_current_schema();
738 let qualified_name = format!("{}.{}", current_schema, normalized_name);
739 if let Some(table) = tables.get(&qualified_name) {
740 table.scan().to_vec()
741 } else {
742 return;
743 }
744 } else {
745 return;
746 };
747
748 let table_schema = match catalog.get_table(table_name) {
749 Some(schema) => schema,
750 None => return,
751 };
752
753 self.index_manager.rebuild_indexes(table_name, table_schema, &table_rows);
754 }
755
756 pub fn adjust_indexes_after_delete(&mut self, table_name: &str, deleted_indices: &[usize]) {
765 self.index_manager.adjust_indexes_after_delete(table_name, deleted_indices);
766 }
767
768 pub fn drop_index(&mut self, index_name: &str) -> Result<(), StorageError> {
770 self.index_manager.drop_index(index_name)
771 }
772
773 pub fn list_indexes(&self) -> Vec<String> {
775 self.index_manager.list_indexes()
776 }
777
778 pub fn list_indexes_for_table(&self, table_name: &str) -> Vec<String> {
780 let normalized_search = table_name.to_lowercase();
782
783 self.index_manager
784 .list_indexes()
785 .into_iter()
786 .filter(|index_name| {
787 self.index_manager
788 .get_index(index_name)
789 .map(|metadata| {
790 metadata.table_name.to_lowercase() == normalized_search
792 })
793 .unwrap_or(false)
794 })
795 .collect()
796 }
797
798 #[inline]
801 pub fn has_index_on_column(&self, table_name: &str, column_name: &str) -> bool {
802 let normalized_table = table_name.to_lowercase();
803 let normalized_column = column_name.to_lowercase();
804
805 for index_name in self.index_manager.list_indexes() {
807 if let Some(metadata) = self.index_manager.get_index(&index_name) {
808 if metadata.table_name.to_lowercase() == normalized_table {
809 for col in &metadata.columns {
810 if let Some(col_name) = col.column_name() {
813 if col_name.to_lowercase() == normalized_column {
814 return true;
815 }
816 }
817 }
819 }
820 }
821 }
822
823 for (metadata, _) in self.spatial_indexes.values() {
825 if metadata.table_name.to_lowercase() == normalized_table
826 && metadata.column_name.to_lowercase() == normalized_column
827 {
828 return true;
829 }
830 }
831
832 false
833 }
834
835 fn normalize_index_name(name: &str) -> String {
841 name.to_lowercase()
842 }
843
844 pub fn create_spatial_index(
846 &mut self,
847 metadata: SpatialIndexMetadata,
848 spatial_index: SpatialIndex,
849 ) -> Result<(), StorageError> {
850 let normalized_name = Self::normalize_index_name(&metadata.index_name);
851
852 if self.index_manager.index_exists(&metadata.index_name) {
853 return Err(StorageError::IndexAlreadyExists(metadata.index_name.clone()));
854 }
855 if self.spatial_indexes.contains_key(&normalized_name) {
856 return Err(StorageError::IndexAlreadyExists(metadata.index_name.clone()));
857 }
858
859 self.spatial_indexes.insert(normalized_name, (metadata, spatial_index));
860 Ok(())
861 }
862
863 #[allow(clippy::too_many_arguments)]
868 pub fn create_ivfflat_index(
869 &mut self,
870 catalog: &vibesql_catalog::Catalog,
871 tables: &std::collections::HashMap<String, crate::Table>,
872 index_name: String,
873 table_name: String,
874 column_name: String,
875 col_idx: usize,
876 dimensions: usize,
877 lists: usize,
878 metric: vibesql_ast::VectorDistanceMetric,
879 ) -> Result<(), StorageError> {
880 let normalized_name = if catalog.is_case_sensitive_identifiers() {
882 table_name.clone()
883 } else {
884 table_name.to_lowercase()
885 };
886
887 let table = if let Some(tbl) = tables.get(&normalized_name) {
889 tbl
890 } else if !table_name.contains('.') {
891 let current_schema = catalog.get_current_schema();
893 let qualified_name = format!("{}.{}", current_schema, normalized_name);
894 tables
895 .get(&qualified_name)
896 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
897 } else {
898 return Err(StorageError::TableNotFound(table_name.clone()));
899 };
900
901 let rows = table.scan();
904 let total_rows = rows.len();
905 let mut vectors: Vec<(usize, Vec<f64>)> = Vec::new();
906 let mut progress = ProgressTracker::new(
907 format!("Creating IVFFlat index '{}'", index_name),
908 Some(total_rows),
909 );
910 for (row_idx, row) in rows.iter().enumerate() {
911 if col_idx < row.values.len() {
912 if let vibesql_types::SqlValue::Vector(vec_data) = &row.values[col_idx] {
913 let vec_f64: Vec<f64> = vec_data.iter().map(|&v| v as f64).collect();
915 vectors.push((row_idx, vec_f64));
916 }
917 }
918 progress.update(row_idx + 1);
919 }
920 progress.finish();
921
922 self.index_manager.create_ivfflat_index_with_vectors(
924 index_name,
925 table_name,
926 column_name,
927 dimensions,
928 lists,
929 metric,
930 vectors,
931 )
932 }
933
934 pub fn search_ivfflat_index(
945 &self,
946 index_name: &str,
947 query_vector: &[f64],
948 k: usize,
949 ) -> Result<Vec<(usize, f64)>, StorageError> {
950 self.index_manager.search_ivfflat_index(index_name, query_vector, k)
951 }
952
953 pub fn get_ivfflat_indexes_for_table(
955 &self,
956 table_name: &str,
957 ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::ivfflat::IVFFlatIndex)> {
958 self.index_manager.get_ivfflat_indexes_for_table(table_name)
959 }
960
961 pub fn set_ivfflat_probes(
963 &mut self,
964 index_name: &str,
965 probes: usize,
966 ) -> Result<(), StorageError> {
967 self.index_manager.set_ivfflat_probes(index_name, probes)
968 }
969
970 #[allow(clippy::too_many_arguments)]
979 pub fn create_hnsw_index(
980 &mut self,
981 catalog: &vibesql_catalog::Catalog,
982 tables: &std::collections::HashMap<String, crate::Table>,
983 index_name: String,
984 table_name: String,
985 column_name: String,
986 col_idx: usize,
987 dimensions: usize,
988 m: u32,
989 ef_construction: u32,
990 metric: vibesql_ast::VectorDistanceMetric,
991 ) -> Result<(), StorageError> {
992 let normalized_name = if catalog.is_case_sensitive_identifiers() {
994 table_name.clone()
995 } else {
996 table_name.to_lowercase()
997 };
998
999 let table = if let Some(tbl) = tables.get(&normalized_name) {
1001 tbl
1002 } else if !table_name.contains('.') {
1003 let current_schema = catalog.get_current_schema();
1005 let qualified_name = format!("{}.{}", current_schema, normalized_name);
1006 tables
1007 .get(&qualified_name)
1008 .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
1009 } else {
1010 return Err(StorageError::TableNotFound(table_name.clone()));
1011 };
1012
1013 let rows = table.scan();
1016 let total_rows = rows.len();
1017 let mut vectors: Vec<(usize, Vec<f64>)> = Vec::new();
1018 let mut progress =
1019 ProgressTracker::new(format!("Creating HNSW index '{}'", index_name), Some(total_rows));
1020 for (row_idx, row) in rows.iter().enumerate() {
1021 if col_idx < row.values.len() {
1022 if let vibesql_types::SqlValue::Vector(vec_data) = &row.values[col_idx] {
1023 let vec_f64: Vec<f64> = vec_data.iter().map(|&v| v as f64).collect();
1025 vectors.push((row_idx, vec_f64));
1026 }
1027 }
1028 progress.update(row_idx + 1);
1029 }
1030 progress.finish();
1031
1032 self.index_manager.create_hnsw_index_with_vectors(
1034 index_name,
1035 table_name,
1036 column_name,
1037 dimensions,
1038 m,
1039 ef_construction,
1040 metric,
1041 vectors,
1042 )
1043 }
1044
1045 pub fn search_hnsw_index(
1056 &self,
1057 index_name: &str,
1058 query_vector: &[f64],
1059 k: usize,
1060 ) -> Result<Vec<(usize, f64)>, StorageError> {
1061 self.index_manager.search_hnsw_index(index_name, query_vector, k)
1062 }
1063
1064 pub fn get_hnsw_indexes_for_table(
1066 &self,
1067 table_name: &str,
1068 ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::hnsw::HnswIndex)> {
1069 self.index_manager.get_hnsw_indexes_for_table(table_name)
1070 }
1071
1072 pub fn set_hnsw_ef_search(
1074 &mut self,
1075 index_name: &str,
1076 ef_search: usize,
1077 ) -> Result<(), StorageError> {
1078 self.index_manager.set_hnsw_ef_search(index_name, ef_search)
1079 }
1080
1081 pub fn spatial_index_exists(&self, index_name: &str) -> bool {
1083 let normalized = Self::normalize_index_name(index_name);
1084 self.spatial_indexes.contains_key(&normalized)
1085 }
1086
1087 pub fn get_spatial_index_metadata(&self, index_name: &str) -> Option<&SpatialIndexMetadata> {
1089 let normalized = Self::normalize_index_name(index_name);
1090 self.spatial_indexes.get(&normalized).map(|(metadata, _)| metadata)
1091 }
1092
1093 pub fn get_spatial_index(&self, index_name: &str) -> Option<&SpatialIndex> {
1095 let normalized = Self::normalize_index_name(index_name);
1096 self.spatial_indexes.get(&normalized).map(|(_, index)| index)
1097 }
1098
1099 pub fn get_spatial_index_mut(&mut self, index_name: &str) -> Option<&mut SpatialIndex> {
1101 let normalized = Self::normalize_index_name(index_name);
1102 self.spatial_indexes.get_mut(&normalized).map(|(_, index)| index)
1103 }
1104
1105 pub fn get_spatial_indexes_for_table(
1107 &self,
1108 table_name: &str,
1109 ) -> Vec<(&SpatialIndexMetadata, &SpatialIndex)> {
1110 self.spatial_indexes
1111 .values()
1112 .filter(|(metadata, _)| metadata.table_name == table_name)
1113 .map(|(metadata, index)| (metadata, index))
1114 .collect()
1115 }
1116
1117 pub fn get_spatial_indexes_for_table_mut(
1119 &mut self,
1120 table_name: &str,
1121 ) -> Vec<(&SpatialIndexMetadata, &mut SpatialIndex)> {
1122 self.spatial_indexes
1123 .iter_mut()
1124 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1125 .map(|(_, (metadata, index))| (metadata as &SpatialIndexMetadata, index))
1126 .collect()
1127 }
1128
1129 pub fn drop_spatial_index(&mut self, index_name: &str) -> Result<(), StorageError> {
1131 let normalized = Self::normalize_index_name(index_name);
1132
1133 if self.spatial_indexes.remove(&normalized).is_none() {
1134 return Err(StorageError::IndexNotFound(index_name.to_string()));
1135 }
1136
1137 Ok(())
1138 }
1139
1140 pub fn drop_spatial_indexes_for_table(&mut self, table_name: &str) -> Vec<String> {
1145 let search_name_lower = table_name.to_lowercase();
1147
1148 let search_table_only = search_name_lower.rsplit('.').next().unwrap_or(&search_name_lower);
1150
1151 let indexes_to_drop: Vec<String> = self
1152 .spatial_indexes
1153 .iter()
1154 .filter(|(_, (metadata, _))| {
1155 let stored_lower = metadata.table_name.to_lowercase();
1156 let stored_table_only = stored_lower.rsplit('.').next().unwrap_or(&stored_lower);
1157
1158 stored_lower == search_name_lower || stored_table_only == search_table_only
1160 })
1161 .map(|(name, _)| name.clone())
1162 .collect();
1163
1164 for index_name in &indexes_to_drop {
1165 self.spatial_indexes.remove(index_name);
1166 }
1167
1168 indexes_to_drop
1169 }
1170
1171 pub fn list_spatial_indexes(&self) -> Vec<String> {
1173 self.spatial_indexes.keys().cloned().collect()
1174 }
1175
1176 fn has_spatial_indexes_for_table(&self, table_name: &str) -> bool {
1181 self.spatial_indexes.values().any(|(metadata, _)| metadata.table_name == table_name)
1182 }
1183
1184 fn update_spatial_indexes_for_insert(
1186 &mut self,
1187 catalog: &vibesql_catalog::Catalog,
1188 table_name: &str,
1189 row: &Row,
1190 row_index: usize,
1191 ) {
1192 let table_schema = match catalog.get_table(table_name) {
1193 Some(schema) => schema,
1194 None => return,
1195 };
1196
1197 let indexes_to_update: Vec<(String, usize)> = self
1198 .spatial_indexes
1199 .iter()
1200 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1201 .filter_map(|(index_name, (metadata, _))| {
1202 table_schema
1203 .get_column_index(&metadata.column_name)
1204 .map(|col_idx| (index_name.clone(), col_idx))
1205 })
1206 .collect();
1207
1208 for (index_name, col_idx) in indexes_to_update {
1209 let geom_value = &row.values[col_idx];
1210
1211 if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1212 if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1213 index.insert(row_index, mbr);
1214 }
1215 }
1216 }
1217 }
1218
1219 fn batch_update_spatial_indexes_for_insert(
1229 &mut self,
1230 catalog: &vibesql_catalog::Catalog,
1231 table_name: &str,
1232 rows_to_insert: &[(usize, &Row)],
1233 ) {
1234 if rows_to_insert.is_empty() {
1235 return;
1236 }
1237
1238 let table_schema = match catalog.get_table(table_name) {
1239 Some(schema) => schema,
1240 None => return,
1241 };
1242
1243 let indexes_to_update: Vec<(String, usize)> = self
1245 .spatial_indexes
1246 .iter()
1247 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1248 .filter_map(|(index_name, (metadata, _))| {
1249 table_schema
1250 .get_column_index(&metadata.column_name)
1251 .map(|col_idx| (index_name.clone(), col_idx))
1252 })
1253 .collect();
1254
1255 for (index_name, col_idx) in indexes_to_update {
1257 if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1258 for &(row_index, row) in rows_to_insert {
1259 let geom_value = &row.values[col_idx];
1260 if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1261 index.insert(row_index, mbr);
1262 }
1263 }
1264 }
1265 }
1266 }
1267
1268 fn update_spatial_indexes_for_update(
1270 &mut self,
1271 catalog: &vibesql_catalog::Catalog,
1272 table_name: &str,
1273 old_row: &Row,
1274 new_row: &Row,
1275 row_index: usize,
1276 ) {
1277 let table_schema = match catalog.get_table(table_name) {
1278 Some(schema) => schema,
1279 None => return,
1280 };
1281
1282 let indexes_to_update: Vec<(String, usize)> = self
1283 .spatial_indexes
1284 .iter()
1285 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1286 .filter_map(|(index_name, (metadata, _))| {
1287 table_schema
1288 .get_column_index(&metadata.column_name)
1289 .map(|col_idx| (index_name.clone(), col_idx))
1290 })
1291 .collect();
1292
1293 for (index_name, col_idx) in indexes_to_update {
1294 let old_geom = &old_row.values[col_idx];
1295 let new_geom = &new_row.values[col_idx];
1296
1297 if old_geom != new_geom {
1298 if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1299 if let Some(old_mbr) = extract_mbr_from_sql_value(old_geom) {
1300 index.remove(row_index, &old_mbr);
1301 }
1302
1303 if let Some(new_mbr) = extract_mbr_from_sql_value(new_geom) {
1304 index.insert(row_index, new_mbr);
1305 }
1306 }
1307 }
1308 }
1309 }
1310
1311 fn update_spatial_indexes_for_delete_with_values(
1312 &mut self,
1313 catalog: &vibesql_catalog::Catalog,
1314 table_name: &str,
1315 values: &[vibesql_types::SqlValue],
1316 row_index: usize,
1317 ) {
1318 let table_schema = match catalog.get_table(table_name) {
1319 Some(schema) => schema,
1320 None => return,
1321 };
1322
1323 let indexes_to_update: Vec<(String, usize)> = self
1324 .spatial_indexes
1325 .iter()
1326 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1327 .filter_map(|(index_name, (metadata, _))| {
1328 table_schema
1329 .get_column_index(&metadata.column_name)
1330 .map(|col_idx| (index_name.clone(), col_idx))
1331 })
1332 .collect();
1333
1334 for (index_name, col_idx) in indexes_to_update {
1335 let geom_value = &values[col_idx];
1336
1337 if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1338 if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1339 index.remove(row_index, &mbr);
1340 }
1341 }
1342 }
1343 }
1344
1345 fn batch_update_spatial_indexes_for_delete(
1351 &mut self,
1352 catalog: &vibesql_catalog::Catalog,
1353 table_name: &str,
1354 rows_to_delete: &[(usize, &Row)],
1355 ) {
1356 if rows_to_delete.is_empty() {
1357 return;
1358 }
1359
1360 let table_schema = match catalog.get_table(table_name) {
1361 Some(schema) => schema,
1362 None => return,
1363 };
1364
1365 let indexes_to_update: Vec<(String, usize)> = self
1367 .spatial_indexes
1368 .iter()
1369 .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1370 .filter_map(|(index_name, (metadata, _))| {
1371 table_schema
1372 .get_column_index(&metadata.column_name)
1373 .map(|col_idx| (index_name.clone(), col_idx))
1374 })
1375 .collect();
1376
1377 if indexes_to_update.is_empty() {
1378 return;
1379 }
1380
1381 for (index_name, col_idx) in indexes_to_update {
1383 if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1384 for &(row_index, row) in rows_to_delete {
1385 let geom_value = &row.values[col_idx];
1386 if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1387 index.remove(row_index, &mbr);
1388 }
1389 }
1390 }
1391 }
1392 }
1393
1394 pub fn reset(&mut self) {
1399 self.index_manager.reset();
1401
1402 self.spatial_indexes.clear();
1404 }
1405}
1406
1407impl Default for Operations {
1408 fn default() -> Self {
1409 Self::new()
1410 }
1411}