vibesql_storage/database/
index_ops.rs

1// ============================================================================
2// Database Index Operations
3// ============================================================================
4
5use vibesql_ast::IndexColumn;
6
7use super::{core::Database, operations::SpatialIndexMetadata};
8use crate::{Row, StorageError};
9
10// ============================================================================
11// DELETE Profiling Statistics (thread-local aggregates)
12// ============================================================================
13
/// Aggregate statistics for DELETE profiling.
///
/// Accumulates a delete count plus total and per-phase timings (all in nanoseconds) that are
/// fed in through [`DeleteProfileStats::record`], and renders them as a human-readable report
/// via [`DeleteProfileStats::print_summary`].
///
/// # Environment Variables
///
/// - `DELETE_PROFILE` - Enable timing collection and print summary on thread exit
/// - `DELETE_PROFILE_SUMMARY` - Explicitly request the summary when the stats are dropped
/// - `DELETE_PROFILE_VERBOSE=1` - Also print per-delete breakdown to stderr
///
/// This type itself only consults `DELETE_PROFILE` and `DELETE_PROFILE_SUMMARY`, and only in
/// its `Drop` impl. The check is for *presence* of the variable (any value, not just `1`):
/// if either one is set and at least one delete was recorded, the summary is printed to stderr
/// when the value is dropped (typically at thread exit for the thread-local instance).
#[derive(Debug, Default)]
pub struct DeleteProfileStats {
    /// Number of delete operations recorded so far.
    pub count: u64,
    /// Sum of the `total_ns` values passed to `record`.
    pub total_ns: u128,
    /// Accumulated time of phase 0 (`phase_times[0]`), reported as `pk_lookup`.
    pub pk_lookup_ns: u128,
    /// Accumulated time of phase 1 (`phase_times[1]`), reported as `value_clone`.
    pub row_clone_ns: u128,
    /// Accumulated time of phase 2 (`phase_times[2]`), reported as `wal`.
    pub wal_ns: u128,
    /// Accumulated time of phase 3 (`phase_times[3]`), reported as `index_update`.
    pub index_update_ns: u128,
    /// Accumulated time of phase 4 (`phase_times[4]`), reported as `row_remove`.
    pub row_remove_ns: u128,
    /// Accumulated time of phase 5 (`phase_times[5]`), reported as `cache`.
    pub cache_ns: u128,
}

impl DeleteProfileStats {
    /// Add timing data from a single delete operation.
    ///
    /// # Arguments
    /// * `phase_times` - Per-phase durations in nanoseconds, in the order
    ///   `[pk_lookup, row_clone, wal, index_update, row_remove, cache]`
    /// * `total_ns` - Total duration of the delete in nanoseconds
    pub fn record(&mut self, phase_times: &[u128; 6], total_ns: u128) {
        let [pk_lookup, row_clone, wal, index_update, row_remove, cache] = *phase_times;
        self.count += 1;
        self.total_ns += total_ns;
        self.pk_lookup_ns += pk_lookup;
        self.row_clone_ns += row_clone;
        self.wal_ns += wal;
        self.index_update_ns += index_update;
        self.row_remove_ns += row_remove;
        self.cache_ns += cache;
    }

    /// Print a summary of the aggregate statistics to stderr.
    ///
    /// Prints nothing when no delete has been recorded. The whole report is written with a
    /// single `eprint!` call so that it is not interleaved line-by-line with stderr output
    /// from other threads.
    pub fn print_summary(&self) {
        if let Some(report) = self.format_summary() {
            eprint!("{}", report);
        }
    }

    /// Render the summary report, or `None` when no delete has been recorded.
    ///
    /// Each phase line shows the average time per delete in microseconds and the phase's
    /// share of the total time (0.0% when the total is zero).
    fn format_summary(&self) -> Option<String> {
        if self.count == 0 {
            return None;
        }
        let deletes = self.count as f64;
        let total = self.total_ns;
        let phases: [(&str, u128); 6] = [
            ("pk_lookup:", self.pk_lookup_ns),
            ("value_clone:", self.row_clone_ns),
            ("wal:", self.wal_ns),
            ("index_update:", self.index_update_ns),
            ("row_remove:", self.row_remove_ns),
            ("cache:", self.cache_ns),
        ];

        let mut report = format!(
            "\n=== DELETE PROFILE SUMMARY ({} deletes) ===\nAverage DELETE time: {:.1}µs\n",
            self.count,
            (total as f64 / deletes) / 1000.0
        );
        for &(label, phase_ns) in phases.iter() {
            let avg_us = (phase_ns as f64 / deletes) / 1000.0;
            let share = if total > 0 { phase_ns as f64 / total as f64 * 100.0 } else { 0.0 };
            // Labels are left-padded to a common width so the numeric columns line up.
            report.push_str(&format!("  {:<14}{:>8.1}µs ({:>5.1}%)\n", label, avg_us, share));
        }
        report.push_str("==========================================\n\n");
        Some(report)
    }
}

