vibesql_storage/database/table_api.rs
1// ============================================================================
2// Table Operations API
3// ============================================================================
4//
5// This module provides table management methods for the Database struct.
6// Includes create, drop, insert, update operations.
7
8use super::transactions::TransactionChange;
9use super::Database;
10use crate::change_events::ChangeEvent;
11use crate::wal::WalOp;
12use crate::{Row, StorageError, Table};
13use vibesql_catalog::TableIdentifier;
14
15impl Database {
16 // ============================================================================
17 // Table Operations
18 // ============================================================================
19
20 /// Check if a table name refers to a temporary table (in any temp schema)
21 ///
22 /// Temporary tables are not persisted to WAL.
23 /// This checks if the table is in ANY temp schema (temp_1, temp_2, etc.)
24 fn is_temp_table(&self, table_name: &str) -> bool {
25 // Check if the table name is qualified with a temp schema prefix (e.g., "temp_1.foo")
26 if let Some((schema, _)) = table_name.split_once('.') {
27 vibesql_catalog::Catalog::is_temp_schema(schema)
28 } else {
29 // Unqualified name - check if it exists in this session's temp schema
30 let temp_qualified = format!(
31 "{}.{}",
32 self.catalog.temp_schema_name(),
33 table_name.to_lowercase()
34 );
35 self.tables.contains_key(&temp_qualified)
36 }
37 }
38
39 /// Create a table with SQL:1999 identifier semantics.
40 ///
41 /// The `identifier` parameter determines how the table name is stored:
42 /// - Quoted identifiers: stored with exact case
43 /// - Unquoted identifiers: stored with lowercase canonical form
44 /// - Qualified identifiers: schema and table have independent case handling
45 ///
46 /// Temporary tables (in the "temp" schema) are not persisted to WAL.
47 pub fn create_table_with_identifier(
48 &mut self,
49 schema: vibesql_catalog::TableSchema,
50 identifier: TableIdentifier,
51 ) -> Result<(), StorageError> {
52 self.catalog
53 .create_table_with_identifier(schema.clone(), identifier.clone())
54 .map_err(|e| StorageError::CatalogError(e.to_string()))?;
55
56 // Build qualified name from identifier
57 let qualified_name = if identifier.is_qualified() {
58 // Identifier already includes schema qualification
59 identifier.canonical().to_string()
60 } else {
61 // Add current schema to unqualified identifier
62 let current_schema = &self.catalog.get_current_schema();
63 format!("{}.{}", current_schema, identifier.canonical())
64 };
65
66 // Check if this is a temporary table (in any temp schema)
67 // Temp tables are not persisted to WAL
68 let is_temp = identifier
69 .schema_canonical()
70 .is_some_and(|s| vibesql_catalog::Catalog::is_temp_schema(s));
71
72 if !is_temp {
73 // Assign table ID and emit WAL entry for persistence
74 let table_id = self.next_table_id();
75
76 // Serialize schema for WAL (use a simple binary format)
77 let schema_data = serialize_table_schema(&schema);
78
79 self.emit_wal_op(WalOp::CreateTable {
80 table_id,
81 table_name: qualified_name.clone(),
82 schema_data,
83 });
84 }
85
86 let table = Table::new(schema);
87 self.tables.insert(qualified_name, table);
88
89 Ok(())
90 }
91
92 /// Create a table
93 /// Legacy method - uses global case_sensitive_identifiers setting
94 pub fn create_table(
95 &mut self,
96 schema: vibesql_catalog::TableSchema,
97 ) -> Result<(), StorageError> {
98 let table_name = schema.name.clone();
99
100 self.operations.create_table(&mut self.catalog, schema.clone())?;
101
102 // Normalize table name for storage (matches catalog normalization)
103 let normalized_table_name = if self.catalog.is_case_sensitive_identifiers() {
104 table_name.clone()
105 } else {
106 table_name.to_lowercase()
107 };
108
109 let current_schema = &self.catalog.get_current_schema();
110 let qualified_name = format!("{}.{}", current_schema, normalized_table_name);
111
112 // Assign table ID and emit WAL entry for persistence
113 let table_id = self.next_table_id();
114
115 // Serialize schema for WAL (use a simple binary format)
116 let schema_data = serialize_table_schema(&schema);
117
118 self.emit_wal_op(WalOp::CreateTable {
119 table_id,
120 table_name: qualified_name.clone(),
121 schema_data,
122 });
123
124 let table = Table::new(schema);
125 self.tables.insert(qualified_name, table);
126
127 Ok(())
128 }
129
130 /// Get a table by identifier using SQL:1999 case semantics.
131 ///
132 /// Uses the canonical form of the identifier for direct lookup without fallbacks.
133 /// Supports both simple and schema-qualified identifiers.
134 pub fn get_table_by_identifier(&self, identifier: &TableIdentifier) -> Option<&Table> {
135 let qualified_name = if identifier.is_qualified() {
136 // Identifier already includes schema qualification
137 identifier.canonical().to_string()
138 } else {
139 // Add current schema to unqualified identifier
140 let current_schema = &self.catalog.get_current_schema();
141 format!("{}.{}", current_schema, identifier.canonical())
142 };
143 self.tables.get(&qualified_name)
144 }
145
146 /// Get a table for reading
147 /// Legacy method with fallback lookups for backward compatibility
148 ///
149 /// For unqualified names, checks temp schema first (SQLite semantics).
150 /// SQLite Compatibility: The "temp" schema name is mapped to the session's
151 /// temp schema, allowing `temp.tablename` syntax.
152 pub fn get_table(&self, name: &str) -> Option<&Table> {
153 // For qualified names with "temp" schema, resolve to session's temp schema
154 // This enables `SELECT * FROM temp.t1` syntax
155 let resolved_name = if let Some((schema_part, table_part)) = name.split_once('.') {
156 if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
157 std::borrow::Cow::Owned(format!(
158 "{}.{}",
159 self.catalog.temp_schema_name(),
160 table_part
161 ))
162 } else {
163 std::borrow::Cow::Borrowed(name)
164 }
165 } else {
166 std::borrow::Cow::Borrowed(name)
167 };
168 let name = resolved_name.as_ref();
169
170 // Try the name as-is first (for delimited identifiers)
171 if let Some(table) = self.tables.get(name) {
172 return Some(table);
173 }
174
175 // Try lowercase normalization (standard for unquoted identifiers)
176 let lowercase_name = name.to_lowercase();
177 if lowercase_name != name {
178 if let Some(table) = self.tables.get(&lowercase_name) {
179 return Some(table);
180 }
181 }
182
183 // Try uppercase normalization (for backward compatibility with old data)
184 let uppercase_name = name.to_uppercase();
185 if uppercase_name != name && uppercase_name != lowercase_name {
186 if let Some(table) = self.tables.get(&uppercase_name) {
187 return Some(table);
188 }
189 }
190
191 // For qualified names (schema.table), try normalizing each part separately
192 // This handles the case where storage normalized table name but not schema
193 if let Some((schema_part, table_part)) = name.split_once('.') {
194 // Try schema lowercase, table uppercase (current storage behavior)
195 let mixed_case =
196 format!("{}.{}", schema_part.to_lowercase(), table_part.to_uppercase());
197 if mixed_case != name && mixed_case != uppercase_name && mixed_case != lowercase_name {
198 if let Some(table) = self.tables.get(&mixed_case) {
199 return Some(table);
200 }
201 }
202 }
203
204 // For unqualified names, check session's temp schema first (SQLite semantics)
205 // Temp tables shadow tables in the main schema
206 if !name.contains('.') {
207 // Check session's temp schema first
208 let temp_qualified = format!("{}.{}", self.catalog.temp_schema_name(), lowercase_name);
209 if let Some(table) = self.tables.get(&temp_qualified) {
210 return Some(table);
211 }
212
213 let current_schema = &self.catalog.get_current_schema();
214
215 // Try as-is with schema prefix
216 let qualified_name_original = format!("{}.{}", current_schema, name);
217 if let Some(table) = self.tables.get(&qualified_name_original) {
218 return Some(table);
219 }
220
221 // Try uppercase with schema prefix
222 let qualified_name_uppercase = format!("{}.{}", current_schema, uppercase_name);
223 if qualified_name_uppercase != qualified_name_original {
224 if let Some(table) = self.tables.get(&qualified_name_uppercase) {
225 return Some(table);
226 }
227 }
228
229 // Try lowercase with schema prefix
230 let qualified_name_lowercase = format!("{}.{}", current_schema, lowercase_name);
231 if qualified_name_lowercase != qualified_name_original
232 && qualified_name_lowercase != qualified_name_uppercase
233 {
234 return self.tables.get(&qualified_name_lowercase);
235 }
236 }
237
238 None
239 }
240
241 /// Get a table for writing
242 ///
243 /// For unqualified names, checks temp schema first (SQLite semantics).
244 /// SQLite Compatibility: The "temp" schema name is mapped to the session's
245 /// temp schema, allowing `temp.tablename` syntax.
246 pub fn get_table_mut(&mut self, name: &str) -> Option<&mut Table> {
247 // For qualified names with "temp" schema, resolve to session's temp schema
248 // This enables `UPDATE temp.t1 SET ...` syntax
249 let resolved_name = if let Some((schema_part, table_part)) = name.split_once('.') {
250 if schema_part.eq_ignore_ascii_case(vibesql_catalog::TEMP_SCHEMA) {
251 Some(format!("{}.{}", self.catalog.temp_schema_name(), table_part))
252 } else {
253 None
254 }
255 } else {
256 None
257 };
258 let name = resolved_name.as_deref().unwrap_or(name);
259
260 // Try the name as-is first (for delimited identifiers)
261 if self.tables.contains_key(name) {
262 return self.tables.get_mut(name);
263 }
264
265 // Try lowercase normalization (standard for unquoted identifiers)
266 let lowercase_name = name.to_lowercase();
267 if lowercase_name != name && self.tables.contains_key(&lowercase_name) {
268 return self.tables.get_mut(&lowercase_name);
269 }
270
271 // Try uppercase normalization (for backward compatibility with old data)
272 let uppercase_name = name.to_uppercase();
273 if uppercase_name != name
274 && uppercase_name != lowercase_name
275 && self.tables.contains_key(&uppercase_name)
276 {
277 return self.tables.get_mut(&uppercase_name);
278 }
279
280 // For unqualified names, check session's temp schema first (SQLite semantics)
281 // Temp tables shadow tables in the main schema
282 if !name.contains('.') {
283 // Check session's temp schema first
284 let temp_qualified = format!("{}.{}", self.catalog.temp_schema_name(), lowercase_name);
285 if self.tables.contains_key(&temp_qualified) {
286 return self.tables.get_mut(&temp_qualified);
287 }
288
289 let current_schema = &self.catalog.get_current_schema().to_string();
290
291 // Try as-is with schema prefix
292 let qualified_name_original = format!("{}.{}", current_schema, name);
293 if self.tables.contains_key(&qualified_name_original) {
294 return self.tables.get_mut(&qualified_name_original);
295 }
296
297 // Try lowercase with schema prefix (standard)
298 let qualified_name_lowercase = format!("{}.{}", current_schema, lowercase_name);
299 if qualified_name_lowercase != qualified_name_original
300 && self.tables.contains_key(&qualified_name_lowercase)
301 {
302 return self.tables.get_mut(&qualified_name_lowercase);
303 }
304
305 // Try uppercase with schema prefix (backward compatibility)
306 let qualified_name_uppercase = format!("{}.{}", current_schema, uppercase_name);
307 if qualified_name_uppercase != qualified_name_original
308 && qualified_name_uppercase != qualified_name_lowercase
309 && self.tables.contains_key(&qualified_name_uppercase)
310 {
311 return self.tables.get_mut(&qualified_name_uppercase);
312 }
313 }
314
315 None
316 }
317
318 /// Drop a table
319 ///
320 /// Temporary tables (in the "temp" schema) are not persisted to WAL.
321 pub fn drop_table(&mut self, name: &str) -> Result<(), StorageError> {
322 // Emit WAL entry for persistence before dropping (skip for temp tables)
323 if !self.is_temp_table(name) {
324 self.emit_wal_op(WalOp::DropTable {
325 table_id: self.table_name_to_id(name),
326 table_name: name.to_string(),
327 });
328 }
329
330 // Invalidate columnar cache before dropping
331 self.columnar_cache.invalidate(name);
332 self.operations.drop_table(&mut self.catalog, &mut self.tables, name)
333 }
334
335 /// Insert a row into a table
336 ///
337 /// Temporary tables (in the "temp" schema) are not persisted to WAL.
338 pub fn insert_row(&mut self, table_name: &str, row: Row) -> Result<(), StorageError> {
339 let row_index =
340 self.operations.insert_row(&self.catalog, &mut self.tables, table_name, row.clone())?;
341
342 self.record_change(TransactionChange::Insert {
343 table_name: table_name.to_string(),
344 row: row.clone(),
345 });
346
347 // Emit WAL entry for persistence (skip for temp tables)
348 if !self.is_temp_table(table_name) {
349 self.emit_wal_op(WalOp::Insert {
350 table_id: self.table_name_to_id(table_name),
351 row_id: row_index as u64,
352 values: row.values.to_vec(),
353 });
354 }
355
356 // Broadcast change event to subscribers
357 self.broadcast_change(ChangeEvent::Insert {
358 table_name: table_name.to_string(),
359 row_index,
360 });
361
362 // Invalidate columnar cache for this table
363 self.columnar_cache.invalidate(table_name);
364
365 Ok(())
366 }
367
368 /// Insert multiple rows into a table in a single batch
369 ///
370 /// This method is optimized for bulk data loading and provides significant
371 /// performance improvements over repeated `insert_row` calls:
372 ///
373 /// - **Pre-allocation**: Vector capacity reserved upfront
374 /// - **Batch validation**: All rows validated before any insertion
375 /// - **Deferred index rebuild**: Indexes rebuilt once after all inserts
376 /// - **Single cache invalidation**: Columnar cache invalidated once at end
377 ///
378 /// # Arguments
379 ///
380 /// * `table_name` - Name of the table to insert into
381 /// * `rows` - Vector of rows to insert
382 ///
383 /// # Returns
384 ///
385 /// * `Ok(usize)` - Number of rows successfully inserted
386 /// * `Err(StorageError)` - If validation fails (no rows inserted on error)
387 ///
388 /// # Performance
389 ///
390 /// For large batches (1000+ rows), expect 10-50x speedup vs single-row inserts.
391 ///
392 /// # Example
393 ///
394 /// ```text
395 /// let rows = vec![
396 /// Row::new(vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))]),
397 /// Row::new(vec![SqlValue::Integer(2), SqlValue::Varchar(arcstr::ArcStr::from("Bob"))]),
398 /// ];
399 /// let count = db.insert_rows_batch("users", rows)?;
400 /// ```
401 pub fn insert_rows_batch(
402 &mut self,
403 table_name: &str,
404 rows: Vec<Row>,
405 ) -> Result<usize, StorageError> {
406 if rows.is_empty() {
407 return Ok(0);
408 }
409
410 let row_indices = self.operations.insert_rows_batch(
411 &self.catalog,
412 &mut self.tables,
413 table_name,
414 rows.clone(),
415 )?;
416
417 let table_id = self.table_name_to_id(table_name);
418 let is_temp = self.is_temp_table(table_name);
419
420 // Record changes for transaction management, emit WAL entries, and broadcast events
421 for (row, &row_index) in rows.into_iter().zip(row_indices.iter()) {
422 self.record_change(TransactionChange::Insert {
423 table_name: table_name.to_string(),
424 row: row.clone(),
425 });
426
427 // Emit WAL entry for persistence (skip for temp tables)
428 if !is_temp {
429 self.emit_wal_op(WalOp::Insert {
430 table_id,
431 row_id: row_index as u64,
432 values: row.values.to_vec(),
433 });
434 }
435
436 // Broadcast change event to subscribers
437 self.broadcast_change(ChangeEvent::Insert {
438 table_name: table_name.to_string(),
439 row_index,
440 });
441 }
442
443 // Invalidate columnar cache for this table
444 self.columnar_cache.invalidate(table_name);
445
446 Ok(row_indices.len())
447 }
448
449 /// Insert rows from an iterator in a streaming fashion
450 ///
451 /// This method is optimized for very large datasets that may not fit
452 /// in memory all at once. Rows are processed in configurable batch sizes,
453 /// balancing memory usage with performance.
454 ///
455 /// # Arguments
456 ///
457 /// * `table_name` - Name of the table to insert into
458 /// * `rows` - Iterator yielding rows to insert
459 /// * `batch_size` - Number of rows per batch (0 defaults to 1000)
460 ///
461 /// # Returns
462 ///
463 /// * `Ok(usize)` - Total number of rows successfully inserted
464 /// * `Err(StorageError)` - If any batch fails validation
465 ///
466 /// # Note
467 ///
468 /// Unlike `insert_rows_batch`, this method commits rows batch-by-batch.
469 /// A failure partway through will leave previously committed batches
470 /// in the table. Use `insert_rows_batch` for all-or-nothing semantics.
471 ///
472 /// # Example
473 ///
474 /// ```text
475 /// // Stream 100K rows in batches of 5000
476 /// let rows = (0..100_000).map(|i| Row::new(vec![SqlValue::Integer(i)]));
477 /// let count = db.insert_rows_iter("numbers", rows, 5000)?;
478 /// ```
479 pub fn insert_rows_iter<I>(
480 &mut self,
481 table_name: &str,
482 rows: I,
483 batch_size: usize,
484 ) -> Result<usize, StorageError>
485 where
486 I: Iterator<Item = Row>,
487 {
488 let batch_size = if batch_size == 0 { 1000 } else { batch_size };
489 let mut total_inserted = 0;
490 let mut batch = Vec::with_capacity(batch_size);
491
492 for row in rows {
493 batch.push(row);
494
495 if batch.len() >= batch_size {
496 let count = self.insert_rows_batch(table_name, std::mem::take(&mut batch))?;
497 total_inserted += count;
498 batch = Vec::with_capacity(batch_size);
499 }
500 }
501
502 // Insert any remaining rows
503 if !batch.is_empty() {
504 let count = self.insert_rows_batch(table_name, batch)?;
505 total_inserted += count;
506 }
507
508 Ok(total_inserted)
509 }
510
511 /// Update a single row by primary key value (direct API, no SQL parsing)
512 ///
513 /// This method provides a high-performance update path that bypasses SQL parsing,
514 /// making it suitable for benchmarking and performance-critical code paths.
515 ///
516 /// # Arguments
517 ///
518 /// * `table_name` - Name of the table
519 /// * `pk_value` - Primary key value to match (single column PK only)
520 /// * `column_updates` - List of (column_name, new_value) pairs to update
521 ///
522 /// # Returns
523 ///
524 /// * `Ok(true)` - Row was found and updated
525 /// * `Ok(false)` - Row was not found (no error)
526 /// * `Err(StorageError)` - Table not found, column not found, or constraint violation
527 ///
528 /// # Example
529 ///
530 /// ```text
531 /// // Update column 'name' for row with id=5
532 /// let updated = db.update_row_by_pk(
533 /// "users",
534 /// SqlValue::Integer(5),
535 /// vec![("name", SqlValue::Varchar(arcstr::ArcStr::from("Alice")))],
536 /// )?;
537 /// ```
538 pub fn update_row_by_pk(
539 &mut self,
540 table_name: &str,
541 pk_value: vibesql_types::SqlValue,
542 column_updates: Vec<(&str, vibesql_types::SqlValue)>,
543 ) -> Result<bool, StorageError> {
544 // First phase: read data (immutable borrow)
545 let (row_index, old_row, schema, resolved_name) = {
546 // Get table using existing lookup logic (handles schema prefixes)
547 let table = self
548 .get_table(table_name)
549 .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))?;
550
551 // Look up row by PK
552 let pk_index = table
553 .primary_key_index()
554 .ok_or_else(|| StorageError::Other("Table has no primary key index".to_string()))?;
555
556 let row_index = match pk_index.get(&vec![pk_value.clone()]) {
557 Some(&idx) => idx,
558 None => return Ok(false), // Row not found
559 };
560
561 // Get old row and schema
562 let old_row = table.scan()[row_index].clone();
563 let schema = table.schema.clone();
564 let resolved_name = schema.name.clone();
565
566 (row_index, old_row, schema, resolved_name)
567 };
568
569 // Second phase: apply updates
570 let mut new_row = old_row.clone();
571 let mut changed_columns = std::collections::HashSet::new();
572
573 for (col_name, new_value) in &column_updates {
574 let col_index =
575 schema.get_column_index(col_name).ok_or_else(|| StorageError::ColumnNotFound {
576 column_name: col_name.to_string(),
577 table_name: resolved_name.clone(),
578 })?;
579
580 // Check NOT NULL constraint
581 let column = &schema.columns[col_index];
582 if !column.nullable && *new_value == vibesql_types::SqlValue::Null {
583 return Err(StorageError::NullConstraintViolation { column: col_name.to_string() });
584 }
585
586 new_row.set(col_index, new_value.clone())?;
587 changed_columns.insert(col_index);
588 }
589
590 // Third phase: write data (mutable borrow)
591 let table_mut = self.get_table_mut(table_name).unwrap();
592 table_mut.update_row_selective(row_index, new_row.clone(), &changed_columns)?;
593
594 // Update user-defined indexes (pass changed_columns to skip unaffected indexes)
595 self.operations.update_indexes_for_update(
596 &self.catalog,
597 &resolved_name,
598 &old_row,
599 &new_row,
600 row_index,
601 Some(&changed_columns),
602 );
603
604 // Emit WAL entry for persistence (skip for temp tables)
605 if !self.is_temp_table(&resolved_name) {
606 self.emit_wal_op(WalOp::Update {
607 table_id: self.table_name_to_id(&resolved_name),
608 row_id: row_index as u64,
609 old_values: old_row.values.to_vec(),
610 new_values: new_row.values.to_vec(),
611 });
612 }
613
614 // Broadcast change event to subscribers
615 self.broadcast_change(ChangeEvent::Update { table_name: resolved_name.clone(), row_index });
616
617 // Invalidate columnar cache
618 self.columnar_cache.invalidate(&resolved_name);
619
620 Ok(true)
621 }
622
623 /// List all table names
624 pub fn list_tables(&self) -> Vec<String> {
625 self.catalog.list_tables()
626 }
627}
628
629/// Serialize a TableSchema to bytes for WAL storage
630///
631/// Uses a simple format: JSON serialization of the schema.
632/// This is for WAL recovery purposes and doesn't need to be maximally efficient.
633pub(super) fn serialize_table_schema(schema: &vibesql_catalog::TableSchema) -> Vec<u8> {
634 // Simple approach: serialize the table name and column info as text
635 // Format: table_name\0col1_name\0col1_type\0nullable\0...
636 let mut data = Vec::new();
637
638 // Write table name
639 data.extend_from_slice(schema.name.as_bytes());
640 data.push(0);
641
642 // Write column count
643 data.extend_from_slice(&(schema.columns.len() as u32).to_le_bytes());
644
645 // Write each column
646 for col in &schema.columns {
647 // Column name
648 data.extend_from_slice(col.name.as_bytes());
649 data.push(0);
650
651 // Data type (as debug string for simplicity)
652 let type_str = format!("{:?}", col.data_type);
653 data.extend_from_slice(type_str.as_bytes());
654 data.push(0);
655
656 // Nullable flag
657 data.push(if col.nullable { 1 } else { 0 });
658 }
659
660 data
661}