// dbx_core/sql/interface.rs
1//! SQL Execution Pipeline — SQL query execution methods
2
3use crate::engine::Database;
4use crate::error::{DbxError, DbxResult};
5use crate::sql::executor::{
6    FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
7    ProjectionOperator, SortOperator, TableScanOperator,
8};
9use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
10use crate::storage::columnar_cache::ColumnarCache;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use arrow::ipc::writer::StreamWriter;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// ════════════════════════════════════════════
18// Arrow IPC Serialization
19// ════════════════════════════════════════════
20
21/// Infer Arrow schema from row values
22fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
23    use crate::storage::columnar::ScalarValue;
24    use arrow::datatypes::{DataType, Field};
25
26    let fields: Vec<Field> = values
27        .iter()
28        .enumerate()
29        .map(|(i, expr)| {
30            let name = format!("col_{}", i);
31            let data_type = match expr {
32                PhysicalExpr::Literal(scalar) => match scalar {
33                    ScalarValue::Int32(_) => DataType::Int32,
34                    ScalarValue::Int64(_) => DataType::Int64,
35                    ScalarValue::Float64(_) => DataType::Float64,
36                    ScalarValue::Utf8(_) => DataType::Utf8,
37                    ScalarValue::Boolean(_) => DataType::Boolean,
38                    ScalarValue::Binary(_) => DataType::Binary,
39                    ScalarValue::Null => DataType::Utf8, // Default to string for nulls
40                },
41                _ => DataType::Utf8, // Fallback to string
42            };
43            Field::new(name, data_type, true) // All fields nullable
44        })
45        .collect();
46
47    Ok(Schema::new(fields))
48}
49
/// Serialize a single row of literal values to Arrow IPC stream format.
///
/// Zips the schema's fields with `row_values`, building one single-element
/// array per column, assembles them into a one-row `RecordBatch`, and
/// encodes it with Arrow's `StreamWriter`.
///
/// # Errors
/// - `DbxError::NotImplemented` for non-literal expressions, unsupported
///   null field types, or field/scalar type mismatches.
/// - `DbxError::Storage` if the `RecordBatch` cannot be assembled.
/// - `DbxError::Serialization` on IPC write/finish failures.
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    // Build arrays for each field (one single-value array per column).
    let mut arrays: Vec<ArrayRef> = Vec::new();

    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => {
                // Dispatch on the pair (declared field type, actual scalar).
                match (field.data_type(), scalar) {
                    (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                        Arc::new(Int32Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                        Arc::new(Int64Array::from(vec![*v]))
                    }
                    // Auto-convert Int32 to Int64 (lossless widening).
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                        Arc::new(Int64Array::from(vec![*v as i64]))
                    }
                    (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                        Arc::new(Float64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                        Arc::new(StringArray::from(vec![s.as_str()]))
                    }
                    (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                        Arc::new(BooleanArray::from(vec![*b]))
                    }
                    (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                        Arc::new(BinaryArray::from(vec![b.as_slice()]))
                    }
                    (_, ScalarValue::Null) => {
                        // A NULL literal must still be typed to match the
                        // field, so create a single-null array of that type.
                        match field.data_type() {
                            arrow::datatypes::DataType::Int32 => {
                                Arc::new(Int32Array::from(vec![None as Option<i32>]))
                            }
                            arrow::datatypes::DataType::Int64 => {
                                Arc::new(Int64Array::from(vec![None as Option<i64>]))
                            }
                            arrow::datatypes::DataType::Float64 => {
                                Arc::new(Float64Array::from(vec![None as Option<f64>]))
                            }
                            arrow::datatypes::DataType::Utf8 => {
                                Arc::new(StringArray::from(vec![None as Option<&str>]))
                            }
                            arrow::datatypes::DataType::Boolean => {
                                Arc::new(BooleanArray::from(vec![None as Option<bool>]))
                            }
                            arrow::datatypes::DataType::Binary => {
                                Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
                            }
                            _ => {
                                return Err(DbxError::NotImplemented(format!(
                                    "Unsupported null type: {:?}",
                                    field.data_type()
                                )));
                            }
                        }
                    }
                    // Any other (field type, scalar) pairing is a mismatch
                    // the writer cannot reconcile.
                    _ => {
                        return Err(DbxError::NotImplemented(format!(
                            "Type mismatch: field {:?} vs scalar {:?}",
                            field.data_type(),
                            scalar
                        )));
                    }
                }
            }
            // Only literal values are supported in INSERT rows.
            _ => {
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    // Create RecordBatch (validates array types/lengths against the schema).
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    // Serialize to Arrow IPC Stream format. The inner scope ensures the
    // writer releases its borrow of `buffer` before we return it.
    let mut buffer = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}
150
151// ════════════════════════════════════════════
152// Helper Functions for WHERE Clause Evaluation
153// ════════════════════════════════════════════
154
155/// Reconstruct a full record by prepending the key (col 0) to the stored value JSON.
156/// INSERT stores `row_values[1..]` as JSON, excluding the key column.
157/// This function re-adds the key as the first element so that schema column indices
158/// align correctly with the data during filter evaluation.
159fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
160    // Try to interpret the key as an integer (le_bytes from INSERT)
161    let key_json = if key.len() == 4 {
162        let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
163        serde_json::Value::Number(val.into())
164    } else if key.len() == 8 {
165        let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
166        serde_json::Value::Number(val.into())
167    } else {
168        // Treat as UTF-8 string key
169        let s = String::from_utf8_lossy(key).to_string();
170        serde_json::Value::String(s)
171    };
172
173    // Parse existing values and prepend key
174    let mut values: Vec<serde_json::Value> =
175        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
176    values.insert(0, key_json);
177    serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
178}
179
180/// Convert a single JSON record to a RecordBatch for filter evaluation
181fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
182    use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
183    use arrow::datatypes::{DataType, Field, Schema};
184
185    let current_values: Vec<serde_json::Value> =
186        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
187
188    let mut fields = Vec::new();
189    let mut columns: Vec<ArrayRef> = Vec::new();
190
191    for (i, val) in current_values.iter().enumerate() {
192        match val {
193            serde_json::Value::Number(n) => {
194                if n.is_i64() {
195                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
196                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
197                } else if n.is_f64() {
198                    fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
199                    columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
200                } else {
201                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
202                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
203                }
204            }
205            serde_json::Value::String(s) => {
206                fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
207                columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
208            }
209            serde_json::Value::Bool(b) => {
210                fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
211                columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
212            }
213            serde_json::Value::Null => {
214                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
215                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
216            }
217            _ => {
218                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
219                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
220            }
221        }
222    }
223
224    let schema = Arc::new(Schema::new(fields));
225    RecordBatch::try_new(schema, columns).map_err(DbxError::from)
226}
227
228/// Evaluate a filter expression for a single record
229/// Returns true if the record matches the filter (or if no filter is provided)
230fn evaluate_filter_for_record(
231    filter_expr: Option<&PhysicalExpr>,
232    value_bytes: &[u8],
233) -> DbxResult<bool> {
234    if let Some(expr) = filter_expr {
235        let batch = json_record_to_batch(value_bytes)?;
236
237        use crate::sql::executor::evaluate_expr;
238        let result = evaluate_expr(expr, &batch)?;
239
240        use arrow::array::BooleanArray;
241        let bool_array = result
242            .as_any()
243            .downcast_ref::<BooleanArray>()
244            .ok_or_else(|| DbxError::TypeMismatch {
245                expected: "BooleanArray".to_string(),
246                actual: format!("{:?}", result.data_type()),
247            })?;
248
249        Ok(bool_array.value(0))
250    } else {
251        Ok(true) // No filter, all records match
252    }
253}
254
255impl Database {
256    // ════════════════════════════════════════════
257    // SQL Execution Pipeline
258    // ════════════════════════════════════════════
259
260    /// Register table data for SQL queries.
261    ///
262    /// Tables registered here can be queried via `execute_sql()`.
263    pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
264        // Store schema from first batch
265        if let Some(first_batch) = batches.first() {
266            let schema = first_batch.schema();
267            let mut schemas = self.table_schemas.write().unwrap();
268            schemas.insert(name.to_string(), schema);
269        }
270
271        // Store batches
272        let mut tables = self.tables.write().unwrap();
273        tables.insert(name.to_string(), batches);
274    }
275
276    /// Append a RecordBatch to an existing registered table.
277    pub fn append_batch(&self, table: &str, batch: RecordBatch) {
278        let mut tables = self.tables.write().unwrap();
279        tables.entry(table.to_string()).or_default().push(batch);
280    }
281
282    /// Execute a SQL query and return RecordBatch results.
283    ///
284    /// Full pipeline: Parse → LogicalPlan → Optimize → PhysicalPlan → Execute
285    ///
286    /// # Example
287    ///
288    /// ```rust
289    /// # use dbx_core::Database;
290    /// # fn main() -> dbx_core::DbxResult<()> {
291    /// let db = Database::open_in_memory()?;
292    /// // Register table data first, then:
293    /// // let batches = db.execute_sql("SELECT * FROM users WHERE age > 18")?;
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
298        // Step 1: Parse SQL → AST
299        let statements = self.sql_parser.parse(sql)?;
300        if statements.is_empty() {
301            return Ok(vec![]);
302        }
303
304        // Step 2: Logical Plan
305        let planner = LogicalPlanner::new();
306        let logical_plan = planner.plan(&statements[0])?;
307
308        // Step 3: Optimize
309        let optimized = self.sql_optimizer.optimize(logical_plan)?;
310
311        // Step 4: Physical Plan
312        let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
313        let physical_plan = physical_planner.plan(&optimized)?;
314
315        // Step 5: Execute
316        self.execute_physical_plan(&physical_plan)
317    }
318
319    /// Execute a physical plan against registered table data.
320    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
321        match plan {
322            PhysicalPlan::Insert {
323                table,
324                columns: _,
325                values,
326            } => {
327                // Execute INSERT: convert PhysicalExpr values to bytes and insert into Delta Store
328                let mut rows_inserted = 0;
329
330                for row_values in values {
331                    // For simplicity, use first column as key, rest as value
332                    // TODO: Proper schema-based serialization
333                    if row_values.is_empty() {
334                        continue;
335                    }
336
337                    // Extract key from first value
338                    let key = match &row_values[0] {
339                        PhysicalExpr::Literal(scalar) => {
340                            use crate::storage::columnar::ScalarValue;
341                            match scalar {
342                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
343                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
344                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
345                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
346                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
                                // Remaining variants (Binary, Null) are not
                                // supported as INSERT keys; reject explicitly.
351                                _ => {
352                                    return Err(DbxError::NotImplemented(
353                                        "Non-literal key in INSERT".to_string(),
354                                    ));
355                                }
356                            }
357                        }
358                        _ => {
359                            return Err(DbxError::NotImplemented(
360                                "Non-literal key in INSERT".to_string(),
361                            ));
362                        }
363                    };
364
365                    // Get or infer schema for Arrow IPC serialization
366                    let schema = {
367                        let schemas = self.table_schemas.read().unwrap();
368                        schemas.get(table.as_str()).cloned()
369                    }
370                    .unwrap_or_else(|| {
371                        // No registered schema: infer from row values
372                        Arc::new(
373                            infer_schema_from_values(row_values)
374                                .expect("Failed to infer schema from values"),
375                        )
376                    });
377
378                    // Always use Arrow IPC serialization
379                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;
380
381                    // Insert into Delta Store
382                    self.insert(table, &key, &value_bytes)?;
383                    rows_inserted += 1;
384                }
385
386                // Return result batch indicating success
387                use arrow::array::{Int64Array, RecordBatch};
388                use arrow::datatypes::{DataType, Field, Schema};
389
390                let schema = Arc::new(Schema::new(vec![Field::new(
391                    "rows_inserted",
392                    DataType::Int64,
393                    false,
394                )]));
395                let array = Int64Array::from(vec![rows_inserted as i64]);
396                let batch =
397                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
398
399                Ok(vec![batch])
400            }
401            PhysicalPlan::Update {
402                table,
403                assignments,
404                filter,
405            } => {
406                // Execute UPDATE: scan, evaluate filter, update matching records
407                let all_records = self.scan(table)?;
408                let mut rows_updated = 0_i64;
409
410                // Build column name → index mapping from schema
411                let column_index_map = {
412                    let schemas = self.table_schemas.read().unwrap();
413                    schemas.get(table.as_str()).map(|schema| {
414                        schema
415                            .fields()
416                            .iter()
417                            .enumerate()
418                            .map(|(i, field)| (field.name().clone(), i))
419                            .collect::<std::collections::HashMap<String, usize>>()
420                    })
421                };
422
423                for (key, value_bytes) in all_records {
424                    // Reconstruct full record: prepend key (col 0) to value
425                    // INSERT stores row_values[1..] as JSON, so key must be re-added
426                    let full_record = reconstruct_full_record(&key, &value_bytes);
427                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;
428
429                    if should_update {
430                        let mut current_values: Vec<serde_json::Value> =
431                            serde_json::from_slice(&full_record).unwrap_or_else(|_| vec![]);
432
433                        // Apply assignments using schema-based column mapping
434                        for (column_name, expr) in assignments.iter() {
435                            let target_idx = column_index_map
436                                .as_ref()
437                                .and_then(|map| map.get(column_name).copied())
438                                .unwrap_or_else(|| {
439                                    // Fallback: linear search by position
440                                    assignments
441                                        .iter()
442                                        .position(|(n, _)| n == column_name)
443                                        .unwrap_or(0)
444                                });
445
446                            if let PhysicalExpr::Literal(scalar) = expr {
447                                use crate::storage::columnar::ScalarValue;
448                                let new_value = match scalar {
449                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
450                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
451                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
452                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
453                                        .map(serde_json::Value::Number)
454                                        .unwrap_or(serde_json::Value::Null),
455                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
456                                    ScalarValue::Binary(b) => {
457                                        // Encode binary as base64 string
458                                        serde_json::Value::String(base64::Engine::encode(
459                                            &base64::engine::general_purpose::STANDARD,
460                                            b,
461                                        ))
462                                    }
463                                    ScalarValue::Null => serde_json::Value::Null,
464                                };
465
466                                if target_idx < current_values.len() {
467                                    current_values[target_idx] = new_value;
468                                }
469                            }
470                        }
471
472                        // Remove key (col 0) before serializing back to storage
473                        let storage_values = if current_values.len() > 1 {
474                            &current_values[1..]
475                        } else {
476                            &current_values[..]
477                        };
478                        let new_value_bytes = serde_json::to_vec(storage_values)
479                            .map_err(|e| DbxError::Serialization(e.to_string()))?;
480
481                        self.insert(table, &key, &new_value_bytes)?;
482                        rows_updated += 1;
483                    }
484                }
485
486                // Return result batch
487                use arrow::array::{Int64Array, RecordBatch};
488                use arrow::datatypes::{DataType, Field, Schema};
489
490                let schema = Arc::new(Schema::new(vec![Field::new(
491                    "rows_updated",
492                    DataType::Int64,
493                    false,
494                )]));
495                let array = Int64Array::from(vec![rows_updated]);
496                let batch =
497                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
498
499                Ok(vec![batch])
500            }
501            PhysicalPlan::Delete { table, filter } => {
502                // Execute DELETE: scan table, evaluate filter, delete matching records
503
504                // Step 1: Scan all records from the table
505                let all_records = self.scan(table)?;
506
507                let mut rows_deleted = 0_i64;
508
509                // Step 2: Process each record
510                for (key, value_bytes) in all_records {
511                    // Reconstruct full record: prepend key (col 0) to value
512                    let full_record = reconstruct_full_record(&key, &value_bytes);
513                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;
514
515                    if should_delete {
516                        // Delete from Delta Store
517                        self.delete(table, &key)?;
518                        rows_deleted += 1;
519                    }
520                }
521
522                // Return result batch
523                use arrow::array::{Int64Array, RecordBatch};
524                use arrow::datatypes::{DataType, Field, Schema};
525
526                let schema = Arc::new(Schema::new(vec![Field::new(
527                    "rows_deleted",
528                    DataType::Int64,
529                    false,
530                )]));
531                let array = Int64Array::from(vec![rows_deleted]);
532                let batch =
533                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
534
535                Ok(vec![batch])
536            }
537            PhysicalPlan::DropTable { table, if_exists } => {
538                // DROP TABLE implementation
539                use arrow::array::{Int64Array, RecordBatch};
540                use arrow::datatypes::{DataType, Field, Schema};
541
542                // Check if table exists
543                let exists = self.table_schemas.read().unwrap().contains_key(table);
544
545                if !exists && !if_exists {
546                    return Err(DbxError::TableNotFound(table.clone()));
547                }
548
549                if exists {
550                    // Remove table schema from memory
551                    self.table_schemas.write().unwrap().remove(table);
552
553                    // Delete schema from persistent storage
554                    self.wos.delete_schema_metadata(table)?;
555
556                    // Note: Data deletion from Delta Store/WOS would require
557                    // scanning and deleting all keys with table prefix
558                    // For now, we just remove the schema metadata
559                }
560
561                // Return success
562                let schema = Arc::new(Schema::new(vec![Field::new(
563                    "rows_affected",
564                    DataType::Int64,
565                    false,
566                )]));
567                let array = Int64Array::from(vec![1]);
568                let batch =
569                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
570
571                Ok(vec![batch])
572            }
573            PhysicalPlan::CreateTable {
574                table,
575                columns,
576                if_not_exists,
577            } => {
578                // CREATE TABLE implementation
579                use arrow::array::{Int64Array, RecordBatch};
580                use arrow::datatypes::{DataType, Field, Schema};
581
582                // Check if table already exists
583                let exists = self.table_schemas.read().unwrap().contains_key(table);
584
585                if exists && !if_not_exists {
586                    return Err(DbxError::Schema(format!(
587                        "Table '{}' already exists",
588                        table
589                    )));
590                }
591
592                if !exists {
593                    // Create Arrow schema from column definitions
594                    let fields: Vec<Field> = columns
595                        .iter()
596                        .map(|(name, type_str)| {
597                            let data_type = match type_str.to_uppercase().as_str() {
598                                "INT" | "INTEGER" => DataType::Int64,
599                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
600                                "FLOAT" | "DOUBLE" => DataType::Float64,
601                                "BOOL" | "BOOLEAN" => DataType::Boolean,
602                                _ => DataType::Utf8, // Default to string
603                            };
604                            Field::new(name, data_type, true)
605                        })
606                        .collect();
607
608                    let schema = Arc::new(Schema::new(fields));
609
610                    // Store schema in memory
611                    self.table_schemas
612                        .write()
613                        .unwrap()
614                        .insert(table.clone(), schema.clone());
615
616                    // Persist schema to storage
617                    self.wos.save_schema_metadata(table, &schema)?;
618                }
619
620                // Return success
621                let schema = Arc::new(Schema::new(vec![Field::new(
622                    "rows_affected",
623                    DataType::Int64,
624                    false,
625                )]));
626                let array = Int64Array::from(vec![1]);
627                let batch =
628                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
629
630                Ok(vec![batch])
631            }
632            PhysicalPlan::CreateIndex {
633                table,
634                index_name,
635                columns,
636                if_not_exists,
637            } => {
638                use arrow::array::{Int64Array, RecordBatch};
639                use arrow::datatypes::{DataType, Field, Schema};
640
641                // Validate: target table must exist
642                {
643                    let schemas = self.table_schemas.read().unwrap();
644                    if !schemas.contains_key(table.as_str()) {
645                        return Err(DbxError::Schema(format!(
646                            "Table '{}' does not exist",
647                            table
648                        )));
649                    }
650                }
651
652                let column = columns.first().ok_or_else(|| {
653                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
654                })?;
655
656                let exists = self.index.has_index(table, column);
657
658                if exists && !if_not_exists {
659                    return Err(DbxError::IndexAlreadyExists {
660                        table: table.clone(),
661                        column: column.clone(),
662                    });
663                }
664
665                if !exists {
666                    self.index.create_index(table, column)?;
667
668                    // Register index_name → (table, column) mapping in memory
669                    self.index_registry
670                        .write()
671                        .unwrap()
672                        .insert(index_name.clone(), (table.clone(), column.clone()));
673
674                    // Persist index metadata to storage
675                    self.wos.save_index_metadata(index_name, table, column)?;
676                }
677
678                let schema = Arc::new(Schema::new(vec![Field::new(
679                    "rows_affected",
680                    DataType::Int64,
681                    false,
682                )]));
683                let array = Int64Array::from(vec![1]);
684                let batch =
685                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
686
687                Ok(vec![batch])
688            }
689            PhysicalPlan::DropIndex {
690                table,
691                index_name,
692                if_exists,
693            } => {
694                use arrow::array::{Int64Array, RecordBatch};
695                use arrow::datatypes::{DataType, Field, Schema};
696
697                // Resolve index_name → actual column via registry
698                let resolved_column = {
699                    let registry = self.index_registry.read().unwrap();
700                    registry
701                        .get(index_name.as_str())
702                        .map(|(_, col)| col.clone())
703                };
704
705                // Fallback: if not in registry, try index_name as column name
706                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());
707
708                let exists = self.index.has_index(table, column);
709
710                if !exists && !if_exists {
711                    return Err(DbxError::IndexNotFound {
712                        table: table.clone(),
713                        column: column.to_string(),
714                    });
715                }
716
717                if exists {
718                    self.index.drop_index(table, column)?;
719
720                    // Remove from registry in memory
721                    self.index_registry
722                        .write()
723                        .unwrap()
724                        .remove(index_name.as_str());
725
726                    // Delete index metadata from storage
727                    self.wos.delete_index_metadata(index_name)?;
728                }
729
730                let schema = Arc::new(Schema::new(vec![Field::new(
731                    "rows_affected",
732                    DataType::Int64,
733                    false,
734                )]));
735                let array = Int64Array::from(vec![1]);
736                let batch =
737                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
738
739                Ok(vec![batch])
740            }
741            PhysicalPlan::AlterTable { table, operation } => {
742                // ALTER TABLE implementation
743                use crate::sql::planner::types::AlterTableOperation;
744                use arrow::array::{Int64Array, RecordBatch};
745                use arrow::datatypes::{DataType, Field, Schema};
746
747                match operation {
748                    AlterTableOperation::AddColumn {
749                        column_name,
750                        data_type,
751                    } => {
752                        // Get current schema
753                        let mut schemas = self.table_schemas.write().unwrap();
754                        let current_schema = schemas.get(table).ok_or_else(|| {
755                            DbxError::Schema(format!("Table '{}' not found", table))
756                        })?;
757
758                        // Convert data type string to Arrow DataType
759                        let arrow_type = match data_type.to_uppercase().as_str() {
760                            "INT" | "INTEGER" => DataType::Int64,
761                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
762                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
763                            "BOOL" | "BOOLEAN" => DataType::Boolean,
764                            _ => DataType::Utf8, // Default to string
765                        };
766
767                        // Create new field
768                        let new_field = Field::new(column_name, arrow_type, true);
769
770                        // Create new schema with added column
771                        let mut fields: Vec<Field> = current_schema
772                            .fields()
773                            .iter()
774                            .map(|f| f.as_ref().clone())
775                            .collect();
776                        fields.push(new_field);
777                        let new_schema = Arc::new(Schema::new(fields));
778
779                        // Update schema in memory
780                        schemas.insert(table.clone(), new_schema.clone());
781
782                        // Persist updated schema
783                        drop(schemas); // Release lock before calling wos
784                        self.wos.save_schema_metadata(table, &new_schema)?;
785                    }
786                    AlterTableOperation::DropColumn { column_name } => {
787                        // Get current schema
788                        let mut schemas = self.table_schemas.write().unwrap();
789                        let current_schema = schemas.get(table).ok_or_else(|| {
790                            DbxError::Schema(format!("Table '{}' not found", table))
791                        })?;
792
793                        // Find the column to drop
794                        let fields: Vec<Field> = current_schema
795                            .fields()
796                            .iter()
797                            .filter(|f| f.name() != column_name)
798                            .map(|f| f.as_ref().clone())
799                            .collect();
800
801                        // Check if column was found
802                        if fields.len() == current_schema.fields().len() {
803                            return Err(DbxError::Schema(format!(
804                                "Column '{}' not found in table '{}'",
805                                column_name, table
806                            )));
807                        }
808
809                        // Create new schema without the dropped column
810                        let new_schema = Arc::new(Schema::new(fields));
811
812                        // Update schema in memory
813                        schemas.insert(table.clone(), new_schema.clone());
814
815                        // Persist updated schema
816                        drop(schemas); // Release lock
817                        self.wos.save_schema_metadata(table, &new_schema)?;
818                    }
819                    AlterTableOperation::RenameColumn { old_name, new_name } => {
820                        // Get current schema
821                        let mut schemas = self.table_schemas.write().unwrap();
822                        let current_schema = schemas.get(table).ok_or_else(|| {
823                            DbxError::Schema(format!("Table '{}' not found", table))
824                        })?;
825
826                        // Find and rename the column
827                        let mut found = false;
828                        let fields: Vec<Field> = current_schema
829                            .fields()
830                            .iter()
831                            .map(|f| {
832                                if f.name() == old_name {
833                                    found = true;
834                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
835                                } else {
836                                    f.as_ref().clone()
837                                }
838                            })
839                            .collect();
840
841                        // Check if column was found
842                        if !found {
843                            return Err(DbxError::Schema(format!(
844                                "Column '{}' not found in table '{}'",
845                                old_name, table
846                            )));
847                        }
848
849                        // Create new schema with renamed column
850                        let new_schema = Arc::new(Schema::new(fields));
851
852                        // Update schema in memory
853                        schemas.insert(table.clone(), new_schema.clone());
854
855                        // Persist updated schema
856                        drop(schemas); // Release lock
857                        self.wos.save_schema_metadata(table, &new_schema)?;
858                    }
859                }
860
861                // Return success
862                let schema = Arc::new(Schema::new(vec![Field::new(
863                    "rows_affected",
864                    DataType::Int64,
865                    false,
866                )]));
867                let array = Int64Array::from(vec![1]);
868                let batch =
869                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
870
871                Ok(vec![batch])
872            }
873            _ => {
874                // Original logic for SELECT queries
875                self.execute_select_plan(plan)
876            }
877        }
878    }
879
880    /// Execute SELECT query plans (original logic)
881    fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
882        // Automatic Tier Selection & Loading
883        // Ensure all tables involved in the query are synced to Columnar Cache (Tier 2).
884        for table in plan.tables() {
885            if !self.columnar_cache.has_table(&table) {
886                let _ = self.sync_columnar_cache(&table);
887            }
888        }
889
890        let tables = self.tables.read().unwrap();
891        let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
892        Self::drain_operator(&mut *operator)
893    }
894
895    /// Build an operator tree from a physical plan.
896    fn build_operator(
897        &self,
898        plan: &PhysicalPlan,
899        tables: &HashMap<String, Vec<RecordBatch>>,
900        columnar_cache: &ColumnarCache,
901    ) -> DbxResult<Box<dyn PhysicalOperator>> {
902        match plan {
903            PhysicalPlan::TableScan {
904                table,
905                projection,
906                filter,
907            } => {
908                let mut filter_pushed_down = false;
909
910                // Try Columnar Cache first (with projection AND filter pushdown!)
911                let cached_results = if let Some(filter_expr) = filter {
912                    let filter_expr_clone = filter_expr.clone();
913                    // Use pushdown with filter
914                    let result = columnar_cache.get_batches_with_filter(
915                        table,
916                        if projection.is_empty() {
917                            None
918                        } else {
919                            Some(projection)
920                        },
921                        move |batch| {
922                            use crate::sql::executor::evaluate_expr;
923                            use arrow::array::BooleanArray;
924
925                            let array = evaluate_expr(&filter_expr_clone, batch)?;
926                            let boolean_array = array
927                                .as_any()
928                                .downcast_ref::<BooleanArray>()
929                                .ok_or_else(|| DbxError::TypeMismatch {
930                                    expected: "BooleanArray".to_string(),
931                                    actual: format!("{:?}", array.data_type()),
932                                })?;
933                            Ok(boolean_array.clone())
934                        },
935                    )?;
936                    if result.is_some() {
937                        filter_pushed_down = true;
938                    }
939                    result
940                } else {
941                    columnar_cache.get_batches(
942                        table,
943                        if projection.is_empty() {
944                            None
945                        } else {
946                            Some(projection)
947                        },
948                    )?
949                };
950
951                let (batches, schema, projection_to_use) =
952                    if let Some(cached_batches) = cached_results {
953                        if cached_batches.is_empty() {
954                            // Table exists but is empty in cache.
955                            // We need the schema to return an empty scan.
956                            let schema = {
957                                let schemas = self.table_schemas.read().unwrap();
958                                schemas
959                                    .get(table)
960                                    .or_else(|| {
961                                        let table_lower = table.to_lowercase();
962                                        schemas
963                                            .iter()
964                                            .find(|(k, _)| k.to_lowercase() == table_lower)
965                                            .map(|(_, v)| v)
966                                    })
967                                    .cloned()
968                            }
969                            .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
970                            (vec![], schema, projection.clone())
971                        } else {
972                            let schema = cached_batches[0].schema();
973                            (cached_batches, schema, vec![])
974                        }
975                    } else {
976                        // Try cache again
977                        let cached_after_sync = columnar_cache.get_batches(
978                            table,
979                            if projection.is_empty() {
980                                None
981                            } else {
982                                Some(projection)
983                            },
984                        )?;
985
986                        if let Some(batches) = cached_after_sync {
987                            if !batches.is_empty() {
988                                let schema = batches[0].schema();
989                                (batches, schema, vec![])
990                            } else {
991                                // Table exists but is empty
992                                let schema = {
993                                    let schemas = self.table_schemas.read().unwrap();
994                                    schemas
995                                        .get(table)
996                                        .or_else(|| {
997                                            let table_lower = table.to_lowercase();
998                                            schemas
999                                                .iter()
1000                                                .find(|(k, _)| k.to_lowercase() == table_lower)
1001                                                .map(|(_, v)| v)
1002                                        })
1003                                        .cloned()
1004                                }
1005                                .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
1006                                (vec![], schema, projection.clone())
1007                            }
1008                        } else {
1009                            // Not in cache, check HashMap fallback
1010                            let batches_opt = tables.get(table).or_else(|| {
1011                                let table_lower = table.to_lowercase();
1012                                tables
1013                                    .iter()
1014                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1015                                    .map(|(_, v)| v)
1016                            });
1017
1018                            if let Some(batches) = batches_opt {
1019                                if batches.is_empty() {
1020                                    let schema = {
1021                                        let schemas = self.table_schemas.read().unwrap();
1022                                        schemas
1023                                            .get(table)
1024                                            .or_else(|| {
1025                                                let table_lower = table.to_lowercase();
1026                                                schemas
1027                                                    .iter()
1028                                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1029                                                    .map(|(_, v)| v)
1030                                            })
1031                                            .cloned()
1032                                    }
1033                                    .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
1034                                    (vec![], schema, projection.clone())
1035                                } else {
1036                                    let schema = batches[0].schema();
1037                                    (batches.clone(), schema, projection.clone())
1038                                }
1039                            } else {
1040                                // Truly not found anywhere
1041                                return Err(DbxError::TableNotFound(table.clone()));
1042                            }
1043                        }
1044                    };
1045
1046                let mut scan =
1047                    TableScanOperator::new(table.clone(), Arc::clone(&schema), projection_to_use);
1048                scan.set_data(batches);
1049
1050                // Wrap with filter if needed AND NOT pushed down
1051                if let Some(filter_expr) = filter {
1052                    if !filter_pushed_down {
1053                        Ok(Box::new(FilterOperator::new(
1054                            Box::new(scan),
1055                            filter_expr.clone(),
1056                        )))
1057                    } else {
1058                        // Filter already applied in scan (via cache)
1059                        Ok(Box::new(scan))
1060                    }
1061                } else {
1062                    Ok(Box::new(scan))
1063                }
1064            }
1065
1066            PhysicalPlan::Projection {
1067                input,
1068                exprs,
1069                aliases,
1070            } => {
1071                let input_op = self.build_operator(input, tables, columnar_cache)?;
1072                use arrow::datatypes::Field;
1073
1074                let input_schema = input_op.schema();
1075                let fields: Vec<Field> = exprs
1076                    .iter()
1077                    .enumerate()
1078                    .map(|(i, expr)| {
1079                        let data_type = expr.get_type(input_schema);
1080                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
1081                            alias.clone()
1082                        } else {
1083                            format!("col_{}", i)
1084                        };
1085                        Field::new(&field_name, data_type, true)
1086                    })
1087                    .collect();
1088
1089                let output_schema = Arc::new(Schema::new(fields));
1090                Ok(Box::new(ProjectionOperator::new(
1091                    input_op,
1092                    output_schema,
1093                    exprs.clone(),
1094                )))
1095            }
1096
1097            PhysicalPlan::Limit {
1098                input,
1099                count,
1100                offset,
1101            } => {
1102                let input_op = self.build_operator(input, tables, columnar_cache)?;
1103                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
1104            }
1105
1106            PhysicalPlan::SortMerge { input, order_by } => {
1107                let input_op = self.build_operator(input, tables, columnar_cache)?;
1108                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
1109            }
1110
1111            PhysicalPlan::HashAggregate {
1112                input,
1113                group_by,
1114                aggregates,
1115            } => {
1116                let input_op = self.build_operator(input, tables, columnar_cache)?;
1117                let input_schema = input_op.schema();
1118                let mut fields = Vec::new();
1119
1120                for &idx in group_by {
1121                    fields.push(input_schema.field(idx).clone());
1122                }
1123
1124                for agg in aggregates {
1125                    use arrow::datatypes::DataType;
1126                    let data_type =
1127                        if matches!(agg.function, crate::sql::planner::AggregateFunction::Count) {
1128                            DataType::Int64
1129                        } else {
1130                            input_schema.field(agg.input).data_type().clone()
1131                        };
1132                    let name = agg
1133                        .alias
1134                        .clone()
1135                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
1136                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
1137                }
1138
1139                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
1140                Ok(Box::new(
1141                    HashAggregateOperator::new(
1142                        input_op,
1143                        agg_schema,
1144                        group_by.clone(),
1145                        aggregates.clone(),
1146                    )
1147                    .with_gpu(self.gpu_manager.clone()),
1148                ))
1149            }
1150
1151            PhysicalPlan::HashJoin {
1152                left,
1153                right,
1154                on,
1155                join_type,
1156            } => {
1157                use arrow::datatypes::Field;
1158
1159                let left_op = self.build_operator(left, tables, columnar_cache)?;
1160                let right_op = self.build_operator(right, tables, columnar_cache)?;
1161
1162                // Build joined schema: left columns + right columns
1163                let left_schema = left_op.schema();
1164                let right_schema = right_op.schema();
1165
1166                let mut joined_fields: Vec<Field> = Vec::new();
1167                for field in left_schema.fields().iter() {
1168                    let mut f = field.as_ref().clone();
1169                    // In RIGHT JOIN, left side can be null
1170                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
1171                        f = f.with_nullable(true);
1172                    }
1173                    joined_fields.push(f);
1174                }
1175                for field in right_schema.fields().iter() {
1176                    let mut f = field.as_ref().clone();
1177                    // In LEFT JOIN, right side can be null
1178                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
1179                        f = f.with_nullable(true);
1180                    }
1181                    joined_fields.push(f);
1182                }
1183
1184                let joined_schema = Arc::new(Schema::new(joined_fields));
1185
1186                Ok(Box::new(HashJoinOperator::new(
1187                    left_op,
1188                    right_op,
1189                    joined_schema,
1190                    on.clone(),
1191                    *join_type,
1192                )))
1193            }
1194
1195            PhysicalPlan::Insert { .. } => {
1196                // INSERT should be handled in execute_physical_plan, not here
1197                unreachable!("INSERT should not reach build_operator")
1198            }
1199            PhysicalPlan::Update { .. } => {
1200                // UPDATE should be handled in execute_physical_plan, not here
1201                unreachable!("UPDATE should not reach build_operator")
1202            }
1203            PhysicalPlan::Delete { .. } => {
1204                // DELETE should be handled in execute_physical_plan, not here
1205                unreachable!("DELETE should not reach build_operator")
1206            }
1207            PhysicalPlan::DropTable { .. } => {
1208                // DROP TABLE should be handled in execute_physical_plan, not here
1209                unreachable!("DROP TABLE should not reach build_operator")
1210            }
1211            PhysicalPlan::CreateTable { .. } => {
1212                // CREATE TABLE should be handled in execute_physical_plan, not here
1213                unreachable!("CREATE TABLE should not reach build_operator")
1214            }
1215            PhysicalPlan::CreateIndex { .. } => {
1216                // CREATE INDEX should be handled in execute_physical_plan, not here
1217                unreachable!("CREATE INDEX should not reach build_operator")
1218            }
1219            PhysicalPlan::DropIndex { .. } => {
1220                // DROP INDEX should be handled in execute_physical_plan, not here
1221                unreachable!("DROP INDEX should not reach build_operator")
1222            }
1223            PhysicalPlan::AlterTable { .. } => {
1224                // ALTER TABLE should be handled in execute_physical_plan, not here
1225                unreachable!("ALTER TABLE should not reach build_operator")
1226            }
1227            PhysicalPlan::CreateFunction { .. } => {
1228                unreachable!("CREATE FUNCTION should not reach build_operator")
1229            }
1230            PhysicalPlan::CreateTrigger { .. } => {
1231                unreachable!("CREATE TRIGGER should not reach build_operator")
1232            }
1233            PhysicalPlan::CreateJob { .. } => {
1234                unreachable!("CREATE JOB should not reach build_operator")
1235            }
1236            PhysicalPlan::DropFunction { .. } => {
1237                unreachable!("DROP FUNCTION should not reach build_operator")
1238            }
1239            PhysicalPlan::DropTrigger { .. } => {
1240                unreachable!("DROP TRIGGER should not reach build_operator")
1241            }
1242            PhysicalPlan::DropJob { .. } => {
1243                unreachable!("DROP JOB should not reach build_operator")
1244            }
1245        }
1246    }
1247
1248    /// Drain all batches from an operator.
1249    fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
1250        let mut results = Vec::new();
1251        while let Some(batch) = op.next()? {
1252            if batch.num_rows() > 0 {
1253                results.push(batch);
1254            }
1255        }
1256        Ok(results)
1257    }
1258}
1259
1260// ════════════════════════════════════════════
1261// DatabaseSql Trait Implementation
1262// ════════════════════════════════════════════
1263
impl crate::traits::DatabaseSql for Database {
    /// Execute a SQL statement via the trait interface.
    ///
    /// Delegates to the inherent `Database::execute_sql`; the fully-qualified
    /// call makes it explicit that the inherent method is the target, not
    /// this trait method.
    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
        // Reuse existing implementation
        Database::execute_sql(self, sql)
    }

    /// Register `batches` under the table name `name`.
    ///
    /// Delegates to the inherent `Database::register_table`.
    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
        // Reuse existing implementation
        Database::register_table(self, name, batches)
    }

    /// Append a single batch to `table`.
    ///
    /// Delegates to the inherent `Database::append_batch` and always reports
    /// success. NOTE(review): the inherent call's result (if any) is discarded
    /// by the trailing `;` — confirm `Database::append_batch` is infallible,
    /// otherwise an error is silently swallowed here.
    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
        // Reuse existing implementation
        Database::append_batch(self, table, batch);
        Ok(())
    }
}