impl Drop for DeleteProfileStats {
    /// Print the summary on drop when profiling output was requested.
    fn drop(&mut self) {
        // Nothing to report if no delete was ever recorded.
        if self.count == 0 {
            return;
        }
        // Print summary if DELETE_PROFILE is set (auto-summary) or DELETE_PROFILE_SUMMARY is set
        // (explicit). Only the presence of the variable is checked, not its value.
        let requested = std::env::var("DELETE_PROFILE").is_ok()
            || std::env::var("DELETE_PROFILE_SUMMARY").is_ok();
        if requested {
            self.print_summary();
        }
    }
}
102
103thread_local! {
104    /// Thread-local aggregate statistics for DELETE profiling
105    pub static DELETE_PROFILE_STATS: std::cell::RefCell<DeleteProfileStats> =
106        std::cell::RefCell::new(DeleteProfileStats::default());
107}
108
109/// Print the DELETE profile summary for the current thread.
110/// Call this at the end of a benchmark to see aggregate statistics.
111pub fn print_delete_profile_summary() {
112    DELETE_PROFILE_STATS.with(|stats| {
113        stats.borrow().print_summary();
114    });
115}
116
117/// Reset the DELETE profile statistics for the current thread.
118pub fn reset_delete_profile_stats() {
119    DELETE_PROFILE_STATS.with(|stats| {
120        *stats.borrow_mut() = DeleteProfileStats::default();
121    });
122}
123
124impl Database {
125    // ============================================================================
126    // Index Management
127    // ============================================================================
128
129    /// Create an index
130    pub fn create_index(
131        &mut self,
132        index_name: String,
133        table_name: String,
134        unique: bool,
135        columns: Vec<IndexColumn>,
136    ) -> Result<(), StorageError> {
137        self.operations.create_index(
138            &self.catalog,
139            &self.tables,
140            index_name,
141            table_name,
142            unique,
143            columns,
144        )
145    }
146
147    /// Create an index with pre-computed keys (for expression indexes)
148    ///
149    /// This method is used when the caller has already evaluated the expressions
150    /// and computed the key values for each row. This is necessary for expression
151    /// indexes where the key values are derived from evaluating expressions on rows.
152    ///
153    /// # Arguments
154    /// * `index_name` - Name of the index to create
155    /// * `table_name` - Name of the table this index is on
156    /// * `unique` - Whether this is a unique index
157    /// * `columns` - The index column definitions (for metadata storage)
158    /// * `keys` - Pre-computed (key_values, row_id) pairs
159    pub fn create_index_with_keys(
160        &mut self,
161        index_name: String,
162        table_name: String,
163        unique: bool,
164        columns: Vec<IndexColumn>,
165        keys: Vec<(Vec<vibesql_types::SqlValue>, usize)>,
166    ) -> Result<(), StorageError> {
167        self.operations.create_index_with_keys(
168            &self.catalog,
169            index_name,
170            table_name,
171            unique,
172            columns,
173            keys,
174        )
175    }
176
177    /// Check if an index exists
178    pub fn index_exists(&self, index_name: &str) -> bool {
179        self.operations.index_exists(index_name)
180    }
181
182    /// Get index metadata
183    pub fn get_index(&self, index_name: &str) -> Option<&super::indexes::IndexMetadata> {
184        self.operations.get_index(index_name)
185    }
186
187    /// Get index data
188    pub fn get_index_data(&self, index_name: &str) -> Option<&super::indexes::IndexData> {
189        self.operations.get_index_data(index_name)
190    }
191
192    /// Update user-defined indexes for update operation
193    ///
194    /// # Arguments
195    /// * `table_name` - Name of the table being updated
196    /// * `old_row` - Row data before the update
197    /// * `new_row` - Row data after the update
198    /// * `row_index` - Index of the row in the table
199    /// * `changed_columns` - Optional set of column indices that were modified. If provided,
200    ///   indexes that don't involve any changed columns will be skipped.
201    pub fn update_indexes_for_update(
202        &mut self,
203        table_name: &str,
204        old_row: &Row,
205        new_row: &Row,
206        row_index: usize,
207        changed_columns: Option<&std::collections::HashSet<usize>>,
208    ) {
209        self.operations.update_indexes_for_update(
210            &self.catalog,
211            table_name,
212            old_row,
213            new_row,
214            row_index,
215            changed_columns,
216        );
217    }
218
219    /// Update user-defined indexes for delete operation
220    pub fn update_indexes_for_delete(&mut self, table_name: &str, row: &Row, row_index: usize) {
221        self.operations.update_indexes_for_delete(&self.catalog, table_name, row, row_index);
222    }
223
224    /// Batch update user-defined indexes for delete operation
225    ///
226    /// This is significantly more efficient than calling `update_indexes_for_delete` in a loop
227    /// because it pre-computes column indices once per index rather than once per row.
228    pub fn batch_update_indexes_for_delete(
229        &mut self,
230        table_name: &str,
231        rows_to_delete: &[(usize, &Row)],
232    ) {
233        self.operations.batch_update_indexes_for_delete(&self.catalog, table_name, rows_to_delete);
234    }
235
236    /// Rebuild user-defined indexes after bulk operations that change row indices
237    pub fn rebuild_indexes(&mut self, table_name: &str) {
238        self.operations.rebuild_indexes(&self.catalog, &self.tables, table_name);
239    }
240
241    /// Adjust user-defined indexes after row deletions
242    ///
243    /// This is more efficient than rebuild_indexes when only a few rows are deleted,
244    /// as it adjusts row indices in place rather than rebuilding from scratch.
245    ///
246    /// # Arguments
247    /// * `table_name` - Name of the table whose indexes need adjustment
248    /// * `deleted_indices` - Sorted list of deleted row indices (ascending order)
249    pub fn adjust_indexes_after_delete(&mut self, table_name: &str, deleted_indices: &[usize]) {
250        self.operations.adjust_indexes_after_delete(table_name, deleted_indices);
251    }
252
253    /// Drop an index
254    pub fn drop_index(&mut self, index_name: &str) -> Result<(), StorageError> {
255        self.operations.drop_index(index_name)
256    }
257
258    /// List all indexes
259    pub fn list_indexes(&self) -> Vec<String> {
260        self.operations.list_indexes()
261    }
262
263    /// List all indexes for a specific table
264    pub fn list_indexes_for_table(&self, table_name: &str) -> Vec<String> {
265        self.operations.list_indexes_for_table(table_name)
266    }
267
268    /// Check if a column has any user-defined index
269    ///
270    /// This is used to determine if updates to a column require index maintenance.
271    /// Returns true if any user-defined index (B-tree or spatial) includes this column.
272    #[inline]
273    pub fn has_index_on_column(&self, table_name: &str, column_name: &str) -> bool {
274        self.operations.has_index_on_column(table_name, column_name)
275    }
276
277    // ============================================================================
278    // Expression Index Methods
279    // ============================================================================
280
281    /// Add row to expression indexes after insert with pre-computed keys
282    ///
283    /// This method handles expression indexes which require pre-computed key values
284    /// since the storage layer cannot evaluate expressions.
285    pub fn add_to_expression_indexes_for_insert(
286        &mut self,
287        table_name: &str,
288        row_index: usize,
289        expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
290    ) {
291        self.operations.add_to_expression_indexes_for_insert(
292            table_name,
293            row_index,
294            expression_keys,
295        );
296    }
297
298    /// Update expression indexes for update operation with pre-computed keys
299    pub fn update_expression_indexes_for_update(
300        &mut self,
301        table_name: &str,
302        row_index: usize,
303        old_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
304        new_expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
305    ) {
306        self.operations.update_expression_indexes_for_update(
307            table_name,
308            row_index,
309            old_expression_keys,
310            new_expression_keys,
311        );
312    }
313
314    /// Update expression indexes for delete operation with pre-computed keys
315    pub fn update_expression_indexes_for_delete(
316        &mut self,
317        table_name: &str,
318        row_index: usize,
319        expression_keys: &std::collections::HashMap<String, Vec<vibesql_types::SqlValue>>,
320    ) {
321        self.operations.update_expression_indexes_for_delete(
322            table_name,
323            row_index,
324            expression_keys,
325        );
326    }
327
328    /// Get expression indexes for a specific table
329    ///
330    /// Returns metadata for all expression indexes on the table. Used by executor
331    /// to determine which indexes need expression evaluation during DML operations.
332    pub fn get_expression_indexes_for_table(
333        &self,
334        table_name: &str,
335    ) -> Vec<(String, &crate::database::indexes::IndexMetadata)> {
336        self.operations.get_expression_indexes_for_table(table_name)
337    }
338
339    /// Check if a table has any expression indexes
340    pub fn has_expression_indexes(&self, table_name: &str) -> bool {
341        self.operations.has_expression_indexes(table_name)
342    }
343
344    /// Clear expression index data for a table (for rebuilding after compaction)
345    pub fn clear_expression_index_data(&mut self, table_name: &str) {
346        self.operations.clear_expression_index_data(table_name);
347    }
348
349    // ============================================================================
350    // Spatial Index Methods
351    // ============================================================================
352
353    /// Create a spatial index
354    pub fn create_spatial_index(
355        &mut self,
356        metadata: SpatialIndexMetadata,
357        spatial_index: crate::index::SpatialIndex,
358    ) -> Result<(), StorageError> {
359        self.operations.create_spatial_index(metadata, spatial_index)
360    }
361
362    /// Create an IVFFlat index for approximate nearest neighbor search on vector columns
363    ///
364    /// This method creates an IVFFlat (Inverted File with Flat quantization) index
365    /// for efficient approximate nearest neighbor search on vector data.
366    ///
367    /// # Arguments
368    /// * `index_name` - Name for the new index
369    /// * `table_name` - Name of the table containing the vector column
370    /// * `column_name` - Name of the vector column to index
371    /// * `col_idx` - Column index in the table schema
372    /// * `dimensions` - Number of dimensions in the vectors
373    /// * `lists` - Number of clusters for the IVFFlat algorithm
374    /// * `metric` - Distance metric to use (L2, Cosine, InnerProduct)
375    #[allow(clippy::too_many_arguments)]
376    pub fn create_ivfflat_index(
377        &mut self,
378        index_name: String,
379        table_name: String,
380        column_name: String,
381        col_idx: usize,
382        dimensions: usize,
383        lists: usize,
384        metric: vibesql_ast::VectorDistanceMetric,
385    ) -> Result<(), StorageError> {
386        self.operations.create_ivfflat_index(
387            &self.catalog,
388            &self.tables,
389            index_name,
390            table_name,
391            column_name,
392            col_idx,
393            dimensions,
394            lists,
395            metric,
396        )
397    }
398
399    /// Search an IVFFlat index for approximate nearest neighbors
400    ///
401    /// # Arguments
402    /// * `index_name` - Name of the IVFFlat index
403    /// * `query_vector` - The query vector (f64)
404    /// * `k` - Maximum number of nearest neighbors to return
405    ///
406    /// # Returns
407    /// * `Ok(Vec<(usize, f64)>)` - Vector of (row_id, distance) pairs, ordered by distance
408    /// * `Err(StorageError)` - If index not found or not an IVFFlat index
409    pub fn search_ivfflat_index(
410        &self,
411        index_name: &str,
412        query_vector: &[f64],
413        k: usize,
414    ) -> Result<Vec<(usize, f64)>, StorageError> {
415        self.operations.search_ivfflat_index(index_name, query_vector, k)
416    }
417
418    /// Get all IVFFlat indexes for a specific table
419    pub fn get_ivfflat_indexes_for_table(
420        &self,
421        table_name: &str,
422    ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::ivfflat::IVFFlatIndex)> {
423        self.operations.get_ivfflat_indexes_for_table(table_name)
424    }
425
426    /// Set the number of probes for an IVFFlat index
427    pub fn set_ivfflat_probes(
428        &mut self,
429        index_name: &str,
430        probes: usize,
431    ) -> Result<(), StorageError> {
432        self.operations.set_ivfflat_probes(index_name, probes)
433    }
434
435    // ============================================================================
436    // HNSW Index Methods
437    // ============================================================================
438
439    /// Create an HNSW index for approximate nearest neighbor search on vector columns
440    ///
441    /// This method creates an HNSW (Hierarchical Navigable Small World) index
442    /// for efficient approximate nearest neighbor search on vector data.
443    ///
444    /// # Arguments
445    /// * `index_name` - Name for the new index
446    /// * `table_name` - Name of the table containing the vector column
447    /// * `column_name` - Name of the vector column to index
448    /// * `col_idx` - Column index in the table schema
449    /// * `dimensions` - Number of dimensions in the vectors
450    /// * `m` - Maximum number of connections per node (default 16)
451    /// * `ef_construction` - Size of dynamic candidate list during construction (default 64)
452    /// * `metric` - Distance metric to use (L2, Cosine, InnerProduct)
453    #[allow(clippy::too_many_arguments)]
454    pub fn create_hnsw_index(
455        &mut self,
456        index_name: String,
457        table_name: String,
458        column_name: String,
459        col_idx: usize,
460        dimensions: usize,
461        m: u32,
462        ef_construction: u32,
463        metric: vibesql_ast::VectorDistanceMetric,
464    ) -> Result<(), StorageError> {
465        self.operations.create_hnsw_index(
466            &self.catalog,
467            &self.tables,
468            index_name,
469            table_name,
470            column_name,
471            col_idx,
472            dimensions,
473            m,
474            ef_construction,
475            metric,
476        )
477    }
478
479    /// Search an HNSW index for approximate nearest neighbors
480    ///
481    /// # Arguments
482    /// * `index_name` - Name of the HNSW index
483    /// * `query_vector` - The query vector (f64)
484    /// * `k` - Maximum number of nearest neighbors to return
485    ///
486    /// # Returns
487    /// * `Ok(Vec<(usize, f64)>)` - Vector of (row_id, distance) pairs, ordered by distance
488    /// * `Err(StorageError)` - If index not found or not an HNSW index
489    pub fn search_hnsw_index(
490        &self,
491        index_name: &str,
492        query_vector: &[f64],
493        k: usize,
494    ) -> Result<Vec<(usize, f64)>, StorageError> {
495        self.operations.search_hnsw_index(index_name, query_vector, k)
496    }
497
498    /// Get all HNSW indexes for a specific table
499    pub fn get_hnsw_indexes_for_table(
500        &self,
501        table_name: &str,
502    ) -> Vec<(&super::indexes::IndexMetadata, &super::indexes::hnsw::HnswIndex)> {
503        self.operations.get_hnsw_indexes_for_table(table_name)
504    }
505
506    /// Set the ef_search parameter for an HNSW index
507    pub fn set_hnsw_ef_search(
508        &mut self,
509        index_name: &str,
510        ef_search: usize,
511    ) -> Result<(), StorageError> {
512        self.operations.set_hnsw_ef_search(index_name, ef_search)
513    }
514
515    /// Check if a spatial index exists
516    pub fn spatial_index_exists(&self, index_name: &str) -> bool {
517        self.operations.spatial_index_exists(index_name)
518    }
519
520    /// Get spatial index metadata
521    pub fn get_spatial_index_metadata(&self, index_name: &str) -> Option<&SpatialIndexMetadata> {
522        self.operations.get_spatial_index_metadata(index_name)
523    }
524
525    /// Get spatial index (immutable)
526    pub fn get_spatial_index(&self, index_name: &str) -> Option<&crate::index::SpatialIndex> {
527        self.operations.get_spatial_index(index_name)
528    }
529
530    /// Get spatial index (mutable)
531    pub fn get_spatial_index_mut(
532        &mut self,
533        index_name: &str,
534    ) -> Option<&mut crate::index::SpatialIndex> {
535        self.operations.get_spatial_index_mut(index_name)
536    }
537
538    /// Get all spatial indexes for a specific table
539    pub fn get_spatial_indexes_for_table(
540        &self,
541        table_name: &str,
542    ) -> Vec<(&SpatialIndexMetadata, &crate::index::SpatialIndex)> {
543        self.operations.get_spatial_indexes_for_table(table_name)
544    }
545
546    /// Get all spatial indexes for a specific table (mutable)
547    pub fn get_spatial_indexes_for_table_mut(
548        &mut self,
549        table_name: &str,
550    ) -> Vec<(&SpatialIndexMetadata, &mut crate::index::SpatialIndex)> {
551        self.operations.get_spatial_indexes_for_table_mut(table_name)
552    }
553
554    /// Drop a spatial index
555    pub fn drop_spatial_index(&mut self, index_name: &str) -> Result<(), StorageError> {
556        self.operations.drop_spatial_index(index_name)
557    }
558
559    /// Drop all spatial indexes associated with a table (CASCADE behavior)
560    pub fn drop_spatial_indexes_for_table(&mut self, table_name: &str) -> Vec<String> {
561        self.operations.drop_spatial_indexes_for_table(table_name)
562    }
563
564    /// List all spatial indexes
565    pub fn list_spatial_indexes(&self) -> Vec<String> {
566        self.operations.list_spatial_indexes()
567    }
568
569    // ============================================================================
570    // Direct Index Lookup API (High-Performance OLTP)
571    // ============================================================================
572
573    /// Look up rows by index name and key values - bypasses SQL parsing for maximum performance
574    ///
575    /// This method provides direct B+ tree index lookups, completely bypassing SQL parsing
576    /// and the query execution pipeline. Use this for performance-critical OLTP workloads
577    /// where you know the exact index and key values.
578    ///
579    /// # Arguments
580    /// * `index_name` - Name of the index (as created with CREATE INDEX)
581    /// * `key_values` - Key values to look up (must match index column order)
582    ///
583    /// # Returns
584    /// * `Ok(Some(Vec<&Row>))` - The rows matching the key
585    /// * `Ok(None)` - No rows match the key
586    /// * `Err(StorageError)` - Index not found or other error
587    ///
588    /// # Performance
589    /// This is ~100-300x faster than executing a SQL SELECT query because it:
590    /// - Skips SQL parsing (~300µs saved)
591    /// - Skips query planning and optimization
592    /// - Uses direct B+ tree lookup on the index
593    ///
594    /// # Example
595    /// ```text
596    /// // Single-column index lookup
597    /// let rows = db.lookup_by_index("idx_users_pk", &[SqlValue::Integer(42)])?;
598    ///
599    /// // Composite key lookup
600    /// let rows = db.lookup_by_index("idx_orders_pk", &[
601    ///     SqlValue::Integer(warehouse_id),
602    ///     SqlValue::Integer(district_id),
603    ///     SqlValue::Integer(order_id),
604    /// ])?;
605    /// ```
606    pub fn lookup_by_index(
607        &self,
608        index_name: &str,
609        key_values: &[vibesql_types::SqlValue],
610    ) -> Result<Option<Vec<&Row>>, StorageError> {
611        // Get index metadata to find the table
612        let metadata = self
613            .get_index(index_name)
614            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
615        let table_name = metadata.table_name.clone();
616
617        // Get the index data
618        let index_data = self
619            .get_index_data(index_name)
620            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
621
622        // Perform the lookup
623        let row_indices = match index_data.get(key_values) {
624            Some(indices) => indices,
625            None => return Ok(None),
626        };
627
628        // Get the table
629        let table = self
630            .get_table(&table_name)
631            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
632
633        // Collect the rows (use get_row() to skip deleted rows)
634        let mut result = Vec::with_capacity(row_indices.len());
635        for &idx in &row_indices {
636            // Use get_row() which returns None for deleted rows
637            if let Some(row) = table.get_row(idx) {
638                result.push(row);
639            }
640        }
641
642        if result.is_empty() {
643            Ok(None)
644        } else {
645            Ok(Some(result))
646        }
647    }
648
649    /// Look up the first row by index - optimized for unique indexes
650    ///
651    /// This is a convenience method for unique indexes where you expect exactly one row.
652    /// Returns only the first matching row.
653    ///
654    /// # Arguments
655    /// * `index_name` - Name of the index
656    /// * `key_values` - Key values to look up
657    ///
658    /// # Returns
659    /// * `Ok(Some(&Row))` - The first matching row
660    /// * `Ok(None)` - No row matches the key
661    /// * `Err(StorageError)` - Index not found or other error
662    pub fn lookup_one_by_index(
663        &self,
664        index_name: &str,
665        key_values: &[vibesql_types::SqlValue],
666    ) -> Result<Option<&Row>, StorageError> {
667        // Get index metadata to find the table
668        let metadata = self
669            .get_index(index_name)
670            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
671        let table_name = metadata.table_name.clone();
672
673        // Get the index data
674        let index_data = self
675            .get_index_data(index_name)
676            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
677
678        // Perform the lookup
679        let row_indices = match index_data.get(key_values) {
680            Some(indices) => indices,
681            None => return Ok(None),
682        };
683
684        // Get the first row index
685        let first_idx = match row_indices.first() {
686            Some(&idx) => idx,
687            None => return Ok(None),
688        };
689
690        // Get the table and return the row using O(1) direct access
691        let table = self
692            .get_table(&table_name)
693            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
694
695        Ok(table.get_row(first_idx))
696    }
697
698    /// Batch lookup by index - look up multiple keys in a single call
699    ///
700    /// This method is optimized for batch point lookups where you need to retrieve
701    /// multiple rows by their index keys. It's more efficient than calling
702    /// `lookup_by_index` in a loop.
703    ///
704    /// # Arguments
705    /// * `index_name` - Name of the index
706    /// * `keys` - List of key value tuples to look up
707    ///
708    /// # Returns
709    /// * `Ok(Vec<Option<Vec<&Row>>>)` - For each key, the matching rows (or None if not found)
710    /// * `Err(StorageError)` - Index not found or other error
711    ///
712    /// # Example
713    /// ```text
714    /// // Batch lookup multiple items
715    /// let results = db.lookup_by_index_batch("idx_items_pk", &[
716    ///     vec![SqlValue::Integer(1)],
717    ///     vec![SqlValue::Integer(2)],
718    ///     vec![SqlValue::Integer(3)],
719    /// ])?;
720    ///
721    /// for (key_idx, rows) in results.iter().enumerate() {
722    ///     if let Some(rows) = rows {
723    ///         println!("Key {} matched {} rows", key_idx, rows.len());
724    ///     }
725    /// }
726    /// ```
727    pub fn lookup_by_index_batch<'a>(
728        &'a self,
729        index_name: &str,
730        keys: &[Vec<vibesql_types::SqlValue>],
731    ) -> Result<Vec<Option<Vec<&'a Row>>>, StorageError> {
732        // Get index metadata to find the table
733        let metadata = self
734            .get_index(index_name)
735            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
736        let table_name = metadata.table_name.clone();
737
738        // Get the index data
739        let index_data = self
740            .get_index_data(index_name)
741            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
742
743        // Get the table once for O(1) row access
744        let table = self
745            .get_table(&table_name)
746            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
747
748        // Look up each key using direct row access
749        let mut results = Vec::with_capacity(keys.len());
750        for key in keys {
751            let row_indices = index_data.get(key);
752            match row_indices {
753                Some(indices) if !indices.is_empty() => {
754                    let matched_rows: Vec<_> =
755                        indices.iter().filter_map(|&idx| table.get_row(idx)).collect();
756                    if matched_rows.is_empty() {
757                        results.push(None);
758                    } else {
759                        results.push(Some(matched_rows));
760                    }
761                }
762                _ => results.push(None),
763            }
764        }
765
766        Ok(results)
767    }
768
769    /// Batch lookup returning first row only - optimized for unique indexes
770    ///
771    /// Like `lookup_by_index_batch` but returns only the first matching row for each key.
772    /// More efficient when you know the index is unique.
773    ///
774    /// # Arguments
775    /// * `index_name` - Name of the index
776    /// * `keys` - List of key value tuples to look up
777    ///
778    /// # Returns
779    /// * `Ok(Vec<Option<&Row>>)` - For each key, the first matching row (or None)
780    pub fn lookup_one_by_index_batch<'a>(
781        &'a self,
782        index_name: &str,
783        keys: &[Vec<vibesql_types::SqlValue>],
784    ) -> Result<Vec<Option<&'a Row>>, StorageError> {
785        // Get index metadata to find the table
786        let metadata = self
787            .get_index(index_name)
788            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
789        let table_name = metadata.table_name.clone();
790
791        // Get the index data
792        let index_data = self
793            .get_index_data(index_name)
794            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
795
796        // Get the table once for O(1) row access
797        let table = self
798            .get_table(&table_name)
799            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
800
801        // Look up each key using direct row access
802        let mut results = Vec::with_capacity(keys.len());
803        for key in keys {
804            let row_indices = index_data.get(key);
805            match row_indices {
806                Some(indices) if !indices.is_empty() => {
807                    results.push(table.get_row(indices[0]));
808                }
809                _ => results.push(None),
810            }
811        }
812
813        Ok(results)
814    }
815
816    // ============================================================================
817    // Prefix Index Lookup API (Multi-column indexes)
818    // ============================================================================
819
820    /// Look up rows by index using prefix matching - for multi-column indexes
821    ///
822    /// This method performs prefix matching on multi-column indexes. For example,
823    /// with an index on (a, b, c), you can look up all rows where (a, b) match
824    /// a specific value, regardless of c.
825    ///
826    /// # Arguments
827    /// * `index_name` - Name of the index (as created with CREATE INDEX)
828    /// * `prefix` - Prefix key values to match (must be a prefix of index columns)
829    ///
830    /// # Returns
831    /// * `Ok(Vec<&Row>)` - The rows matching the prefix (empty if none found)
832    /// * `Err(StorageError)` - Index not found or other error
833    ///
834    /// # Performance
835    /// Uses efficient B+ tree range scan: O(log n + k) where n is total keys, k is matches.
836    ///
837    /// # Example
838    /// ```text
839    /// // Index on (warehouse_id, district_id, order_id) - 3 columns
840    /// // Find all orders for warehouse 1, district 5 (2-column prefix)
841    /// let rows = db.lookup_by_index_prefix("idx_orders_pk", &[
842    ///     SqlValue::Integer(1),  // warehouse_id
843    ///     SqlValue::Integer(5),  // district_id
844    /// ])?;
845    /// ```
846    pub fn lookup_by_index_prefix(
847        &self,
848        index_name: &str,
849        prefix: &[vibesql_types::SqlValue],
850    ) -> Result<Vec<&Row>, StorageError> {
851        // Get index metadata to find the table
852        let metadata = self
853            .get_index(index_name)
854            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
855        let table_name = metadata.table_name.clone();
856
857        // Get the index data
858        let index_data = self
859            .get_index_data(index_name)
860            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
861
862        // Perform the prefix scan
863        let row_indices = index_data.prefix_scan(prefix);
864        if row_indices.is_empty() {
865            return Ok(vec![]);
866        }
867
868        // Get the table
869        let table = self
870            .get_table(&table_name)
871            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
872
873        // Collect the rows using O(1) direct access
874        let rows: Vec<_> = row_indices.iter().filter_map(|&idx| table.get_row(idx)).collect();
875
876        Ok(rows)
877    }
878
879    /// Batch prefix lookup - look up multiple prefixes in a single call
880    ///
881    /// This method is optimized for batch prefix lookups on multi-column indexes.
882    /// For each prefix, returns all rows where the key prefix matches.
883    ///
884    /// # Arguments
885    /// * `index_name` - Name of the index
886    /// * `prefixes` - List of prefix key tuples to look up
887    ///
888    /// # Returns
889    /// * `Ok(Vec<Vec<&Row>>)` - For each prefix, the matching rows (empty vec if none)
890    /// * `Err(StorageError)` - Index not found or other error
891    ///
892    /// # Example
893    /// ```text
894    /// // Index on (w_id, d_id, o_id) - find new orders for all 10 districts
895    /// let prefixes: Vec<Vec<SqlValue>> = (1..=10)
896    ///     .map(|d| vec![SqlValue::Integer(w_id), SqlValue::Integer(d)])
897    ///     .collect();
898    /// let results = db.lookup_by_index_prefix_batch("idx_new_order_pk", &prefixes)?;
899    /// // results[0] = rows for district 1, results[1] = rows for district 2, etc.
900    /// ```
901    pub fn lookup_by_index_prefix_batch<'a>(
902        &'a self,
903        index_name: &str,
904        prefixes: &[Vec<vibesql_types::SqlValue>],
905    ) -> Result<Vec<Vec<&'a Row>>, StorageError> {
906        // Get index metadata to find the table
907        let metadata = self
908            .get_index(index_name)
909            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
910        let table_name = metadata.table_name.clone();
911
912        // Get the index data
913        let index_data = self
914            .get_index_data(index_name)
915            .ok_or_else(|| StorageError::IndexNotFound(index_name.to_string()))?;
916
917        // Get the table once for O(1) row access
918        let table = self
919            .get_table(&table_name)
920            .ok_or_else(|| StorageError::TableNotFound(table_name.clone()))?;
921
922        // Look up each prefix and collect results
923        let mut results = Vec::with_capacity(prefixes.len());
924        for prefix in prefixes {
925            let row_indices = index_data.prefix_scan(prefix);
926            let matched_rows: Vec<_> =
927                row_indices.iter().filter_map(|&idx| table.get_row(idx)).collect();
928            results.push(matched_rows);
929        }
930
931        Ok(results)
932    }
933
934    // ============================================================================
935    // Fast Delete API (High-Performance OLTP)
936    // ============================================================================
937
938    /// Delete a single row by PK value - fast path that skips unnecessary overhead
939    ///
940    /// This method provides a highly optimized DELETE path for single-row PK deletes.
941    /// It bypasses the full DELETE executor overhead when:
942    /// - There are no triggers on the table
943    /// - There are no foreign key constraints referencing this table
944    /// - The WHERE clause is a simple PK equality (`id = ?`)
945    ///
946    /// # Arguments
947    /// * `table_name` - Name of the table
948    /// * `pk_values` - Primary key values to match
949    ///
950    /// # Returns
951    /// * `Ok(true)` - Row was deleted
952    /// * `Ok(false)` - No row found with this PK
953    /// * `Err(StorageError)` - Table not found or other error
954    ///
955    /// # Performance
956    /// This is ~2-3x faster than the full DELETE executor because it:
957    /// - Uses direct PK index lookup (O(1))
958    /// - Avoids cloning row data
959    /// - Skips ExpressionEvaluator creation
960    /// - Performs minimal index maintenance
961    ///
962    /// # Profiling
963    /// Set environment variables to enable profiling:
964    /// - `DELETE_PROFILE=1` - Enable timing collection and auto-print summary on thread exit
965    /// - `DELETE_PROFILE_VERBOSE=1` - Also print per-delete breakdown to stderr
966    ///
967    /// Use `print_delete_profile_summary()` to manually print aggregate stats.
968    /// Use `reset_delete_profile_stats()` to reset the stats before a benchmark.
969    ///
970    /// # Safety
971    /// Caller must ensure:
972    /// - No triggers exist on this table for DELETE
973    /// - No foreign key constraints reference this table
974    ///
975    /// Note: WAL logging is handled internally by this method.
976    ///
977    /// # Example
978    /// ```text
979    /// // Fast delete by PK
980    /// let deleted = db.delete_by_pk_fast("users", &[SqlValue::Integer(42)])?;
981    /// if deleted {
982    ///     println!("User 42 deleted");
983    /// }
984    /// ```
985    pub fn delete_by_pk_fast(
986        &mut self,
987        table_name: &str,
988        pk_values: &[vibesql_types::SqlValue],
989    ) -> Result<bool, StorageError> {
990        use std::time::Instant;
991
992        // Check if profiling is enabled
993        let profile = std::env::var("DELETE_PROFILE").is_ok();
994        let start = if profile { Some(Instant::now()) } else { None };
995        let mut phase_times: [u128; 6] = [0; 6]; // pk_lookup, value_clone, wal, index_update, row_remove, cache
996
997        // First, find the row index and clone only the values (not the full Row struct)
998        // This avoids double-cloning: previously we cloned the Row, then cloned its values for WAL.
999        // Now we clone values only once and use a reference for index updates.
1000        let (row_index, values) = {
1001            let phase_start = start.map(|_| Instant::now());
1002            let table = self
1003                .get_table(table_name)
1004                .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))?;
1005
1006            let row_index = match table.primary_key_index() {
1007                Some(pk_index) => match pk_index.get(pk_values) {
1008                    Some(&idx) => idx,
1009                    None => return Ok(false), // Row not found
1010                },
1011                None => return Err(StorageError::Other("Table has no primary key".to_string())),
1012            };
1013            if let Some(ps) = phase_start {
1014                phase_times[0] = ps.elapsed().as_nanos(); // pk_lookup
1015            }
1016
1017            // Get the row values - clone once for both WAL and index updates
1018            let phase_start = start.map(|_| Instant::now());
1019            let values = match table.get_row(row_index) {
1020                Some(r) => r.values.clone(),
1021                None => return Ok(false), // Row already deleted
1022            };
1023            if let Some(ps) = phase_start {
1024                phase_times[1] = ps.elapsed().as_nanos(); // value_clone
1025            }
1026
1027            (row_index, values)
1028        };
1029
1030        // Update user-defined indexes first (using reference to values)
1031        // This must happen before we move ownership of values to WAL
1032        let phase_start = start.map(|_| Instant::now());
1033        self.operations.update_indexes_for_delete_with_values(
1034            &self.catalog,
1035            table_name,
1036            &values,
1037            row_index,
1038        );
1039        if let Some(ps) = phase_start {
1040            phase_times[3] = ps.elapsed().as_nanos(); // index_update
1041        }
1042
1043        // Emit WAL entry before deleting (needed for crash recovery)
1044        // Only emit if persistence is enabled to avoid unnecessary work
1045        // Move ownership of values to avoid a second clone
1046        let phase_start = start.map(|_| Instant::now());
1047        if self.persistence_enabled() {
1048            self.emit_wal_delete(table_name, row_index as u64, values.to_vec());
1049        }
1050        if let Some(ps) = phase_start {
1051            phase_times[2] = ps.elapsed().as_nanos(); // wal
1052        }
1053
1054        // Now delete the row (this updates the internal PK hash index)
1055        let phase_start = start.map(|_| Instant::now());
1056        let table_mut = self
1057            .get_table_mut(table_name)
1058            .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))?;
1059
1060        let delete_result = table_mut.delete_by_indices(&[row_index]);
1061
1062        // If compaction occurred, rebuild user-defined indexes since row indices changed
1063        if delete_result.compacted {
1064            self.rebuild_indexes(table_name);
1065        }
1066        if let Some(ps) = phase_start {
1067            phase_times[4] = ps.elapsed().as_nanos(); // row_remove
1068        }
1069
1070        // Phase 5: Invalidate columnar cache
1071        let phase_start = start.map(|_| Instant::now());
1072        if delete_result.deleted_count > 0 {
1073            self.invalidate_columnar_cache(table_name);
1074        }
1075        if let Some(ps) = phase_start {
1076            phase_times[5] = ps.elapsed().as_nanos(); // cache
1077        }
1078
1079        // Record and optionally print profile summary
1080        if let Some(s) = start {
1081            let total = s.elapsed().as_nanos();
1082
1083            // Record to thread-local aggregate stats
1084            DELETE_PROFILE_STATS.with(|stats| {
1085                stats.borrow_mut().record(&phase_times, total);
1086            });
1087
1088            // Print per-delete output only if DELETE_PROFILE_VERBOSE is set
1089            if std::env::var("DELETE_PROFILE_VERBOSE").is_ok() {
1090                let total_us = total as f64 / 1000.0;
1091                eprintln!(
1092                    "DELETE_PROFILE: total={:.1}µs | pk_lookup={:.1}µs ({:.0}%) | value_clone={:.1}µs ({:.0}%) | wal={:.1}µs ({:.0}%) | index_update={:.1}µs ({:.0}%) | row_remove={:.1}µs ({:.0}%) | cache={:.1}µs ({:.0}%)",
1093                    total_us,
1094                    phase_times[0] as f64 / 1000.0,
1095                    if total > 0 { phase_times[0] as f64 / total as f64 * 100.0 } else { 0.0 },
1096                    phase_times[1] as f64 / 1000.0,
1097                    if total > 0 { phase_times[1] as f64 / total as f64 * 100.0 } else { 0.0 },
1098                    phase_times[2] as f64 / 1000.0,
1099                    if total > 0 { phase_times[2] as f64 / total as f64 * 100.0 } else { 0.0 },
1100                    phase_times[3] as f64 / 1000.0,
1101                    if total > 0 { phase_times[3] as f64 / total as f64 * 100.0 } else { 0.0 },
1102                    phase_times[4] as f64 / 1000.0,
1103                    if total > 0 { phase_times[4] as f64 / total as f64 * 100.0 } else { 0.0 },
1104                    phase_times[5] as f64 / 1000.0,
1105                    if total > 0 { phase_times[5] as f64 / total as f64 * 100.0 } else { 0.0 },
1106                );
1107            }
1108        }
1109
1110        Ok(delete_result.deleted_count > 0)
1111    }
1112
1113    // ============================================================================
1114    // Table Index Info for DML Cost Estimation
1115    // ============================================================================
1116
1117    /// Get table index information for DML cost estimation
1118    ///
1119    /// This method collects all the metadata needed by `CostEstimator::estimate_insert()`,
1120    /// `estimate_update()`, and `estimate_delete()` to compute accurate DML operation costs.
1121    ///
1122    /// # Arguments
1123    /// * `table_name` - Name of the table to get index info for
1124    ///
1125    /// # Returns
1126    /// * `Some(TableIndexInfo)` - Index information if table exists
1127    /// * `None` - If table doesn't exist
1128    ///
1129    /// # Example
1130    /// ```text
1131    /// let info = db.get_table_index_info("users")?;
1132    /// let insert_cost = cost_estimator.estimate_insert(&info);
1133    /// ```
1134    pub fn get_table_index_info(
1135        &self,
1136        table_name: &str,
1137    ) -> Option<crate::statistics::TableIndexInfo> {
1138        // Get the table
1139        let table = self.get_table(table_name)?;
1140
1141        // Count hash indexes: 1 for PK (if exists) + 1 per unique constraint
1142        let has_primary_key = table.schema.primary_key.is_some();
1143        let unique_constraint_count = table.schema.unique_constraints.len();
1144        let hash_index_count = if has_primary_key { 1 } else { 0 } + unique_constraint_count;
1145
1146        // Count B-tree indexes (user-defined indexes managed at Database level)
1147        let btree_index_count = self.list_indexes_for_table(table_name).len();
1148
1149        // Calculate deleted ratio
1150        let total_rows = table.physical_row_count();
1151        let deleted_ratio =
1152            if total_rows > 0 { table.deleted_count() as f64 / total_rows as f64 } else { 0.0 };
1153
1154        // Check if table uses native columnar storage
1155        let is_native_columnar = table.is_native_columnar();
1156
1157        // Get average row size: prefer actual statistics over schema-based heuristics
1158        //
1159        // When statistics are available (from ANALYZE), use the actual avg_row_bytes
1160        // which accounts for real string fill ratios, NULL prevalence, and actual
1161        // BLOB sizes. This provides more accurate WAL cost estimation.
1162        //
1163        // Fall back to schema-based estimation when no statistics are available.
1164        // See issue #3980 for details.
1165        let avg_row_size = table
1166            .get_statistics()
1167            .and_then(|stats| stats.avg_row_bytes)
1168            .map(|bytes| bytes as usize)
1169            .unwrap_or_else(|| {
1170                // Fall back to schema-based estimation
1171                let column_types: Vec<_> =
1172                    table.schema.columns.iter().map(|col| col.data_type.clone()).collect();
1173                crate::statistics::estimate_row_size(&column_types)
1174            });
1175
1176        Some(crate::statistics::TableIndexInfo::new(
1177            hash_index_count,
1178            btree_index_count,
1179            is_native_columnar,
1180            deleted_ratio,
1181            avg_row_size,
1182        ))
1183    }
1184}
1185
#[cfg(test)]
mod tests {
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::{DataType, SqlValue};

