vibesql_storage/database/
table_api.rs

1// ============================================================================
2// Table Operations API
3// ============================================================================
4//
5// This module provides table management methods for the Database struct.
6// Includes create, drop, insert, update operations.
7
8use super::transactions::TransactionChange;
9use super::Database;
10use crate::change_events::ChangeEvent;
11use crate::wal::WalOp;
12use crate::{Row, StorageError, Table};
13use vibesql_catalog::TableIdentifier;
14
15impl Database {
16    // ============================================================================
17    // Table Operations
18    // ============================================================================
19
20    /// Check if a table name refers to a temporary table (in any temp schema)
21    ///
22    /// Temporary tables are not persisted to WAL.
23    /// This checks if the table is in ANY temp schema (temp_1, temp_2, etc.)
24    fn is_temp_table(&self, table_name: &str) -> bool {
25        // Check if the table name is qualified with a temp schema prefix (e.g., "temp_1.foo")
26        if let Some((schema, _)) = table_name.split_once('.') {
27            vibesql_catalog::Catalog::is_temp_schema(schema)
28        } else {
29            // Unqualified name - check if it exists in this session's temp schema
30            let temp_qualified = format!(
31                "{}.{}",
32                self.catalog.temp_schema_name(),
33                table_name.to_lowercase()
34            );
35            self.tables.contains_key(&temp_qualified)
36        }
37    }
38
39    /// Create a table with SQL:1999 identifier semantics.
40    ///
41    /// The `identifier` parameter determines how the table name is stored:
42    /// - Quoted identifiers: stored with exact case
43    /// - Unquoted identifiers: stored with lowercase canonical form
44    /// - Qualified identifiers: schema and table have independent case handling
45    ///
46    /// Temporary tables (in the "temp" schema) are not persisted to WAL.
47    pub fn create_table_with_identifier(
48        &mut self,
49        schema: vibesql_catalog::TableSchema,
50        identifier: TableIdentifier,
51    ) -> Result<(), StorageError> {
52        self.catalog
53            .create_table_with_identifier(schema.clone(), identifier.clone())
54            .map_err(|e| StorageError::CatalogError(e.to_string()))?;
55
56        // Build qualified name from identifier
57        let qualified_name = if identifier.is_qualified() {
58            // Identifier already includes schema qualification
59            identifier.canonical().to_string()
60        } else {
61            // Add current schema to unqualified identifier
62            let current_schema = &self.catalog.get_current_schema();
63            format!("{}.{}", current_schema, identifier.canonical())
64        };
65
66        // Check if this is a temporary table (in any temp schema)
67        // Temp tables are not persisted to WAL
68        let is_temp = identifier
69            .schema_canonical()
70            .is_some_and(|s| vibesql_catalog::Catalog::is_temp_schema(s));
71
72        if !is_temp {
73            // Assign table ID and emit WAL entry for persistence
74            let table_id = self.next_table_id();
75
76            // Serialize schema for WAL (use a simple binary format)
77            let schema_data = serialize_table_schema(&schema);
78
79            self.emit_wal_op(WalOp::CreateTable {
80                table_id,
81                table_name: qualified_name.clone(),
82                schema_data,
83            });
84        }
85
86        let table = Table::new(schema);
87        self.tables.insert(qualified_name, table);
88
89        Ok(())
90    }
91
92    /// Create a table
93    /// Legacy method - uses global case_sensitive_identifiers setting
94    pub fn create_table(
95        &mut self,
96        schema: vibesql_catalog::TableSchema,
97    ) -> Result<(), StorageError> {
98        let table_name = schema.name.clone();
99
100        self.operations.create_table(&mut self.catalog, schema.clone())?;
101
102        // Normalize table name for storage (matches catalog normalization)
103        let normalized_table_name = if self.catalog.is_case_sensitive_identifiers() {
104            table_name.clone()
105        } else {
106            table_name.to_lowercase()
107        };
108
109        let current_schema = &self.catalog.get_current_schema();
110        let qualified_name = format!("{}.{}", current_schema, normalized_table_name);
111
112        // Assign table ID and emit WAL entry for persistence
113        let table_id = self.next_table_id();
114
115        // Serialize schema for WAL (use a simple binary format)
116        let schema_data = serialize_table_schema(&schema);
117
118        self.emit_wal_op(WalOp::CreateTable {
119            table_id,
120            table_name: qualified_name.clone(),
121            schema_data,
122        });
123
124        let table = Table::new(schema);
125        self.tables.insert(qualified_name, table);
126
127        Ok(())
128    }
129
130    /// Get a table by identifier using SQL:1999 case semantics.
131    ///
132    /// Uses the canonical form of the identifier for direct lookup without fallbacks.
133    /// Supports both simple and schema-qualified identifiers.
134    pub fn get_table_by_identifier(&self, identifier: &TableIdentifier) -> Option<&Table> {
135        let qualified_name = if identifier.is_qualified() {
136            // Identifier already includes schema qualification
137            identifier.canonical().to_string()
138        } else {
139            // Add current schema to unqualified identifier
140            let current_schema = &self.catalog.get_current_schema();
141            format!("{}.{}", current_schema, identifier.canonical())
142        };
143        self.tables.get(&qualified_name)
144    }
145
146    /// Get a table for reading
147    /// Legacy method with fallback lookups for backward compatibility
148    ///
149    /// For unqualified names, checks temp schema first (SQLite semantics).
150    /// SQLite Compatibility: The "temp" schema name is mapped to the session's
151    /// temp schema, allowing `temp.tablename` syntax.
152    pub fn get_table(&self, name: &str) -> Option<&Table> {
153        // For qualified names with "temp" schema, resolve to session's temp schema
154        // This enables `SELECT * FROM temp.t1` syntax
155        let resolved_name = if let Some((schema_part, table_part)) = name.split_once('.') {
156            if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
157                std::borrow::Cow::Owned(format!(
158                    "{}.{}",
159                    self.catalog.temp_schema_name(),
160                    table_part
161                ))
162            } else {
163                std::borrow::Cow::Borrowed(name)
164            }
165        } else {
166            std::borrow::Cow::Borrowed(name)
167        };
168        let name = resolved_name.as_ref();
169
170        // Try the name as-is first (for delimited identifiers)
171        if let Some(table) = self.tables.get(name) {
172            return Some(table);
173        }
174
175        // Try lowercase normalization (standard for unquoted identifiers)
176        let lowercase_name = name.to_lowercase();
177        if lowercase_name != name {
178            if let Some(table) = self.tables.get(&lowercase_name) {
179                return Some(table);
180            }
181        }
182
183        // Try uppercase normalization (for backward compatibility with old data)
184        let uppercase_name = name.to_uppercase();
185        if uppercase_name != name && uppercase_name != lowercase_name {
186            if let Some(table) = self.tables.get(&uppercase_name) {
187                return Some(table);
188            }
189        }
190
191        // For qualified names (schema.table), try normalizing each part separately
192        // This handles the case where storage normalized table name but not schema
193        if let Some((schema_part, table_part)) = name.split_once('.') {
194            // Try schema lowercase, table uppercase (current storage behavior)
195            let mixed_case =
196                format!("{}.{}", schema_part.to_lowercase(), table_part.to_uppercase());
197            if mixed_case != name && mixed_case != uppercase_name && mixed_case != lowercase_name {
198                if let Some(table) = self.tables.get(&mixed_case) {
199                    return Some(table);
200                }
201            }
202        }
203
204        // For unqualified names, check session's temp schema first (SQLite semantics)
205        // Temp tables shadow tables in the main schema
206        if !name.contains('.') {
207            // Check session's temp schema first
208            let temp_qualified = format!("{}.{}", self.catalog.temp_schema_name(), lowercase_name);
209            if let Some(table) = self.tables.get(&temp_qualified) {
210                return Some(table);
211            }
212
213            let current_schema = &self.catalog.get_current_schema();
214
215            // Try as-is with schema prefix
216            let qualified_name_original = format!("{}.{}", current_schema, name);
217            if let Some(table) = self.tables.get(&qualified_name_original) {
218                return Some(table);
219            }
220
221            // Try uppercase with schema prefix
222            let qualified_name_uppercase = format!("{}.{}", current_schema, uppercase_name);
223            if qualified_name_uppercase != qualified_name_original {
224                if let Some(table) = self.tables.get(&qualified_name_uppercase) {
225                    return Some(table);
226                }
227            }
228
229            // Try lowercase with schema prefix
230            let qualified_name_lowercase = format!("{}.{}", current_schema, lowercase_name);
231            if qualified_name_lowercase != qualified_name_original
232                && qualified_name_lowercase != qualified_name_uppercase
233            {
234                return self.tables.get(&qualified_name_lowercase);
235            }
236        }
237
238        None
239    }
240
241    /// Get a table for writing
242    ///
243    /// For unqualified names, checks temp schema first (SQLite semantics).
244    /// SQLite Compatibility: The "temp" schema name is mapped to the session's
245    /// temp schema, allowing `temp.tablename` syntax.
246    pub fn get_table_mut(&mut self, name: &str) -> Option<&mut Table> {
247        // For qualified names with "temp" schema, resolve to session's temp schema
248        // This enables `UPDATE temp.t1 SET ...` syntax
249        let resolved_name = if let Some((schema_part, table_part)) = name.split_once('.') {
250            if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
251                Some(format!("{}.{}", self.catalog.temp_schema_name(), table_part))
252            } else {
253                None
254            }
255        } else {
256            None
257        };
258        let name = resolved_name.as_deref().unwrap_or(name);
259
260        // Try the name as-is first (for delimited identifiers)
261        if self.tables.contains_key(name) {
262            return self.tables.get_mut(name);
263        }
264
265        // Try lowercase normalization (standard for unquoted identifiers)
266        let lowercase_name = name.to_lowercase();
267        if lowercase_name != name && self.tables.contains_key(&lowercase_name) {
268            return self.tables.get_mut(&lowercase_name);
269        }
270
271        // Try uppercase normalization (for backward compatibility with old data)
272        let uppercase_name = name.to_uppercase();
273        if uppercase_name != name
274            && uppercase_name != lowercase_name
275            && self.tables.contains_key(&uppercase_name)
276        {
277            return self.tables.get_mut(&uppercase_name);
278        }
279
280        // For unqualified names, check session's temp schema first (SQLite semantics)
281        // Temp tables shadow tables in the main schema
282        if !name.contains('.') {
283            // Check session's temp schema first
284            let temp_qualified = format!("{}.{}", self.catalog.temp_schema_name(), lowercase_name);
285            if self.tables.contains_key(&temp_qualified) {
286                return self.tables.get_mut(&temp_qualified);
287            }
288
289            let current_schema = &self.catalog.get_current_schema().to_string();
290
291            // Try as-is with schema prefix
292            let qualified_name_original = format!("{}.{}", current_schema, name);
293            if self.tables.contains_key(&qualified_name_original) {
294                return self.tables.get_mut(&qualified_name_original);
295            }
296
297            // Try lowercase with schema prefix (standard)
298            let qualified_name_lowercase = format!("{}.{}", current_schema, lowercase_name);
299            if qualified_name_lowercase != qualified_name_original
300                && self.tables.contains_key(&qualified_name_lowercase)
301            {
302                return self.tables.get_mut(&qualified_name_lowercase);
303            }
304
305            // Try uppercase with schema prefix (backward compatibility)
306            let qualified_name_uppercase = format!("{}.{}", current_schema, uppercase_name);
307            if qualified_name_uppercase != qualified_name_original
308                && qualified_name_uppercase != qualified_name_lowercase
309                && self.tables.contains_key(&qualified_name_uppercase)
310            {
311                return self.tables.get_mut(&qualified_name_uppercase);
312            }
313        }
314
315        None
316    }
317
318    /// Drop a table
319    ///
320    /// Temporary tables (in the "temp" schema) are not persisted to WAL.
321    pub fn drop_table(&mut self, name: &str) -> Result<(), StorageError> {
322        // Emit WAL entry for persistence before dropping (skip for temp tables)
323        if !self.is_temp_table(name) {
324            self.emit_wal_op(WalOp::DropTable {
325                table_id: self.table_name_to_id(name),
326                table_name: name.to_string(),
327            });
328        }
329
330        // Invalidate columnar cache before dropping
331        self.columnar_cache.invalidate(name);
332        self.operations.drop_table(&mut self.catalog, &mut self.tables, name)
333    }
334
335    /// Insert a row into a table
336    ///
337    /// Temporary tables (in the "temp" schema) are not persisted to WAL.
338    pub fn insert_row(&mut self, table_name: &str, row: Row) -> Result<(), StorageError> {
339        let row_index =
340            self.operations.insert_row(&self.catalog, &mut self.tables, table_name, row.clone())?;
341
342        self.record_change(TransactionChange::Insert {
343            table_name: table_name.to_string(),
344            row: row.clone(),
345        });
346
347        // Emit WAL entry for persistence (skip for temp tables)
348        if !self.is_temp_table(table_name) {
349            self.emit_wal_op(WalOp::Insert {
350                table_id: self.table_name_to_id(table_name),
351                row_id: row_index as u64,
352                values: row.values.to_vec(),
353            });
354        }
355
356        // Broadcast change event to subscribers
357        self.broadcast_change(ChangeEvent::Insert {
358            table_name: table_name.to_string(),
359            row_index,
360        });
361
362        // Invalidate columnar cache for this table
363        self.columnar_cache.invalidate(table_name);
364
365        Ok(())
366    }
367
368    /// Insert multiple rows into a table in a single batch
369    ///
370    /// This method is optimized for bulk data loading and provides significant
371    /// performance improvements over repeated `insert_row` calls:
372    ///
373    /// - **Pre-allocation**: Vector capacity reserved upfront
374    /// - **Batch validation**: All rows validated before any insertion
375    /// - **Deferred index rebuild**: Indexes rebuilt once after all inserts
376    /// - **Single cache invalidation**: Columnar cache invalidated once at end
377    ///
378    /// # Arguments
379    ///
380    /// * `table_name` - Name of the table to insert into
381    /// * `rows` - Vector of rows to insert
382    ///
383    /// # Returns
384    ///
385    /// * `Ok(usize)` - Number of rows successfully inserted
386    /// * `Err(StorageError)` - If validation fails (no rows inserted on error)
387    ///
388    /// # Performance
389    ///
390    /// For large batches (1000+ rows), expect 10-50x speedup vs single-row inserts.
391    ///
392    /// # Example
393    ///
394    /// ```text
395    /// let rows = vec![
396    ///     Row::new(vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))]),
397    ///     Row::new(vec![SqlValue::Integer(2), SqlValue::Varchar(arcstr::ArcStr::from("Bob"))]),
398    /// ];
399    /// let count = db.insert_rows_batch("users", rows)?;
400    /// ```
401    pub fn insert_rows_batch(
402        &mut self,
403        table_name: &str,
404        rows: Vec<Row>,
405    ) -> Result<usize, StorageError> {
406        if rows.is_empty() {
407            return Ok(0);
408        }
409
410        let row_indices = self.operations.insert_rows_batch(
411            &self.catalog,
412            &mut self.tables,
413            table_name,
414            rows.clone(),
415        )?;
416
417        let table_id = self.table_name_to_id(table_name);
418        let is_temp = self.is_temp_table(table_name);
419
420        // Record changes for transaction management, emit WAL entries, and broadcast events
421        for (row, &row_index) in rows.into_iter().zip(row_indices.iter()) {
422            self.record_change(TransactionChange::Insert {
423                table_name: table_name.to_string(),
424                row: row.clone(),
425            });
426
427            // Emit WAL entry for persistence (skip for temp tables)
428            if !is_temp {
429                self.emit_wal_op(WalOp::Insert {
430                    table_id,
431                    row_id: row_index as u64,
432                    values: row.values.to_vec(),
433                });
434            }
435
436            // Broadcast change event to subscribers
437            self.broadcast_change(ChangeEvent::Insert {
438                table_name: table_name.to_string(),
439                row_index,
440            });
441        }
442
443        // Invalidate columnar cache for this table
444        self.columnar_cache.invalidate(table_name);
445
446        Ok(row_indices.len())
447    }
448
449    /// Insert rows from an iterator in a streaming fashion
450    ///
451    /// This method is optimized for very large datasets that may not fit
452    /// in memory all at once. Rows are processed in configurable batch sizes,
453    /// balancing memory usage with performance.
454    ///
455    /// # Arguments
456    ///
457    /// * `table_name` - Name of the table to insert into
458    /// * `rows` - Iterator yielding rows to insert
459    /// * `batch_size` - Number of rows per batch (0 defaults to 1000)
460    ///
461    /// # Returns
462    ///
463    /// * `Ok(usize)` - Total number of rows successfully inserted
464    /// * `Err(StorageError)` - If any batch fails validation
465    ///
466    /// # Note
467    ///
468    /// Unlike `insert_rows_batch`, this method commits rows batch-by-batch.
469    /// A failure partway through will leave previously committed batches
470    /// in the table. Use `insert_rows_batch` for all-or-nothing semantics.
471    ///
472    /// # Example
473    ///
474    /// ```text
475    /// // Stream 100K rows in batches of 5000
476    /// let rows = (0..100_000).map(|i| Row::new(vec![SqlValue::Integer(i)]));
477    /// let count = db.insert_rows_iter("numbers", rows, 5000)?;
478    /// ```
479    pub fn insert_rows_iter<I>(
480        &mut self,
481        table_name: &str,
482        rows: I,
483        batch_size: usize,
484    ) -> Result<usize, StorageError>
485    where
486        I: Iterator<Item = Row>,
487    {
488        let batch_size = if batch_size == 0 { 1000 } else { batch_size };
489        let mut total_inserted = 0;
490        let mut batch = Vec::with_capacity(batch_size);
491
492        for row in rows {
493            batch.push(row);
494
495            if batch.len() >= batch_size {
496                let count = self.insert_rows_batch(table_name, std::mem::take(&mut batch))?;
497                total_inserted += count;
498                batch = Vec::with_capacity(batch_size);
499            }
500        }
501
502        // Insert any remaining rows
503        if !batch.is_empty() {
504            let count = self.insert_rows_batch(table_name, batch)?;
505            total_inserted += count;
506        }
507
508        Ok(total_inserted)
509    }
510
511    /// Update a single row by primary key value (direct API, no SQL parsing)
512    ///
513    /// This method provides a high-performance update path that bypasses SQL parsing,
514    /// making it suitable for benchmarking and performance-critical code paths.
515    ///
516    /// # Arguments
517    ///
518    /// * `table_name` - Name of the table
519    /// * `pk_value` - Primary key value to match (single column PK only)
520    /// * `column_updates` - List of (column_name, new_value) pairs to update
521    ///
522    /// # Returns
523    ///
524    /// * `Ok(true)` - Row was found and updated
525    /// * `Ok(false)` - Row was not found (no error)
526    /// * `Err(StorageError)` - Table not found, column not found, or constraint violation
527    ///
528    /// # Example
529    ///
530    /// ```text
531    /// // Update column 'name' for row with id=5
532    /// let updated = db.update_row_by_pk(
533    ///     "users",
534    ///     SqlValue::Integer(5),
535    ///     vec![("name", SqlValue::Varchar(arcstr::ArcStr::from("Alice")))],
536    /// )?;
537    /// ```
538    pub fn update_row_by_pk(
539        &mut self,
540        table_name: &str,
541        pk_value: vibesql_types::SqlValue,
542        column_updates: Vec<(&str, vibesql_types::SqlValue)>,
543    ) -> Result<bool, StorageError> {
544        // First phase: read data (immutable borrow)
545        let (row_index, old_row, schema, resolved_name) = {
546            // Get table using existing lookup logic (handles schema prefixes)
547            let table = self
548                .get_table(table_name)
549                .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))?;
550
551            // Look up row by PK
552            let pk_index = table
553                .primary_key_index()
554                .ok_or_else(|| StorageError::Other("Table has no primary key index".to_string()))?;
555
556            let row_index = match pk_index.get(&vec![pk_value.clone()]) {
557                Some(&idx) => idx,
558                None => return Ok(false), // Row not found
559            };
560
561            // Get old row and schema
562            let old_row = table.scan()[row_index].clone();
563            let schema = table.schema.clone();
564            let resolved_name = schema.name.clone();
565
566            (row_index, old_row, schema, resolved_name)
567        };
568
569        // Second phase: apply updates
570        let mut new_row = old_row.clone();
571        let mut changed_columns = std::collections::HashSet::new();
572
573        for (col_name, new_value) in &column_updates {
574            let col_index =
575                schema.get_column_index(col_name).ok_or_else(|| StorageError::ColumnNotFound {
576                    column_name: col_name.to_string(),
577                    table_name: resolved_name.clone(),
578                })?;
579
580            // Check NOT NULL constraint
581            let column = &schema.columns[col_index];
582            if !column.nullable && *new_value == vibesql_types::SqlValue::Null {
583                return Err(StorageError::NullConstraintViolation { column: col_name.to_string() });
584            }
585
586            new_row.set(col_index, new_value.clone())?;
587            changed_columns.insert(col_index);
588        }
589
590        // Third phase: write data (mutable borrow)
591        let table_mut = self.get_table_mut(table_name).unwrap();
592        table_mut.update_row_selective(row_index, new_row.clone(), &changed_columns)?;
593
594        // Update user-defined indexes (pass changed_columns to skip unaffected indexes)
595        self.operations.update_indexes_for_update(
596            &self.catalog,
597            &resolved_name,
598            &old_row,
599            &new_row,
600            row_index,
601            Some(&changed_columns),
602        );
603
604        // Emit WAL entry for persistence (skip for temp tables)
605        if !self.is_temp_table(&resolved_name) {
606            self.emit_wal_op(WalOp::Update {
607                table_id: self.table_name_to_id(&resolved_name),
608                row_id: row_index as u64,
609                old_values: old_row.values.to_vec(),
610                new_values: new_row.values.to_vec(),
611            });
612        }
613
614        // Broadcast change event to subscribers
615        self.broadcast_change(ChangeEvent::Update { table_name: resolved_name.clone(), row_index });
616
617        // Invalidate columnar cache
618        self.columnar_cache.invalidate(&resolved_name);
619
620        Ok(true)
621    }
622
623    /// List all table names
624    pub fn list_tables(&self) -> Vec<String> {
625        self.catalog.list_tables()
626    }
627}
628
629/// Serialize a TableSchema to bytes for WAL storage
630///
631/// Uses a simple format: JSON serialization of the schema.
632/// This is for WAL recovery purposes and doesn't need to be maximally efficient.
633pub(super) fn serialize_table_schema(schema: &vibesql_catalog::TableSchema) -> Vec<u8> {
634    // Simple approach: serialize the table name and column info as text
635    // Format: table_name\0col1_name\0col1_type\0nullable\0...
636    let mut data = Vec::new();
637
638    // Write table name
639    data.extend_from_slice(schema.name.as_bytes());
640    data.push(0);
641
642    // Write column count
643    data.extend_from_slice(&(schema.columns.len() as u32).to_le_bytes());
644
645    // Write each column
646    for col in &schema.columns {
647        // Column name
648        data.extend_from_slice(col.name.as_bytes());
649        data.push(0);
650
651        // Data type (as debug string for simplicity)
652        let type_str = format!("{:?}", col.data_type);
653        data.extend_from_slice(type_str.as_bytes());
654        data.push(0);
655
656        // Nullable flag
657        data.push(if col.nullable { 1 } else { 0 });
658    }
659
660    data
661}