vibesql_storage/database/

1// ============================================================================
2// Table and Index Operations
3// ============================================================================
4
5use std::collections::HashMap;
6
7use vibesql_ast::IndexColumn;
8
9use super::indexes::IndexManager;
10use crate::{
11    index::{extract_mbr_from_sql_value, SpatialIndex},
12    progress::ProgressTracker,
13    Row, StorageError, Table,
14};
15
16/// Metadata for a spatial index
17#[derive(Debug, Clone)]
18pub struct SpatialIndexMetadata {
19    pub index_name: String,
20    pub table_name: String,
21    pub column_name: String,
22    pub created_at: Option<chrono::DateTime<chrono::Utc>>,
23}
24
25/// Manages table and index operations
26#[derive(Debug, Clone)]
27pub struct Operations {
28    /// User-defined index manager (B-tree indexes)
29    index_manager: IndexManager,
30    /// Spatial indexes (R-tree) - stored separately from B-tree indexes
31    /// Key: normalized index name (lowercase)
32    /// Value: (metadata, spatial index)
33    spatial_indexes: HashMap<String, (SpatialIndexMetadata, SpatialIndex)>,
34}
35
36impl Operations {
37    /// Create a new operations manager
38    pub fn new() -> Self {
39        Operations { index_manager: IndexManager::new(), spatial_indexes: HashMap::new() }
40    }
41
42    /// Set the database path for index storage
43    pub fn set_database_path(&mut self, path: std::path::PathBuf) {
44        self.index_manager.set_database_path(path);
45    }
46
47    /// Set the database configuration (memory budgets, spill policy)
48    pub fn set_config(&mut self, config: super::DatabaseConfig) {
49        self.index_manager.set_config(config);
50    }
51
52    /// Initialize OPFS storage asynchronously (WASM only)
53    ///
54    /// This replaces the temporary in-memory storage with persistent OPFS storage.
55    /// Must be called from an async context.
56    #[cfg(target_arch = "wasm32")]
57    pub async fn init_opfs_async(&mut self) -> Result<(), crate::StorageError> {
58        self.index_manager.init_opfs_async().await
59    }
60
61    // ============================================================================
62    // Table Operations
63    // ============================================================================
64
65    /// Create a table in the catalog
66    pub fn create_table(
67        &mut self,
68        catalog: &mut vibesql_catalog::Catalog,
69        schema: vibesql_catalog::TableSchema,
70    ) -> Result<(), StorageError> {
71        let _table_name = schema.name.clone();
72
73        // Add to catalog
74        catalog
75            .create_table(schema.clone())
76            .map_err(|e| StorageError::CatalogError(e.to_string()))?;
77
78        Ok(())
79    }
80
81    /// Find a table by name with fallback lookups for quoted identifiers.
82    ///
83    /// This tries multiple lookup strategies to handle both quoted and unquoted identifiers:
84    /// 1. Resolve "temp" schema to session's temp schema (SQLite compatibility)
85    /// 2. Direct lookup as-is (for quoted identifiers that preserve case)
86    /// 3. Normalized (lowercase) lookup
87    /// 4. Temp schema lookup (SQLite semantics - temp tables shadow main tables)
88    /// 5. Schema-qualified with original case
89    /// 6. Schema-qualified with normalized case
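    ///
    /// For illustration only, a hypothetical lookup sequence for `Orders` when the
    /// current schema is `main` and the session temp schema is `temp_1` (placeholder
    /// names, not taken from the catalog implementation):
    ///
    /// ```text
    /// "Orders"        -- direct, as written
    /// "orders"        -- normalized (lowercase)
    /// "temp_1.orders" -- session temp schema
    /// "main.Orders"   -- current schema, original case
    /// "main.orders"   -- current schema, normalized case
    /// ```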
90    fn find_table_mut<'a>(
91        catalog: &vibesql_catalog::Catalog,
92        tables: &'a mut HashMap<String, Table>,
93        table_name: &str,
94    ) -> Result<&'a mut Table, StorageError> {
95        // For qualified names with "temp" schema, resolve to session's temp schema
96        // This enables `INSERT INTO temp.t1 VALUES(...)` syntax
97        let resolved_name = if let Some((schema_part, table_part)) = table_name.split_once('.') {
98            if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
99                Some(format!("{}.{}", catalog.temp_schema_name(), table_part))
100            } else {
101                None
102            }
103        } else {
104            None
105        };
106        let table_name = resolved_name.as_deref().unwrap_or(table_name);
107
108        // Try 1: Direct lookup as-is (handles quoted identifiers correctly)
109        if tables.contains_key(table_name) {
110            return Ok(tables.get_mut(table_name).unwrap());
111        }
112
113        let normalized_name = if catalog.is_case_sensitive_identifiers() {
114            table_name.to_string()
115        } else {
116            table_name.to_lowercase()
117        };
118
119        // Try 2: Normalized name (for unquoted identifiers)
120        if normalized_name != table_name && tables.contains_key(&normalized_name) {
121            return Ok(tables.get_mut(&normalized_name).unwrap());
122        }
123
124        // Try with schema prefix if not already qualified
125        if !table_name.contains('.') {
126            // Try 3: Session's temp schema first (SQLite semantics - temp tables shadow main tables)
127            let temp_qualified = format!("{}.{}", catalog.temp_schema_name(), normalized_name);
128            if tables.contains_key(&temp_qualified) {
129                return Ok(tables.get_mut(&temp_qualified).unwrap());
130            }
131
132            let current_schema = catalog.get_current_schema();
133
134            // Try 4: Schema-qualified with original case (for quoted identifiers)
135            let qualified_original = format!("{}.{}", current_schema, table_name);
136            if tables.contains_key(&qualified_original) {
137                return Ok(tables.get_mut(&qualified_original).unwrap());
138            }
139
140            // Try 5: Schema-qualified with normalized case
141            if normalized_name != table_name {
142                let qualified_normalized = format!("{}.{}", current_schema, normalized_name);
143                if tables.contains_key(&qualified_normalized) {
144                    return Ok(tables.get_mut(&qualified_normalized).unwrap());
145                }
146            }
147        }
148
149        Err(StorageError::TableNotFound(table_name.to_string()))
150    }
151
152    /// Drop a table from the catalog
153    ///
154    /// SQLite Compatibility: The "temp" schema name is mapped to the session's
155    /// temp schema, allowing `DROP TABLE temp.tablename` syntax.
156    pub fn drop_table(
157        &mut self,
158        catalog: &mut vibesql_catalog::Catalog,
159        tables: &mut HashMap<String, Table>,
160        name: &str,
161    ) -> Result<(), StorageError> {
162        // Normalize table name for lookup (matches catalog normalization)
163        let normalized_name = if catalog.is_case_sensitive_identifiers() {
164            name.to_string()
165        } else {
166            name.to_lowercase()
167        };
168
169        // Resolve "temp" schema to session's temp schema for storage lookup
170        let resolved_name = if let Some((schema_part, table_part)) = normalized_name.split_once('.') {
171            if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
172                format!("{}.{}", catalog.temp_schema_name(), table_part)
173            } else {
174                normalized_name.clone()
175            }
176        } else {
177            normalized_name.clone()
178        };
179
180        // Get qualified table name for index cleanup
181        let qualified_name = if resolved_name.contains('.') {
182            resolved_name.clone()
183        } else {
184            let current_schema = catalog.get_current_schema();
185            format!("{}.{}", current_schema, resolved_name)
186        };
187
188        // Drop associated indexes BEFORE dropping table (CASCADE behavior)
189        self.index_manager.drop_indexes_for_table(&qualified_name);
190
191        // Drop associated spatial indexes too
192        self.drop_spatial_indexes_for_table(&qualified_name);
193
194        // Remove from catalog
195        catalog.drop_table(name).map_err(|e| StorageError::CatalogError(e.to_string()))?;
196
197        // Remove table data - try resolved name first, then try qualified name
198        if tables.remove(&resolved_name).is_none() {
199            tables.remove(&qualified_name);
200        }
201
202        Ok(())
203    }
204
205    /// Insert a row into a table
206    pub fn insert_row(
207        &mut self,
208        catalog: &vibesql_catalog::Catalog,
209        tables: &mut HashMap<String, Table>,
210        table_name: &str,
211        row: Row,
212    ) -> Result<usize, StorageError> {
213        // Use the helper function for proper table lookup with fallbacks for quoted identifiers
214        let table = Self::find_table_mut(catalog, tables, table_name)?;
215
216        let row_index = table.row_count();
217
218        // Check user-defined unique indexes BEFORE inserting
219        if let Some(table_schema) = catalog.get_table(table_name) {
220            self.index_manager.check_unique_constraints_for_insert(
221                table_name,
222                table_schema,
223                &row,
224            )?;
225        }
226
227        // Insert the row (this validates table-level constraints like PK, UNIQUE)
228        table.insert(row.clone())?;
229
230        // Update user-defined indexes
231        if let Some(table_schema) = catalog.get_table(table_name) {
232            self.index_manager.add_to_indexes_for_insert(table_name, table_schema, &row, row_index);
233        }
234
235        // Update spatial indexes
236        self.update_spatial_indexes_for_insert(catalog, table_name, &row, row_index);
237
238        Ok(row_index)
239    }
240
241    /// Insert multiple rows into a table in a single batch
242    ///
243    /// This method is optimized for bulk data loading. It uses `Table::insert_batch()`
244    /// internally which provides significant performance improvements:
245    ///
246    /// - Pre-allocates vector capacity
247    /// - Validates all rows before inserting any
248    /// - Rebuilds indexes once after all inserts (vs per-row updates)
249    /// - Invalidates caches only once at the end
250    ///
251    /// # Returns
252    ///
253    /// Row indices of all inserted rows (starting from the first new row)
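    ///
    /// # Example
    ///
    /// A minimal bulk-loading sketch (not compiled here; `ops`, `catalog`, `tables`,
    /// and a prepared `Vec<Row>` named `rows` are assumed to exist):
    ///
    /// ```ignore
    /// // All rows are validated first; indexes are updated once after the batch insert.
    /// let indices = ops.insert_rows_batch(&catalog, &mut tables, "users", rows)?;
    /// println!("inserted {} rows starting at {:?}", indices.len(), indices.first());
    /// ```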
254    pub fn insert_rows_batch(
255        &mut self,
256        catalog: &vibesql_catalog::Catalog,
257        tables: &mut HashMap<String, Table>,
258        table_name: &str,
259        rows: Vec<Row>,
260    ) -> Result<Vec<usize>, StorageError> {
261        if rows.is_empty() {
262            return Ok(Vec::new());
263        }
264
265        // Use the helper function for proper table lookup with fallbacks for quoted identifiers
266        let table = Self::find_table_mut(catalog, tables, table_name)?;
267
268        // Get table schema once for all rows
269        let table_schema = catalog.get_table(table_name);
270
271        // Check user-defined unique indexes BEFORE inserting any rows
272        // This is separate from the table-level constraint checks in Table::insert_batch
273        if let Some(schema) = table_schema {
274            for row in &rows {
275                self.index_manager.check_unique_constraints_for_insert(table_name, schema, row)?;
276            }
277        }
278
279        // Record start index for return value
280        let start_index = table.row_count();
281
282        // Check if we have any user-defined or spatial indexes for this table
283        // Only clone rows if we actually need them for index updates
284        let has_btree_indexes = self.index_manager.has_indexes_for_table(table_name);
285        let has_spatial_indexes = self.has_spatial_indexes_for_table(table_name);
286        let needs_index_updates = has_btree_indexes || has_spatial_indexes;
287
288        // Conditionally clone rows only if index updates are needed
289        // This avoids expensive cloning during bulk data loading when no indexes exist
290        let rows_for_indexes = if needs_index_updates { Some(rows.clone()) } else { None };
291
292        // Use optimized batch insert
293        let count = table.insert_batch(rows)?;
294
295        // Generate row indices for return
296        let row_indices: Vec<usize> = (start_index..start_index + count).collect();
297
298        // Update user-defined and spatial indexes for all inserted rows using batch optimization
299        // This pre-computes column indices once per index rather than once per row
300        if let Some(rows_ref) = rows_for_indexes {
301            let rows_to_insert: Vec<(usize, &Row)> =
302                rows_ref.iter().enumerate().map(|(i, row)| (start_index + i, row)).collect();
303            self.batch_add_to_indexes_for_insert(catalog, table_name, &rows_to_insert);
304        }
305
306        Ok(row_indices)
307    }
308
309    /// Insert rows from an iterator in a streaming fashion
310    ///
311    /// This method processes rows in batches for memory efficiency when loading
312    /// very large datasets. Rows are committed batch-by-batch.
313    ///
314    /// # Arguments
315    ///
316    /// * `catalog` - The database catalog
317    /// * `tables` - Map of table names to tables
318    /// * `table_name` - Name of the table to insert into
319    /// * `rows` - Iterator yielding rows to insert
320    /// * `batch_size` - Number of rows per batch (default: 1000)
321    ///
322    /// # Returns
323    ///
324    /// Total number of rows successfully inserted
325    ///
326    /// # Note
327    ///
328    /// Unlike `insert_rows_batch`, this method commits in batches, so a failure
329    /// partway through will leave previously committed rows in the table.
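    ///
    /// # Example
    ///
    /// A sketch of streaming a large row source in batches of 5,000 (not compiled here;
    /// callers normally go through `Database::insert_rows_iter`, and `row_source` stands
    /// in for any `Iterator<Item = Row>`):
    ///
    /// ```ignore
    /// let total = ops.insert_rows_iter(&catalog, &mut tables, "events", row_source, 5_000)?;
    /// println!("loaded {total} rows");
    /// ```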
330    #[allow(dead_code)] // Available for internal use; public API is via Database::insert_rows_iter
331    pub fn insert_rows_iter<I>(
332        &mut self,
333        catalog: &vibesql_catalog::Catalog,
334        tables: &mut HashMap<String, Table>,
335        table_name: &str,
336        rows: I,
337        batch_size: usize,
338    ) -> Result<usize, StorageError>
339    where
340        I: Iterator<Item = Row>,
341    {
342        let batch_size = if batch_size == 0 { 1000 } else { batch_size };
343        let mut total_inserted = 0;
344        let mut batch = Vec::with_capacity(batch_size);
345
346        for row in rows {
347            batch.push(row);
348
349            if batch.len() >= batch_size {
350                let indices = self.insert_rows_batch(
351                    catalog,
352                    tables,
353                    table_name,
354                    std::mem::take(&mut batch),
355                )?;
356                total_inserted += indices.len();
357                batch = Vec::with_capacity(batch_size);
358            }
359        }
360
361        // Insert any remaining rows
362        if !batch.is_empty() {
363            let indices = self.insert_rows_batch(catalog, tables, table_name, batch)?;
364            total_inserted += indices.len();
365        }
366
367        Ok(total_inserted)
368    }
369
370    // ============================================================================
371    // Index Management - Delegates to IndexManager
372    // ============================================================================
373
374    /// Validate prefix lengths for indexed columns
375    ///
376    /// Checks:
377    /// 1. Prefix lengths are only used on string/binary types
378    /// 2. Prefix lengths don't exceed column width (for fixed-width types)
379    fn validate_prefix_lengths(
380        table_schema: &vibesql_catalog::TableSchema,
381        columns: &[IndexColumn],
382    ) -> Result<(), StorageError> {
383        use vibesql_types::DataType;
384
385        for index_col in columns {
386            if let Some(prefix_length) = index_col.prefix_length() {
387                // Find the column in the table schema
388                let column_schema = table_schema
389                    .columns
390                    .iter()
391                    .find(|col| col.name == index_col.expect_column_name())
392                    .ok_or_else(|| StorageError::ColumnNotFound {
393                        column_name: index_col.expect_column_name().to_string(),
394                        table_name: table_schema.name.clone(),
395                    })?;
396
397                // Check if the column type supports prefix indexing
398                match &column_schema.data_type {
399                    // String types that support prefix indexing
400                    DataType::Character { length } => {
401                        // Check if prefix exceeds column width
402                        if prefix_length as usize > *length {
403                            eprintln!(
404                                "Warning: Key part '{}' prefix length ({}) exceeds column width ({})",
405                                index_col.expect_column_name(), prefix_length, length
406                            );
407                        }
408                    }
409                    DataType::Varchar { max_length } => {
410                        // Check if prefix exceeds column width (if specified)
411                        if let Some(max_len) = max_length {
412                            if prefix_length as usize > *max_len {
413                                eprintln!(
414                                    "Warning: Key part '{}' prefix length ({}) exceeds column width ({})",
415                                    index_col.expect_column_name(), prefix_length, max_len
416                                );
417                            }
418                        }
419                    }
420                    DataType::CharacterLargeObject | DataType::Name => {
421                        // CLOB/TEXT and NAME types support prefix indexing without width check
422                    }
423                    DataType::BinaryLargeObject => {
424                        // BLOB supports prefix indexing
425                    }
426                    // All other types do not support prefix indexing
427                    _ => {
428                        return Err(StorageError::InvalidIndexColumn(format!(
429                            "Incorrect prefix key; the used key part '{}' isn't a string or binary type (type: {:?})",
430                            index_col.expect_column_name(), column_schema.data_type
431                        )));
432                    }
433                }
434            }
435        }
436
437        Ok(())
438    }
439
440    /// Create an index
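    ///
    /// A usage sketch (not compiled here; `columns` is a `Vec<IndexColumn>` built by the
    /// caller, e.g. from a parsed `CREATE INDEX` statement, and its construction is omitted):
    ///
    /// ```ignore
    /// ops.create_index(
    ///     &catalog,
    ///     &tables,
    ///     "idx_users_email".to_string(),
    ///     "users".to_string(),
    ///     true, // UNIQUE
    ///     columns,
    /// )?;
    /// ```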
441    pub fn create_index(
442        &mut self,
443        catalog: &vibesql_catalog::Catalog,
444        tables: &HashMap<String, Table>,
445        index_name: String,
446        table_name: String,
447        unique: bool,
448        columns: Vec<IndexColumn>,
449    ) -> Result<(), StorageError> {
450        // Normalize table name for lookup (matches catalog normalization)
451        let normalized_name = if catalog.is_case_sensitive_identifiers() {
452            table_name.clone()
453        } else {
454            table_name.to_lowercase()
455        };
456
457        // Try to find the table with normalized name or qualified name
458        let table = if let Some(tbl) = tables.get(&normalized_name) {
459            tbl
460        } else if !table_name.contains('.') {
461            // Try with schema prefix
462            let current_schema = catalog.get_current_schema();
463            let qualified_name = format!("{}.{}", current_schema, normalized_name);
464            tables
465                .get(&qualified_name)
466                .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
467        } else {
468            return Err(StorageError::TableNotFound(table_name.clone()));
469        };
470
471        let table_schema = catalog
472            .get_table(&table_name)
473            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
474
475        // Validate prefix lengths against column types and widths
476        Self::validate_prefix_lengths(table_schema, &columns)?;
477
478        // Pass table rows directly by reference - avoid cloning all rows
479        // This is critical for performance at scale (O(n) clone was causing major slowdown)
480        self.index_manager.create_index(
481            index_name,
482            table_name,
483            table_schema,
484            table.scan(),
485            unique,
486            columns,
487        )
488    }
489
490    /// Create an index with pre-computed keys (for expression indexes)
491    ///
492    /// This method is used when the caller has already evaluated the index expressions
493    /// and computed the key values for each row. This is necessary for expression
494    /// indexes, whose key values cannot be computed by the storage layer itself.
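    ///
    /// A sketch of the calling convention (not compiled here; `columns` and the evaluated
    /// `keys` are produced by the executor, and their construction is omitted):
    ///
    /// ```ignore
    /// // One (key values, row index) pair per row, e.g. for an index on LOWER(email).
    /// let keys: Vec<(Vec<vibesql_types::SqlValue>, usize)> = evaluated_keys;
    /// ops.create_index_with_keys(
    ///     &catalog,
    ///     "idx_users_lower_email".to_string(),
    ///     "users".to_string(),
    ///     false,
    ///     columns,
    ///     keys,
    /// )?;
    /// ```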
495    pub fn create_index_with_keys(
496        &mut self,
497        catalog: &vibesql_catalog::Catalog,
498        index_name: String,
499        table_name: String,
500        unique: bool,
501        columns: Vec<vibesql_ast::IndexColumn>,
502        keys: Vec<(Vec<vibesql_types::SqlValue>, usize)>,
503    ) -> Result<(), StorageError> {
504        // Get the table schema for key type inference
505        let table_schema = catalog
506            .get_table(&table_name)
507            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
508
509        self.index_manager.create_index_with_keys(
510            index_name,
511            table_name,
512            table_schema,
513            unique,
514            columns,
515            keys,
516        )
517    }
518
519    /// Check if an index exists
520    pub fn index_exists(&self, index_name: &str) -> bool {
521        self.index_manager.index_exists(index_name)
522    }
523
524    /// Get index metadata
525    pub fn get_index(&self, index_name: &str) -> Option<&super::indexes::IndexMetadata> {
526        self.index_manager.get_index(index_name)
527    }
528
529    /// Get index data
530    pub fn get_index_data(&self, index_name: &str) -> Option<&super::indexes::IndexData> {
531        self.index_manager.get_index_data(index_name)
532    }
533
534    /// Update user-defined indexes for update operation
535    ///
536    /// # Arguments
537    /// * `catalog` - Database catalog for schema lookup
538    /// * `table_name` - Name of the table being updated
539    /// * `old_row` - Row data before the update
540    /// * `new_row` - Row data after the update
541    /// * `row_index` - Index of the row in the table
542    /// * `changed_columns` - Optional set of column indices that were modified. If provided,
543    ///   indexes that don't involve any changed columns will be skipped.
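    ///
    /// A sketch of the `changed_columns` optimization (not compiled here):
    ///
    /// ```ignore
    /// use std::collections::HashSet;
    ///
    /// // Only column 2 changed, so indexes not involving column 2 are skipped.
    /// let changed: HashSet<usize> = [2].into_iter().collect();
    /// ops.update_indexes_for_update(&catalog, "users", &old_row, &new_row, row_index, Some(&changed));
    /// ```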
544    pub fn update_indexes_for_update(
545        &mut self,
546        catalog: &vibesql_catalog::Catalog,
547        table_name: &str,
548        old_row: &Row,
549        new_row: &Row,
550        row_index: usize,
551        changed_columns: Option<&std::collections::HashSet<usize>>,
552    ) {
553        if let Some(table_schema) = catalog.get_table(table_name) {
554            self.index_manager.update_indexes_for_update(
555                table_name,
556                table_schema,
557                old_row,
558                new_row,
559                row_index,
560                changed_columns,
561            );
562        }
563
564        self.update_spatial_indexes_for_update(catalog, table_name, old_row, new_row, row_index);
565    }
566
567    /// Update user-defined indexes for delete operation
568    pub fn update_indexes_for_delete(
569        &mut self,
570        catalog: &vibesql_catalog::Catalog,
571        table_name: &str,
572        row: &Row,
573        row_index: usize,
574    ) {
575        self.update_indexes_for_delete_with_values(catalog, table_name, &row.values, row_index);
576    }
577
578    /// Update user-defined indexes for delete operation using raw values slice
579    ///
580    /// This is an optimization over `update_indexes_for_delete` that avoids requiring
581    /// a full Row struct. Useful in the fast delete path where we already have a values
582    /// slice and want to avoid wrapping overhead.
583    pub fn update_indexes_for_delete_with_values(
584        &mut self,
585        catalog: &vibesql_catalog::Catalog,
586        table_name: &str,
587        values: &[vibesql_types::SqlValue],
588        row_index: usize,
589    ) {
590        if let Some(table_schema) = catalog.get_table(table_name) {
591            self.index_manager.update_indexes_for_delete_with_values(
592                table_name,
593                table_schema,
594                values,
595                row_index,
596            );
597        }
598
599        self.update_spatial_indexes_for_delete_with_values(catalog, table_name, values, row_index);
600    }
601
602    /// Batch update user-defined indexes for delete operation
603    ///
604    /// This is significantly more efficient than calling `update_indexes_for_delete` in a loop
605    /// because it pre-computes column indices once per index rather than once per row.
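    ///
    /// A sketch of preparing the `(row_index, &Row)` pairs (not compiled here; `doomed`
    /// and `all_rows` are placeholder names):
    ///
    /// ```ignore
    /// let rows_to_delete: Vec<(usize, &Row)> =
    ///     doomed.iter().map(|&i| (i, &all_rows[i])).collect();
    /// ops.batch_update_indexes_for_delete(&catalog, "users", &rows_to_delete);
    /// ```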
606    pub fn batch_update_indexes_for_delete(
607        &mut self,
608        catalog: &vibesql_catalog::Catalog,
609        table_name: &str,
610        rows_to_delete: &[(usize, &Row)],
611    ) {
612        if let Some(table_schema) = catalog.get_table(table_name) {
613            self.index_manager.batch_update_indexes_for_delete(
614                table_name,
615                table_schema,
616                rows_to_delete,
617            );
618        }
619
620        // Batch update spatial indexes (pre-computes column indices once per index)
621        self.batch_update_spatial_indexes_for_delete(catalog, table_name, rows_to_delete);
622    }
623
624    /// Batch add to user-defined indexes for insert operation
625    ///
626    /// This is significantly more efficient than calling `add_to_indexes_for_insert` in a loop
627    /// because it pre-computes column indices once per index rather than once per row.
628    pub fn batch_add_to_indexes_for_insert(
629        &mut self,
630        catalog: &vibesql_catalog::Catalog,
631        table_name: &str,
632        rows_to_insert: &[(usize, &Row)],
633    ) {
634        if let Some(table_schema) = catalog.get_table(table_name) {
635            self.index_manager.batch_add_to_indexes_for_insert(
636                table_name,
637                table_schema,
638                rows_to_insert,
639            );
640        }
641
642        // Update spatial indexes in batch
643        self.batch_update_spatial_indexes_for_insert(catalog, table_name, rows_to_insert);
644    }
645
646    // ============================================================================
647    // Expression Index Methods
648    // ============================================================================
649
650    /// Add row to expression indexes after insert with pre-computed keys
651    ///
652    /// This method handles expression indexes which require pre-computed key values
653    /// since the storage layer cannot evaluate expressions.
654    pub fn add_to_expression_indexes_for_insert(
655        &mut self,
656        table_name: &str,
657        row_index: usize,
658        expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
659    ) {
660        self.index_manager.add_to_expression_indexes_for_insert(
661            table_name,
662            row_index,
663            expression_keys,
664        );
665    }
666
667    /// Update expression indexes for update operation with pre-computed keys
668    pub fn update_expression_indexes_for_update(
669        &mut self,
670        table_name: &str,
671        row_index: usize,
672        old_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
673        new_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
674    ) {
675        self.index_manager.update_expression_indexes_for_update(
676            table_name,
677            row_index,
678            old_expression_keys,
679            new_expression_keys,
680        );
681    }
682
683    /// Update expression indexes for delete operation with pre-computed keys
684    pub fn update_expression_indexes_for_delete(
685        &mut self,
686        table_name: &str,
687        row_index: usize,
688        expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
689    ) {
690        self.index_manager.update_expression_indexes_for_delete(
691            table_name,
692            row_index,
693            expression_keys,
694        );
695    }
696
697    /// Get expression indexes for a specific table
698    ///
699    /// Returns metadata for all expression indexes on the table. Used by executor
700    /// to determine which indexes need expression evaluation during DML operations.
701    pub fn get_expression_indexes_for_table(
702        &self,
703        table_name: &str,
704    ) -> Vec<(String, &super::indexes::IndexMetadata)> {
705        self.index_manager.get_expression_indexes_for_table(table_name)
706    }
707
708    /// Check if a table has any expression indexes
709    pub fn has_expression_indexes(&self, table_name: &str) -> bool {
710        self.index_manager.has_expression_indexes(table_name)
711    }
712
713    /// Clear expression index data for a table (for rebuilding after compaction)
714    pub fn clear_expression_index_data(&mut self, table_name: &str) {
715        self.index_manager.clear_expression_index_data(table_name);
716    }
717
718    /// Rebuild user-defined indexes after bulk operations that change row indices
719    pub fn rebuild_indexes(
720        &mut self,
721        catalog: &vibesql_catalog::Catalog,
722        tables: &HashMap<String, Table>,
723        table_name: &str,
724    ) {
725        // Normalize table name for lookup (matches catalog normalization)
726        let normalized_name = if catalog.is_case_sensitive_identifiers() {
727            table_name.to_string()
728        } else {
729            table_name.to_lowercase()
730        };
731
732        // First try direct lookup, then try with schema prefix if needed
733        let table_rows: Vec<Row> = if let Some(table) = tables.get(&normalized_name) {
734            table.scan().to_vec()
735        } else if !table_name.contains('.') {
736            // Try with schema prefix
737            let current_schema = catalog.get_current_schema();
738            let qualified_name = format!("{}.{}", current_schema, normalized_name);
739            if let Some(table) = tables.get(&qualified_name) {
740                table.scan().to_vec()
741            } else {
742                return;
743            }
744        } else {
745            return;
746        };
747
748        let table_schema = match catalog.get_table(table_name) {
749            Some(schema) => schema,
750            None => return,
751        };
752
753        self.index_manager.rebuild_indexes(table_name, table_schema, &table_rows);
754    }
755
756    /// Adjust user-defined indexes after row deletions
757    ///
758    /// This is more efficient than rebuild_indexes when only a few rows are deleted,
759    /// as it adjusts row indices in place rather than rebuilding from scratch.
760    ///
761    /// # Arguments
762    /// * `table_name` - Name of the table whose indexes need adjustment
763    /// * `deleted_indices` - Sorted list of deleted row indices (ascending order)
764    pub fn adjust_indexes_after_delete(&mut self, table_name: &str, deleted_indices: &[usize]) {
765        self.index_manager.adjust_indexes_after_delete(table_name, deleted_indices);
766    }
767
768    /// Drop an index
769    pub fn drop_index(&mut self, index_name: &str) -> Result<(), StorageError> {
770        self.index_manager.drop_index(index_name)
771    }
772
773    /// List all indexes
774    pub fn list_indexes(&self) -> Vec<String> {
775        self.index_manager.list_indexes()
776    }
777
778    /// List all indexes for a specific table
779    pub fn list_indexes_for_table(&self, table_name: &str) -> Vec<String> {
780        // Normalize for case-insensitive comparison
781        let normalized_search = table_name.to_lowercase();
782
783        self.index_manager
784            .list_indexes()
785            .into_iter()
786            .filter(|index_name| {
787                self.index_manager
788                    .get_index(index_name)
789                    .map(|metadata| {
790                        // Normalize both sides for comparison
791                        metadata.table_name.to_lowercase() == normalized_search
792                    })
793                    .unwrap_or(false)
794            })
795            .collect()
796    }
797
798    /// Check if a column has any user-defined index (B-tree or spatial)
799    /// Note: Expression indexes are NOT checked here - they don't have named columns
800    #[inline]
801    pub fn has_index_on_column(&self, table_name: &str, column_name: &str) -> bool {
802        let normalized_table = table_name.to_lowercase();
803        let normalized_column = column_name.to_lowercase();
804
805        // Check B-tree indexes
806        for index_name in self.index_manager.list_indexes() {
807            if let Some(metadata) = self.index_manager.get_index(&index_name) {
808                if metadata.table_name.to_lowercase() == normalized_table {
809                    for col in &metadata.columns {
810                        // Use column_name() instead of expect_column_name() to handle
811                        // expression indexes gracefully - they return None for column_name
812                        if let Some(col_name) = col.column_name() {
813                            if col_name.to_lowercase() == normalized_column {
814                                return true;
815                            }
816                        }
817                        // Skip expression indexes - they don't have named columns
818                    }
819                }
820            }
821        }
822
823        // Check spatial indexes
824        for (metadata, _) in self.spatial_indexes.values() {
825            if metadata.table_name.to_lowercase() == normalized_table
826                && metadata.column_name.to_lowercase() == normalized_column
827            {
828                return true;
829            }
830        }
831
832        false
833    }
834
835    // ========================================================================
836    // Spatial Index Methods
837    // ========================================================================
838
839    /// Normalize an index name to lowercase for case-insensitive comparison
840    fn normalize_index_name(name: &str) -> String {
841        name.to_lowercase()
842    }
843
844    /// Create a spatial index
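    ///
    /// A sketch of registering a pre-built R-tree (not compiled here; constructing the
    /// `SpatialIndex` itself is the caller's responsibility and is omitted):
    ///
    /// ```ignore
    /// let metadata = SpatialIndexMetadata {
    ///     index_name: "idx_cities_geom".to_string(),
    ///     table_name: "public.cities".to_string(),
    ///     column_name: "geom".to_string(),
    ///     created_at: Some(chrono::Utc::now()),
    /// };
    /// ops.create_spatial_index(metadata, spatial_index)?;
    /// ```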
845    pub fn create_spatial_index(
846        &mut self,
847        metadata: SpatialIndexMetadata,
848        spatial_index: SpatialIndex,
849    ) -> Result<(), StorageError> {
850        let normalized_name = Self::normalize_index_name(&metadata.index_name);
851
852        if self.index_manager.index_exists(&metadata.index_name) {
853            return Err(StorageError::IndexAlreadyExists(metadata.index_name.clone()));
854        }
855        if self.spatial_indexes.contains_key(&normalized_name) {
856            return Err(StorageError::IndexAlreadyExists(metadata.index_name.clone()));
857        }
858
859        self.spatial_indexes.insert(normalized_name, (metadata, spatial_index));
860        Ok(())
861    }
862
863    /// Create an IVFFlat index for approximate nearest neighbor search
864    ///
865    /// Extracts vectors from the specified table and builds an IVFFlat index
866    /// using k-means clustering.
867    #[allow(clippy::too_many_arguments)]
868    pub fn create_ivfflat_index(
869        &mut self,
870        catalog: &vibesql_catalog::Catalog,
871        tables: &std::collections::HashMap<String, crate::Table>,
872        index_name: String,
873        table_name: String,
874        column_name: String,
875        col_idx: usize,
876        dimensions: usize,
877        lists: usize,
878        metric: vibesql_ast::VectorDistanceMetric,
879    ) -> Result<(), StorageError> {
880        // Normalize table name for lookup (matches catalog normalization)
881        let normalized_name = if catalog.is_case_sensitive_identifiers() {
882            table_name.clone()
883        } else {
884            table_name.to_lowercase()
885        };
886
887        // Try to find the table with normalized name or qualified name
888        let table = if let Some(tbl) = tables.get(&normalized_name) {
889            tbl
890        } else if !table_name.contains('.') {
891            // Try with schema prefix
892            let current_schema = catalog.get_current_schema();
893            let qualified_name = format!("{}.{}", current_schema, normalized_name);
894            tables
895                .get(&qualified_name)
896                .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
897        } else {
898            return Err(StorageError::TableNotFound(table_name.clone()));
899        };
900
901        // Extract vectors from the table
902        // Note: SqlValue::Vector stores f32, but IVFFlat uses f64 for precision in clustering
903        let rows = table.scan();
904        let total_rows = rows.len();
905        let mut vectors: Vec<(usize, Vec<f64>)> = Vec::new();
906        let mut progress = ProgressTracker::new(
907            format!("Creating IVFFlat index '{}'", index_name),
908            Some(total_rows),
909        );
910        for (row_idx, row) in rows.iter().enumerate() {
911            if col_idx < row.values.len() {
912                if let vibesql_types::SqlValue::Vector(vec_data) = &row.values[col_idx] {
913                    // Convert f32 vector to f64 for IVFFlat processing
914                    let vec_f64: Vec<f64> = vec_data.iter().map(|&v| v as f64).collect();
915                    vectors.push((row_idx, vec_f64));
916                }
917            }
918            progress.update(row_idx + 1);
919        }
920        progress.finish();
921
922        // Create the IVFFlat index with the extracted vectors
923        self.index_manager.create_ivfflat_index_with_vectors(
924            index_name,
925            table_name,
926            column_name,
927            dimensions,
928            lists,
929            metric,
930            vectors,
931        )
932    }
933
934    /// Search an IVFFlat index for approximate nearest neighbors
935    ///
936    /// # Arguments
937    /// * `index_name` - Name of the IVFFlat index
938    /// * `query_vector` - The query vector (f64)
939    /// * `k` - Maximum number of nearest neighbors to return
940    ///
941    /// # Returns
942    /// * `Ok(Vec<(usize, f64)>)` - Vector of (row_id, distance) pairs, ordered by distance
943    /// * `Err(StorageError)` - If index not found or not an IVFFlat index
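    ///
    /// A usage sketch (not compiled here; `idx_embeddings` is a placeholder index name):
    ///
    /// ```ignore
    /// let query: Vec<f64> = vec![0.1, 0.2, 0.3];
    /// for (row_id, distance) in ops.search_ivfflat_index("idx_embeddings", &query, 10)? {
    ///     println!("row {row_id} at distance {distance}");
    /// }
    /// ```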
944    pub fn search_ivfflat_index(
945        &self,
946        index_name: &str,
947        query_vector: &[f64],
948        k: usize,
949    ) -> Result<Vec<(usize, f64)>, StorageError> {
950        self.index_manager.search_ivfflat_index(index_name, query_vector, k)
951    }
952
953    /// Get all IVFFlat indexes for a specific table
954    pub fn get_ivfflat_indexes_for_table(
955        &self,
956        table_name: &str,
957    ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::ivfflat::IVFFlatIndex)> {
958        self.index_manager.get_ivfflat_indexes_for_table(table_name)
959    }
960
961    /// Set the number of probes for an IVFFlat index
962    pub fn set_ivfflat_probes(
963        &mut self,
964        index_name: &str,
965        probes: usize,
966    ) -> Result<(), StorageError> {
967        self.index_manager.set_ivfflat_probes(index_name, probes)
968    }
969
970    // ============================================================================
971    // HNSW Index Methods
972    // ============================================================================
973
974    /// Create an HNSW index for approximate nearest neighbor search
975    ///
976    /// Extracts vectors from the specified table and builds an HNSW index
977    /// using the hierarchical navigable small world algorithm.
978    #[allow(clippy::too_many_arguments)]
979    pub fn create_hnsw_index(
980        &mut self,
981        catalog: &vibesql_catalog::Catalog,
982        tables: &std::collections::HashMap<String, crate::Table>,
983        index_name: String,
984        table_name: String,
985        column_name: String,
986        col_idx: usize,
987        dimensions: usize,
988        m: u32,
989        ef_construction: u32,
990        metric: vibesql_ast::VectorDistanceMetric,
991    ) -> Result<(), StorageError> {
992        // Normalize table name for lookup (matches catalog normalization)
993        let normalized_name = if catalog.is_case_sensitive_identifiers() {
994            table_name.clone()
995        } else {
996            table_name.to_lowercase()
997        };
998
999        // Try to find the table with normalized name or qualified name
1000        let table = if let Some(tbl) = tables.get(&normalized_name) {
1001            tbl
1002        } else if !table_name.contains('.') {
1003            // Try with schema prefix
1004            let current_schema = catalog.get_current_schema();
1005            let qualified_name = format!("{}.{}", current_schema, normalized_name);
1006            tables
1007                .get(&qualified_name)
1008                .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?
1009        } else {
1010            return Err(StorageError::TableNotFound(table_name.clone()));
1011        };
1012
1013        // Extract vectors from the table
1014        // Note: SqlValue::Vector stores f32, but HNSW uses f64 for precision
1015        let rows = table.scan();
1016        let total_rows = rows.len();
1017        let mut vectors: Vec<(usize, Vec<f64>)> = Vec::new();
1018        let mut progress =
1019            ProgressTracker::new(format!("Creating HNSW index '{}'", index_name), Some(total_rows));
1020        for (row_idx, row) in rows.iter().enumerate() {
1021            if col_idx < row.values.len() {
1022                if let vibesql_types::SqlValue::Vector(vec_data) = &row.values[col_idx] {
1023                    // Convert f32 vector to f64 for HNSW processing
1024                    let vec_f64: Vec<f64> = vec_data.iter().map(|&v| v as f64).collect();
1025                    vectors.push((row_idx, vec_f64));
1026                }
1027            }
1028            progress.update(row_idx + 1);
1029        }
1030        progress.finish();
1031
1032        // Create the HNSW index with the extracted vectors
1033        self.index_manager.create_hnsw_index_with_vectors(
1034            index_name,
1035            table_name,
1036            column_name,
1037            dimensions,
1038            m,
1039            ef_construction,
1040            metric,
1041            vectors,
1042        )
1043    }
1044
1045    /// Search an HNSW index for approximate nearest neighbors
1046    ///
1047    /// # Arguments
1048    /// * `index_name` - Name of the HNSW index
1049    /// * `query_vector` - The query vector (f64)
1050    /// * `k` - Maximum number of nearest neighbors to return
1051    ///
1052    /// # Returns
1053    /// * `Ok(Vec<(usize, f64)>)` - Vector of (row_id, distance) pairs, ordered by distance
1054    /// * `Err(StorageError)` - If index not found or not an HNSW index
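    ///
    /// A usage sketch, optionally widening the search beam first (not compiled here;
    /// a larger `ef_search` generally trades speed for recall):
    ///
    /// ```ignore
    /// ops.set_hnsw_ef_search("idx_docs_embedding", 128)?;
    /// let neighbors = ops.search_hnsw_index("idx_docs_embedding", &query, 5)?;
    /// ```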
1055    pub fn search_hnsw_index(
1056        &self,
1057        index_name: &str,
1058        query_vector: &[f64],
1059        k: usize,
1060    ) -> Result<Vec<(usize, f64)>, StorageError> {
1061        self.index_manager.search_hnsw_index(index_name, query_vector, k)
1062    }
1063
1064    /// Get all HNSW indexes for a specific table
1065    pub fn get_hnsw_indexes_for_table(
1066        &self,
1067        table_name: &str,
1068    ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::hnsw::HnswIndex)> {
1069        self.index_manager.get_hnsw_indexes_for_table(table_name)
1070    }
1071
1072    /// Set the ef_search parameter for an HNSW index
1073    pub fn set_hnsw_ef_search(
1074        &mut self,
1075        index_name: &str,
1076        ef_search: usize,
1077    ) -> Result<(), StorageError> {
1078        self.index_manager.set_hnsw_ef_search(index_name, ef_search)
1079    }
1080
1081    /// Check if a spatial index exists
1082    pub fn spatial_index_exists(&self, index_name: &str) -> bool {
1083        let normalized = Self::normalize_index_name(index_name);
1084        self.spatial_indexes.contains_key(&normalized)
1085    }
1086
1087    /// Get spatial index metadata
1088    pub fn get_spatial_index_metadata(&self, index_name: &str) -> Option<&SpatialIndexMetadata> {
1089        let normalized = Self::normalize_index_name(index_name);
1090        self.spatial_indexes.get(&normalized).map(|(metadata, _)| metadata)
1091    }
1092
1093    /// Get spatial index (immutable)
1094    pub fn get_spatial_index(&self, index_name: &str) -> Option<&SpatialIndex> {
1095        let normalized = Self::normalize_index_name(index_name);
1096        self.spatial_indexes.get(&normalized).map(|(_, index)| index)
1097    }
1098
1099    /// Get spatial index (mutable)
1100    pub fn get_spatial_index_mut(&mut self, index_name: &str) -> Option<&mut SpatialIndex> {
1101        let normalized = Self::normalize_index_name(index_name);
1102        self.spatial_indexes.get_mut(&normalized).map(|(_, index)| index)
1103    }
1104
1105    /// Get all spatial indexes for a specific table
1106    pub fn get_spatial_indexes_for_table(
1107        &self,
1108        table_name: &str,
1109    ) -> Vec<(&SpatialIndexMetadata, &SpatialIndex)> {
1110        self.spatial_indexes
1111            .values()
1112            .filter(|(metadata, _)| metadata.table_name == table_name)
1113            .map(|(metadata, index)| (metadata, index))
1114            .collect()
1115    }
1116
1117    /// Get all spatial indexes for a specific table (mutable)
1118    pub fn get_spatial_indexes_for_table_mut(
1119        &mut self,
1120        table_name: &str,
1121    ) -> Vec<(&SpatialIndexMetadata, &mut SpatialIndex)> {
1122        self.spatial_indexes
1123            .iter_mut()
1124            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1125            .map(|(_, (metadata, index))| (metadata as &SpatialIndexMetadata, index))
1126            .collect()
1127    }
1128
1129    /// Drop a spatial index
1130    pub fn drop_spatial_index(&mut self, index_name: &str) -> Result<(), StorageError> {
1131        let normalized = Self::normalize_index_name(index_name);
1132
1133        if self.spatial_indexes.remove(&normalized).is_none() {
1134            return Err(StorageError::IndexNotFound(index_name.to_string()));
1135        }
1136
1137        Ok(())
1138    }
1139
1140    /// Drop all spatial indexes associated with a table (CASCADE behavior)
1141    ///
1142    /// Matching is case-insensitive and handles both qualified ("schema.table")
1143    /// and unqualified ("table") names.
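    ///
    /// A sketch of the matching behavior (not compiled here):
    ///
    /// ```ignore
    /// // Case-insensitive: also removes indexes registered under "public.cities" or "cities".
    /// let dropped = ops.drop_spatial_indexes_for_table("PUBLIC.CITIES");
    /// println!("dropped spatial indexes: {dropped:?}");
    /// ```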
1144    pub fn drop_spatial_indexes_for_table(&mut self, table_name: &str) -> Vec<String> {
1145        // Normalize for case-insensitive comparison
1146        let search_name_lower = table_name.to_lowercase();
1147
1148        // Extract just the table name part if qualified (e.g., "public.users" -> "users")
1149        let search_table_only = search_name_lower.rsplit('.').next().unwrap_or(&search_name_lower);
1150
1151        let indexes_to_drop: Vec<String> = self
1152            .spatial_indexes
1153            .iter()
1154            .filter(|(_, (metadata, _))| {
1155                let stored_lower = metadata.table_name.to_lowercase();
1156                let stored_table_only = stored_lower.rsplit('.').next().unwrap_or(&stored_lower);
1157
1158                // Match if full names match OR unqualified parts match
1159                stored_lower == search_name_lower || stored_table_only == search_table_only
1160            })
1161            .map(|(name, _)| name.clone())
1162            .collect();
1163
1164        for index_name in &indexes_to_drop {
1165            self.spatial_indexes.remove(index_name);
1166        }
1167
1168        indexes_to_drop
1169    }
1170
1171    /// List all spatial indexes
1172    pub fn list_spatial_indexes(&self) -> Vec<String> {
1173        self.spatial_indexes.keys().cloned().collect()
1174    }
1175
1176    /// Check if any spatial indexes exist for a specific table
1177    ///
1178    /// This is an O(n) operation over all spatial indexes but is useful for
1179    /// optimizing bulk insert operations when no indexes need updating.
1180    fn has_spatial_indexes_for_table(&self, table_name: &str) -> bool {
1181        self.spatial_indexes.values().any(|(metadata, _)| metadata.table_name == table_name)
1182    }
1183
1184    /// Update spatial indexes for insert operation
1185    fn update_spatial_indexes_for_insert(
1186        &mut self,
1187        catalog: &vibesql_catalog::Catalog,
1188        table_name: &str,
1189        row: &Row,
1190        row_index: usize,
1191    ) {
1192        let table_schema = match catalog.get_table(table_name) {
1193            Some(schema) => schema,
1194            None => return,
1195        };
1196
1197        let indexes_to_update: Vec<(String, usize)> = self
1198            .spatial_indexes
1199            .iter()
1200            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1201            .filter_map(|(index_name, (metadata, _))| {
1202                table_schema
1203                    .get_column_index(&metadata.column_name)
1204                    .map(|col_idx| (index_name.clone(), col_idx))
1205            })
1206            .collect();
1207
1208        for (index_name, col_idx) in indexes_to_update {
1209            let geom_value = &row.values[col_idx];
1210
1211            if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1212                if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1213                    index.insert(row_index, mbr);
1214                }
1215            }
1216        }
1217    }
1218
1219    /// Batch update spatial indexes for insert operation
1220    ///
1221    /// This is more efficient than calling `update_spatial_indexes_for_insert` in a loop
1222    /// because it pre-computes column indices once per index rather than once per row.
1223    ///
1224    /// # Arguments
1225    /// * `catalog` - The database catalog
1226    /// * `table_name` - The table name
1227    /// * `rows_to_insert` - Vec of (row_index, row) pairs to insert
1228    fn batch_update_spatial_indexes_for_insert(
1229        &mut self,
1230        catalog: &vibesql_catalog::Catalog,
1231        table_name: &str,
1232        rows_to_insert: &[(usize, &Row)],
1233    ) {
1234        if rows_to_insert.is_empty() {
1235            return;
1236        }
1237
1238        let table_schema = match catalog.get_table(table_name) {
1239            Some(schema) => schema,
1240            None => return,
1241        };
1242
1243        // Pre-compute indexes and column indices once
1244        let indexes_to_update: Vec<(String, usize)> = self
1245            .spatial_indexes
1246            .iter()
1247            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1248            .filter_map(|(index_name, (metadata, _))| {
1249                table_schema
1250                    .get_column_index(&metadata.column_name)
1251                    .map(|col_idx| (index_name.clone(), col_idx))
1252            })
1253            .collect();
1254
1255        // Process each index
1256        for (index_name, col_idx) in indexes_to_update {
1257            if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1258                for &(row_index, row) in rows_to_insert {
1259                    let geom_value = &row.values[col_idx];
1260                    if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1261                        index.insert(row_index, mbr);
1262                    }
1263                }
1264            }
1265        }
1266    }
1267
1268    /// Update spatial indexes for update operation
1269    fn update_spatial_indexes_for_update(
1270        &mut self,
1271        catalog: &vibesql_catalog::Catalog,
1272        table_name: &str,
1273        old_row: &Row,
1274        new_row: &Row,
1275        row_index: usize,
1276    ) {
1277        let table_schema = match catalog.get_table(table_name) {
1278            Some(schema) => schema,
1279            None => return,
1280        };
1281
1282        let indexes_to_update: Vec<(String, usize)> = self
1283            .spatial_indexes
1284            .iter()
1285            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1286            .filter_map(|(index_name, (metadata, _))| {
1287                table_schema
1288                    .get_column_index(&metadata.column_name)
1289                    .map(|col_idx| (index_name.clone(), col_idx))
1290            })
1291            .collect();
1292
1293        for (index_name, col_idx) in indexes_to_update {
1294            let old_geom = &old_row.values[col_idx];
1295            let new_geom = &new_row.values[col_idx];
1296
1297            if old_geom != new_geom {
1298                if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1299                    if let Some(old_mbr) = extract_mbr_from_sql_value(old_geom) {
1300                        index.remove(row_index, &old_mbr);
1301                    }
1302
1303                    if let Some(new_mbr) = extract_mbr_from_sql_value(new_geom) {
1304                        index.insert(row_index, new_mbr);
1305                    }
1306                }
1307            }
1308        }
1309    }
1310
1311    fn update_spatial_indexes_for_delete_with_values(
1312        &mut self,
1313        catalog: &vibesql_catalog::Catalog,
1314        table_name: &str,
1315        values: &[vibesql_types::SqlValue],
1316        row_index: usize,
1317    ) {
1318        let table_schema = match catalog.get_table(table_name) {
1319            Some(schema) => schema,
1320            None => return,
1321        };
1322
1323        let indexes_to_update: Vec<(String, usize)> = self
1324            .spatial_indexes
1325            .iter()
1326            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1327            .filter_map(|(index_name, (metadata, _))| {
1328                table_schema
1329                    .get_column_index(&metadata.column_name)
1330                    .map(|col_idx| (index_name.clone(), col_idx))
1331            })
1332            .collect();
1333
1334        for (index_name, col_idx) in indexes_to_update {
1335            let geom_value = &values[col_idx];
1336
1337            if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1338                if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1339                    index.remove(row_index, &mbr);
1340                }
1341            }
1342        }
1343    }
1344
1345    /// Batch update spatial indexes for delete operation
1346    ///
1347    /// This is significantly more efficient than calling
1348    /// `update_spatial_indexes_for_delete_with_values` in a loop because it pre-computes column
1349    /// indices once per index rather than once per row.
1350    fn batch_update_spatial_indexes_for_delete(
1351        &mut self,
1352        catalog: &vibesql_catalog::Catalog,
1353        table_name: &str,
1354        rows_to_delete: &[(usize, &Row)],
1355    ) {
1356        if rows_to_delete.is_empty() {
1357            return;
1358        }
1359
1360        let table_schema = match catalog.get_table(table_name) {
1361            Some(schema) => schema,
1362            None => return,
1363        };
1364
1365        // Pre-compute which spatial indexes apply to this table and their column indices
1366        let indexes_to_update: Vec<(String, usize)> = self
1367            .spatial_indexes
1368            .iter()
1369            .filter(|(_, (metadata, _))| metadata.table_name == table_name)
1370            .filter_map(|(index_name, (metadata, _))| {
1371                table_schema
1372                    .get_column_index(&metadata.column_name)
1373                    .map(|col_idx| (index_name.clone(), col_idx))
1374            })
1375            .collect();
1376
1377        if indexes_to_update.is_empty() {
1378            return;
1379        }
1380
1381        // Process each spatial index - batch remove entries for all rows
1382        for (index_name, col_idx) in indexes_to_update {
1383            if let Some((_, index)) = self.spatial_indexes.get_mut(&index_name) {
1384                for &(row_index, row) in rows_to_delete {
1385                    let geom_value = &row.values[col_idx];
1386                    if let Some(mbr) = extract_mbr_from_sql_value(geom_value) {
1387                        index.remove(row_index, &mbr);
1388                    }
1389                }
1390            }
1391        }
1392    }
1393
1394    /// Reset the operations manager to empty state (clears all indexes).
1395    ///
1396    /// Clears all index data but preserves configuration (database path, storage backend, config).
1397    /// This is more efficient than creating a new instance and ensures indexes work after reset.
1398    pub fn reset(&mut self) {
1399        // Clear all user-defined indexes (preserves database_path, storage, config)
1400        self.index_manager.reset();
1401
1402        // Clear all spatial indexes
1403        self.spatial_indexes.clear();
1404    }
1405}
1406
1407impl Default for Operations {
1408    fn default() -> Self {
1409        Self::new()
1410    }
1411}