    use super::*;

    /// Builds a column definition from a borrowed name.
    ///
    /// `nullable` is forwarded untouched as the third argument of `ColumnSchema::new`
    /// (presumably the nullability flag - confirm against `ColumnSchema`).
    fn col(name: &str, data_type: DataType, nullable: bool) -> ColumnSchema {
        ColumnSchema::new(name.to_string(), data_type, nullable)
    }

    /// Builds a schema for `table` whose primary key is the single column `id`.
    fn pk_table(table: &str, columns: Vec<ColumnSchema>) -> TableSchema {
        TableSchema::with_primary_key(table.to_string(), columns, vec!["id".to_string()])
    }

    /// Creates a fresh database that contains exactly the given table.
    fn db_with(schema: TableSchema) -> Database {
        let mut db = Database::new();
        db.create_table(schema).unwrap();
        db
    }

    #[test]
    fn test_get_table_index_info_basic() {
        // Table with a primary key and one unique constraint
        let db = db_with(TableSchema::with_all_constraints(
            "users".to_string(),
            vec![
                col("id", DataType::Integer, false),
                col("email", DataType::Varchar { max_length: Some(100) }, false),
                col("name", DataType::Varchar { max_length: Some(100) }, true),
            ],
            Some(vec!["id".to_string()]),
            vec![vec!["email".to_string()]],
        ));

        let info = db.get_table_index_info("users").unwrap();

        // 2 hash indexes: 1 PK + 1 unique constraint
        assert_eq!(info.hash_index_count, 2);
        // No B-tree indexes yet
        assert_eq!(info.btree_index_count, 0);
        // Not native columnar
        assert!(!info.is_native_columnar);
        // No deleted rows
        assert_eq!(info.deleted_ratio, 0.0);
    }

    #[test]
    fn test_get_table_index_info_with_btree_index() {
        use vibesql_ast::IndexColumn;

        // Table with a primary key only
        let mut db = db_with(pk_table(
            "products",
            vec![
                col("id", DataType::Integer, false),
                col("name", DataType::Varchar { max_length: Some(100) }, false),
                col("price", DataType::Decimal { precision: 10, scale: 2 }, false),
            ],
        ));

        // Add a B-tree index on price
        let price_column = IndexColumn::Column {
            column_name: "price".to_string(),
            direction: vibesql_ast::OrderDirection::Asc,
            prefix_length: None,
        };
        db.create_index(
            "idx_products_price".to_string(),
            "products".to_string(),
            false,
            vec![price_column],
        )
        .unwrap();

        let info = db.get_table_index_info("products").unwrap();

        // 1 hash index (PK)
        assert_eq!(info.hash_index_count, 1);
        // 1 B-tree index
        assert_eq!(info.btree_index_count, 1);
    }

    #[test]
    fn test_get_table_index_info_with_deleted_rows() {
        let mut db = db_with(pk_table(
            "items",
            vec![
                col("id", DataType::Integer, false),
                col("name", DataType::Varchar { max_length: Some(100) }, false),
            ],
        ));

        // Insert 10 rows
        for i in 0..10 {
            let name = format!("Item {}", i);
            let row = Row::new(vec![SqlValue::Integer(i), SqlValue::Varchar(name.into())]);
            db.insert_row("items", row).unwrap();
        }

        // Before any delete the ratio is zero
        let before = db.get_table_index_info("items").unwrap();
        assert_eq!(before.deleted_ratio, 0.0);

        // Delete 3 rows (30% deletion)
        for id in 0..3 {
            db.delete_by_pk_fast("items", &[SqlValue::Integer(id)]).unwrap();
        }

        // 3 deleted out of 10 = 0.3
        let after = db.get_table_index_info("items").unwrap();
        assert!((after.deleted_ratio - 0.3).abs() < 0.01);
    }

    #[test]
    fn test_get_table_index_info_nonexistent_table() {
        let db = Database::new();

        // An unknown table yields None
        assert!(db.get_table_index_info("nonexistent").is_none());
    }

    #[test]
    fn test_get_table_index_info_no_primary_key() {
        // Table without primary key
        let db = db_with(TableSchema::new(
            "logs".to_string(),
            vec![
                col("message", DataType::Varchar { max_length: Some(500) }, false),
                col("level", DataType::Integer, false),
            ],
        ));

        let info = db.get_table_index_info("logs").unwrap();

        // 0 hash indexes (no PK, no unique constraints)
        assert_eq!(info.hash_index_count, 0);
        // No B-tree indexes
        assert_eq!(info.btree_index_count, 0);
    }

    #[test]
    fn test_get_table_index_info_multiple_unique_constraints() {
        // Table with a PK and two unique constraints
        let db = db_with(TableSchema::with_all_constraints(
            "accounts".to_string(),
            vec![
                col("id", DataType::Integer, false),
                col("email", DataType::Varchar { max_length: Some(100) }, false),
                col("username", DataType::Varchar { max_length: Some(50) }, false),
                col("phone", DataType::Varchar { max_length: Some(20) }, true),
            ],
            Some(vec!["id".to_string()]),
            vec![vec!["email".to_string()], vec!["username".to_string()]],
        ));

        let info = db.get_table_index_info("accounts").unwrap();

        // 3 hash indexes: 1 PK + 2 unique constraints
        assert_eq!(info.hash_index_count, 3);
    }

    // ============================================================================
    // avg_row_size Statistics Tests (Issue #3980)
    // ============================================================================

    #[test]
    fn test_get_table_index_info_uses_schema_estimate_without_stats() {
        let db = db_with(pk_table(
            "items",
            vec![
                col("id", DataType::Integer, false),
                col("name", DataType::Varchar { max_length: Some(100) }, false),
            ],
        ));

        // No ANALYZE has run, so the schema-based estimate is used
        let info = db.get_table_index_info("items").unwrap();

        // avg_row_size comes from schema heuristics:
        // INTEGER (4) + VARCHAR(100) estimate (32) + overhead (8) = 44, min 64
        assert!(
            info.avg_row_size >= 64,
            "Schema-based avg_row_size should be at least base size: {}",
            info.avg_row_size
        );
    }

    #[test]
    fn test_get_table_index_info_prefers_actual_statistics() {
        let mut db = db_with(pk_table(
            "items",
            vec![
                col("id", DataType::Integer, false),
                col("description", DataType::Varchar { max_length: Some(1000) }, false),
            ],
        ));

        // Insert rows with LONG strings (filling the VARCHAR)
        for i in 0..10 {
            let long_description = "x".repeat(800); // Much longer than schema heuristic
            let row =
                Row::new(vec![SqlValue::Integer(i), SqlValue::Varchar(long_description.into())]);
            db.insert_row("items", row).unwrap();
        }

        // WITHOUT statistics the schema heuristic is used
        let schema_estimate = db.get_table_index_info("items").unwrap().avg_row_size;

        // Run ANALYZE to compute actual statistics
        db.get_table_mut("items").unwrap().analyze();

        // WITH statistics the actual avg_row_bytes should be preferred
        let actual_estimate = db.get_table_index_info("items").unwrap().avg_row_size;

        // Actual statistics should show larger row size due to long strings
        // Schema heuristic: VARCHAR(1000) → min(500, 32) = 32 bytes
        // Actual data: 800 bytes per string
        assert!(
            actual_estimate > schema_estimate,
            "Actual statistics ({}) should show larger row size than schema heuristic ({}) for long strings",
            actual_estimate,
            schema_estimate
        );
    }

    #[test]
    fn test_get_table_index_info_statistics_vs_schema_short_strings() {
        let mut db = db_with(pk_table(
            "items",
            vec![
                col("id", DataType::Integer, false),
                col("code", DataType::Varchar { max_length: Some(100) }, false),
            ],
        ));

        // Insert rows with SHORT strings (much shorter than heuristic)
        for i in 0..10 {
            let short_code = format!("A{}", i); // 2-3 chars, much shorter than 32-byte heuristic
            let row = Row::new(vec![SqlValue::Integer(i), SqlValue::Varchar(short_code.into())]);
            db.insert_row("items", row).unwrap();
        }

        // Run ANALYZE to compute actual statistics
        db.get_table_mut("items").unwrap().analyze();

        let info = db.get_table_index_info("items").unwrap();

        // avg_row_size (from actual statistics) must be a valid positive value
        assert!(info.avg_row_size > 0, "avg_row_size should be positive: {}", info.avg_row_size);
    }
}