Skip to main content

dbx_core/sql/
interface.rs

1//! SQL Execution Pipeline — SQL query execution methods
2
3use crate::engine::Database;
4use crate::error::{DbxError, DbxResult};
5use crate::sql::executor::{
6    FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
7    ProjectionOperator, SortOperator, TableScanOperator,
8};
9use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
10use crate::storage::columnar_cache::ColumnarCache;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use arrow::ipc::writer::StreamWriter;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// ════════════════════════════════════════════
18// Arrow IPC Serialization
19// ════════════════════════════════════════════
20
21/// Infer Arrow schema from row values
22fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
23    use crate::storage::columnar::ScalarValue;
24    use arrow::datatypes::{DataType, Field};
25
26    let fields: Vec<Field> = values
27        .iter()
28        .enumerate()
29        .map(|(i, expr)| {
30            let name = format!("col_{}", i);
31            let data_type = match expr {
32                PhysicalExpr::Literal(scalar) => match scalar {
33                    ScalarValue::Int32(_) => DataType::Int32,
34                    ScalarValue::Int64(_) => DataType::Int64,
35                    ScalarValue::Float64(_) => DataType::Float64,
36                    ScalarValue::Utf8(_) => DataType::Utf8,
37                    ScalarValue::Boolean(_) => DataType::Boolean,
38                    ScalarValue::Binary(_) => DataType::Binary,
39                    ScalarValue::Null => DataType::Utf8, // Default to string for nulls
40                },
41                _ => DataType::Utf8, // Fallback to string
42            };
43            Field::new(name, data_type, true) // All fields nullable
44        })
45        .collect();
46
47    Ok(Schema::new(fields))
48}
49
/// Serialize one row of literal values to Arrow IPC *stream* format.
///
/// Builds a single-row `RecordBatch` whose columns follow `schema`, then
/// encodes it with `StreamWriter`. Only `PhysicalExpr::Literal` values are
/// supported; an `Int32` literal is widened automatically when the schema
/// expects `Int64`. Any other field/scalar combination — or a non-literal
/// expression — yields `DbxError::NotImplemented`.
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    // Build a one-element array per (field, value) pair.
    let mut arrays: Vec<ArrayRef> = Vec::new();

    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => {
                // Dispatch on the (declared type, actual scalar) pair so type
                // mismatches are caught explicitly instead of silently coerced.
                match (field.data_type(), scalar) {
                    (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                        Arc::new(Int32Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                        Arc::new(Int64Array::from(vec![*v]))
                    }
                    // Auto-convert Int32 to Int64 (lossless widening).
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                        Arc::new(Int64Array::from(vec![*v as i64]))
                    }
                    (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                        Arc::new(Float64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                        Arc::new(StringArray::from(vec![s.as_str()]))
                    }
                    (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                        Arc::new(BooleanArray::from(vec![*b]))
                    }
                    (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                        Arc::new(BinaryArray::from(vec![b.as_slice()]))
                    }
                    (_, ScalarValue::Null) => {
                        // NULL matches any field type: create a one-element
                        // null array of the field's declared type.
                        match field.data_type() {
                            arrow::datatypes::DataType::Int32 => {
                                Arc::new(Int32Array::from(vec![None as Option<i32>]))
                            }
                            arrow::datatypes::DataType::Int64 => {
                                Arc::new(Int64Array::from(vec![None as Option<i64>]))
                            }
                            arrow::datatypes::DataType::Float64 => {
                                Arc::new(Float64Array::from(vec![None as Option<f64>]))
                            }
                            arrow::datatypes::DataType::Utf8 => {
                                Arc::new(StringArray::from(vec![None as Option<&str>]))
                            }
                            arrow::datatypes::DataType::Boolean => {
                                Arc::new(BooleanArray::from(vec![None as Option<bool>]))
                            }
                            arrow::datatypes::DataType::Binary => {
                                Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
                            }
                            _ => {
                                return Err(DbxError::NotImplemented(format!(
                                    "Unsupported null type: {:?}",
                                    field.data_type()
                                )));
                            }
                        }
                    }
                    _ => {
                        return Err(DbxError::NotImplemented(format!(
                            "Type mismatch: field {:?} vs scalar {:?}",
                            field.data_type(),
                            scalar
                        )));
                    }
                }
            }
            _ => {
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    // Assemble the single-row RecordBatch.
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    // Serialize to Arrow IPC Stream format.
    let mut buffer = Vec::new();
    {
        // Scope the writer so its mutable borrow of `buffer` ends before we return it.
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}
150
151// ════════════════════════════════════════════
152// Helper Functions for WHERE Clause Evaluation
153// ════════════════════════════════════════════
154
155/// Reconstruct a full record by prepending the key (col 0) to the stored value JSON.
156/// INSERT stores `row_values[1..]` as JSON, excluding the key column.
157/// This function re-adds the key as the first element so that schema column indices
158/// align correctly with the data during filter evaluation.
159fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
160    // Try to interpret the key as an integer (le_bytes from INSERT)
161    let key_json = if key.len() == 4 {
162        let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
163        serde_json::Value::Number(val.into())
164    } else if key.len() == 8 {
165        let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
166        serde_json::Value::Number(val.into())
167    } else {
168        // Treat as UTF-8 string key
169        let s = String::from_utf8_lossy(key).to_string();
170        serde_json::Value::String(s)
171    };
172
173    // Parse existing values and prepend key
174    let mut values: Vec<serde_json::Value> =
175        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
176    values.insert(0, key_json);
177    serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
178}
179
180/// Convert a single JSON record to a RecordBatch for filter evaluation
181fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
182    use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
183    use arrow::datatypes::{DataType, Field, Schema};
184
185    let current_values: Vec<serde_json::Value> =
186        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
187
188    let mut fields = Vec::new();
189    let mut columns: Vec<ArrayRef> = Vec::new();
190
191    for (i, val) in current_values.iter().enumerate() {
192        match val {
193            serde_json::Value::Number(n) => {
194                if n.is_i64() {
195                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
196                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
197                } else if n.is_f64() {
198                    fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
199                    columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
200                } else {
201                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
202                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
203                }
204            }
205            serde_json::Value::String(s) => {
206                fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
207                columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
208            }
209            serde_json::Value::Bool(b) => {
210                fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
211                columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
212            }
213            serde_json::Value::Null => {
214                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
215                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
216            }
217            _ => {
218                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
219                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
220            }
221        }
222    }
223
224    let schema = Arc::new(Schema::new(fields));
225    RecordBatch::try_new(schema, columns).map_err(DbxError::from)
226}
227
228/// Evaluate a filter expression for a single record
229/// Returns true if the record matches the filter (or if no filter is provided)
230fn evaluate_filter_for_record(
231    filter_expr: Option<&PhysicalExpr>,
232    value_bytes: &[u8],
233) -> DbxResult<bool> {
234    if let Some(expr) = filter_expr {
235        let batch = json_record_to_batch(value_bytes)?;
236
237        use crate::sql::executor::evaluate_expr;
238        let result = evaluate_expr(expr, &batch)?;
239
240        use arrow::array::BooleanArray;
241        let bool_array = result
242            .as_any()
243            .downcast_ref::<BooleanArray>()
244            .ok_or_else(|| DbxError::TypeMismatch {
245                expected: "BooleanArray".to_string(),
246                actual: format!("{:?}", result.data_type()),
247            })?;
248
249        Ok(bool_array.value(0))
250    } else {
251        Ok(true) // No filter, all records match
252    }
253}
254
255impl Database {
256    // ════════════════════════════════════════════
257    // Phase 3: 파티셔닝 (Partition Pruning Helper)
258    // ════════════════════════════════════════════
259
260    /// 파티셔닝 라우팅 리스트를 가져옵니다.
261    /// 조건이 없으면 모든 파티션을 스캔, 조건이 있으면 (향후) 필요한 파티션만 스캔합니다.
262    pub(crate) fn get_tables_to_scan(
263        &self,
264        table: &str,
265        _filter: Option<&PhysicalExpr>,
266    ) -> Vec<String> {
267        let maps = self.partition_maps.read().unwrap();
268        if let Some(map) = maps.get(table) {
269            // MVP: 조건절을 분석하여 특정 파티션만 추출하는 프루닝 로직 대신 전체 파티션을 반환.
270            // TODO: Extract PartitionValue from _filter and prune.
271            map.all_partitions()
272        } else {
273            // 파티션 매핑이 없으면 셔딩 라우터에 쓰인 모든 샤드 가상 테이블을 조회
274            // ════════════════════════════════════════════
275            // Phase 6: Sharding Scatter Read (Scatter-Gather)
276            // ════════════════════════════════════════════
277            self.sharding_router
278                .all_shards()
279                .iter()
280                .map(|shard| format!("{}__shard_{}", table, shard.id))
281                .collect()
282        }
283    }
284
285    // ════════════════════════════════════════════
286    // SQL Execution Pipeline
287    // ════════════════════════════════════════════
288
289    /// Register table data for SQL queries.
290    ///
291    /// Tables registered here can be queried via `execute_sql()`.
292    pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
293        // Store schema from first batch
294        if let Some(first_batch) = batches.first() {
295            let schema = first_batch.schema();
296            let mut schemas = self.table_schemas.write().unwrap();
297            schemas.insert(name.to_string(), schema);
298        }
299
300        // Store batches
301        let mut tables = self.tables.write().unwrap();
302        tables.insert(name.to_string(), batches);
303    }
304
305    /// Append a RecordBatch to an existing registered table.
306    pub fn append_batch(&self, table: &str, batch: RecordBatch) {
307        let mut tables = self.tables.write().unwrap();
308        tables.entry(table.to_string()).or_default().push(batch);
309    }
310
    /// Execute a SQL query and return RecordBatch results.
    ///
    /// Full pipeline: Parse → LogicalPlan → Optimize → PhysicalPlan → Execute
    ///
    /// View DDL ((materialized) CREATE/DROP/REFRESH) is intercepted by prefix
    /// matching *before* the parser runs, and a fresh materialized-view cache
    /// can short-circuit a SELECT entirely.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use dbx_core::Database;
    /// # fn main() -> dbx_core::DbxResult<()> {
    /// let db = Database::open_in_memory()?;
    /// // Register table data first, then:
    /// // let batches = db.execute_sql("SELECT * FROM users WHERE age > 18")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let t0 = std::time::Instant::now();

        // Step 0: Intercept CREATE VIEW / DROP VIEW (before parser)
        let sql_trimmed = sql.trim();
        let sql_upper = sql_trimmed.to_uppercase();

        // Note: "CREATE MATERIALIZED VIEW" does not start with "CREATE VIEW",
        // so checking the plain-view prefixes first is safe.
        if sql_upper.starts_with("CREATE VIEW") {
            return self.handle_create_view(sql_trimmed);
        }
        if sql_upper.starts_with("DROP VIEW") {
            return self.handle_drop_view(sql_trimmed);
        }

        // Materialized view interception (CREATE/DROP/REFRESH)
        if sql_upper.starts_with("CREATE MATERIALIZED VIEW") {
            return self.handle_create_materialized_view(sql_trimmed);
        }
        if sql_upper.starts_with("DROP MATERIALIZED VIEW") {
            return self.handle_drop_materialized_view(sql_trimmed);
        }
        if sql_upper.starts_with("REFRESH MATERIALIZED VIEW") {
            return self.handle_refresh_materialized_view(sql_trimmed);
        }

        // On SELECT, check for a fresh materialized-view cache hit.
        if sql_upper.starts_with("SELECT")
            && let Some(cached) = self.try_matview_cache(sql_trimmed)
        {
            return Ok(cached);
        }

        // Step 0b: Expand views in FROM clauses
        let expanded = self.view_registry.expand(sql_trimmed);
        let sql = expanded.as_str();

        // Step 1: Parse SQL → AST
        let statements = self.sql_parser.parse(sql)?;
        if statements.is_empty() {
            return Ok(vec![]);
        }

        // Step 2: Logical Plan (only the first statement is executed)
        let planner = LogicalPlanner::new();
        let logical_plan = planner.plan(&statements[0])?;

        // Step 3: Optimize
        let optimized = self.sql_optimizer.optimize(logical_plan)?;

        // Step 4: Physical Plan
        let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
        let physical_plan = physical_planner.plan(&optimized)?;

        // Step 5: Execute
        let result = self.execute_physical_plan(&physical_plan);

        // ── Metrics (recorded on success and failure alike) ────────
        let elapsed_us = t0.elapsed().as_micros() as u64;
        self.metrics.inc_sql_queries();
        self.metrics.query_latency_us.observe(elapsed_us);

        result
    }
389
390    /// CREATE VIEW handler
391    fn handle_create_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
392        // Syntax: CREATE VIEW <name> AS <select_sql>
393        let upper = sql.to_uppercase();
394        let after_view = &sql[upper.find("VIEW").unwrap() + 4..]
395            .trim_start()
396            .to_owned();
397        let as_pos = after_view
398            .to_uppercase()
399            .find(" AS ")
400            .ok_or_else(|| DbxError::SqlParse {
401                message: "CREATE VIEW requires AS".to_string(),
402                sql: sql.to_string(),
403            })?;
404        let view_name = after_view[..as_pos].trim();
405        let view_sql = after_view[as_pos + 4..].trim();
406        self.view_registry.create(view_name, view_sql)?;
407        self.one_row_affected_batch()
408    }
409
410    /// DROP VIEW handler
411    fn handle_drop_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
412        // Syntax: DROP VIEW [IF EXISTS] <name>
413        let upper = sql.to_uppercase();
414        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
415            .trim_start()
416            .to_owned();
417        let (if_exists, name_part) = if after_view.to_uppercase().starts_with("IF EXISTS") {
418            (true, after_view[9..].trim_start().to_string())
419        } else {
420            (false, after_view.clone())
421        };
422        let view_name = name_part.trim();
423
424        if if_exists && !self.view_registry.exists(view_name) {
425            return self.one_row_affected_batch();
426        }
427
428        self.view_registry.drop(view_name)?;
429        self.one_row_affected_batch()
430    }
431
432    // ────────────────────────────────────────────────────────────────────────
433    // Materialized View Handlers
434    // ────────────────────────────────────────────────────────────────────────
435
    /// CREATE MATERIALIZED VIEW <name> [REFRESH EVERY <secs>] AS <select_sql>
    fn handle_create_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();

        // Extract view name / SQL (everything after the VIEW keyword).
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let after_upper = after_view.to_uppercase();

        // Detect the optional "REFRESH EVERY <n> AS ..." pattern.
        let (refresh_interval_secs, body) = if let Some(re_pos) = after_upper.find("REFRESH EVERY")
        {
            // The view name is everything before REFRESH EVERY.
            let name_part = after_view[..re_pos].trim().to_string();
            // Skip the 13-byte "REFRESH EVERY" keyword itself.
            let rest = after_view[re_pos + 13..].trim_start().to_string();
            let as_pos = rest
                .to_uppercase()
                .find(" AS ")
                .ok_or_else(|| DbxError::SqlParse {
                    message: "CREATE MATERIALIZED VIEW ... REFRESH EVERY <n> AS <sql>".to_string(),
                    sql: sql.to_string(),
                })?;
            // The refresh interval sits between REFRESH EVERY and AS; it must
            // parse as whole seconds.
            let interval_str = rest[..as_pos].trim();
            let interval_secs: u64 = interval_str.parse().map_err(|_| DbxError::SqlParse {
                message: format!(
                    "REFRESH EVERY requires integer seconds, got '{}'",
                    interval_str
                ),
                sql: sql.to_string(),
            })?;
            // +4 skips the 4-byte " AS " separator.
            let view_sql = rest[as_pos + 4..].trim().to_string();
            (Some(interval_secs), (name_part, view_sql))
        } else {
            // No refresh clause: plain "<name> AS <sql>".
            let as_pos = after_upper.find(" AS ").ok_or_else(|| DbxError::SqlParse {
                message: "CREATE MATERIALIZED VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
            let name = after_view[..as_pos].trim().to_string();
            let view_sql = after_view[as_pos + 4..].trim().to_string();
            (None, (name, view_sql))
        };

        // body = (name, view_sql); interval is None for manual-refresh views.
        self.mat_view_registry
            .create(&body.0, &body.1, refresh_interval_secs)?;
        self.one_row_affected_batch()
    }
483
484    /// DROP MATERIALIZED VIEW <name>
485    fn handle_drop_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
486        let upper = sql.to_uppercase();
487        let name = sql[upper.find("VIEW").unwrap() + 4..].trim();
488        self.mat_view_registry.remove(name)?;
489        self.one_row_affected_batch()
490    }
491
492    /// REFRESH MATERIALIZED VIEW <name>
493    fn handle_refresh_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
494        let upper = sql.to_uppercase();
495        let name = sql[upper.find("VIEW").unwrap() + 4..].trim().to_lowercase();
496        let view_sql = self.mat_view_registry.get_sql(&name).ok_or_else(|| {
497            DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
498        })?;
499        // 뷰 쿼리 실행 후 캐시 저장
500        let batches = self.execute_sql(&view_sql)?;
501        self.mat_view_registry.set_cache(&name, batches)?;
502        self.one_row_affected_batch()
503    }
504
505    /// SELECT 쿼리에서 Materialized View 캐시 히트 확인
506    ///
507    /// "SELECT ... FROM <mv_name>" 패턴에서 FROM 다음 토큰이 등록된 MV이고
508    /// 캐시가 fresh하면 캐시를 반환합니다.
509    fn try_matview_cache(&self, sql: &str) -> Option<Vec<RecordBatch>> {
510        let upper = sql.to_uppercase();
511        let from_pos = upper.find(" FROM ")?;
512        let after_from = sql[from_pos + 6..].trim();
513        // 첫 번째 토큰 (공백/세미콜론/닫는 괄호 기준)
514        let name = after_from
515            .split(|c: char| c.is_whitespace() || c == ';' || c == ')')
516            .next()?;
517        if self.mat_view_registry.is_fresh(name) {
518            self.mat_view_registry.get_cache(name)
519        } else {
520            None
521        }
522    }
523
524    /// Helper: single-row `rows_affected = 1` result batch
525    fn one_row_affected_batch(&self) -> DbxResult<Vec<RecordBatch>> {
526        use arrow::array::Int64Array;
527        use arrow::datatypes::{DataType, Field, Schema};
528        let schema = Arc::new(Schema::new(vec![Field::new(
529            "rows_affected",
530            DataType::Int64,
531            false,
532        )]));
533        let array = Int64Array::from(vec![1_i64]);
534        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
535        Ok(vec![batch])
536    }
537
538    /// Execute a physical plan against registered table data.
539    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
540        match plan {
541            PhysicalPlan::Insert {
542                table,
543                columns: _,
544                values,
545            } => {
546                // Track OLTP Workload
547                self.workload_analyzer
548                    .write()
549                    .unwrap()
550                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);
551
552                // Execute INSERT: convert PhysicalExpr values to bytes and insert into Delta Store
553                let mut rows_inserted = 0;
554
555                for row_values in values {
556                    // For simplicity, use first column as key, rest as value
557                    // TODO: Proper schema-based serialization
558                    if row_values.is_empty() {
559                        continue;
560                    }
561
562                    // Extract key from first value
563                    let key = match &row_values[0] {
564                        PhysicalExpr::Literal(scalar) => {
565                            use crate::storage::columnar::ScalarValue;
566                            match scalar {
567                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
568                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
569                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
570                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
571                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
572                                // The following cases are not expected here, but are added as per instruction
573                                // They would typically be handled in a `build_operator` method or similar
574                                // where DDL/DML plans are distinguished from query plans.
575                                // For now, we'll place them here as a placeholder for the instruction.
576                                _ => {
577                                    return Err(DbxError::NotImplemented(
578                                        "Non-literal key in INSERT".to_string(),
579                                    ));
580                                }
581                            }
582                        }
583                        _ => {
584                            return Err(DbxError::NotImplemented(
585                                "Non-literal key in INSERT".to_string(),
586                            ));
587                        }
588                    };
589
590                    // Get or infer schema for Arrow IPC serialization
591                    let schema = {
592                        let schemas = self.table_schemas.read().unwrap();
593                        schemas.get(table.as_str()).cloned()
594                    }
595                    .unwrap_or_else(|| {
596                        // No registered schema: infer from row values
597                        Arc::new(
598                            infer_schema_from_values(row_values)
599                                .expect("Failed to infer schema from values"),
600                        )
601                    });
602
603                    // Always use Arrow IPC serialization
604                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;
605
606                    // ════════════════════════════════════════════
607                    // Phase 6: Sharding Scatter Write
608                    // ════════════════════════════════════════════
609                    self.scatter_gather.scatter_write(
610                        &key,
611                        &value_bytes,
612                        |shard_id, sub_key, sub_val| {
613                            let shard_table = format!("{}__shard_{}", table, shard_id);
614                            // Write to the specific shard virtual table (which will then be partitioned by crud.rs if PartitionMap exists)
615                            let _ = self.insert(&shard_table, sub_key, sub_val);
616                        },
617                    );
618
619                    rows_inserted += 1;
620                }
621
622                // Return result batch indicating success
623                use arrow::array::{Int64Array, RecordBatch};
624                use arrow::datatypes::{DataType, Field, Schema};
625
626                let schema = Arc::new(Schema::new(vec![Field::new(
627                    "rows_inserted",
628                    DataType::Int64,
629                    false,
630                )]));
631                let array = Int64Array::from(vec![rows_inserted as i64]);
632                let batch =
633                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
634
635                Ok(vec![batch])
636            }
637            PhysicalPlan::Update {
638                table,
639                assignments,
640                filter,
641            } => {
642                // Track OLTP Workload
643                self.workload_analyzer
644                    .write()
645                    .unwrap()
646                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);
647
648                // Execute UPDATE: scan (across partitions), evaluate filter, update matching records
649                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
650                let mut all_records = Vec::new();
651                for target_table in &target_tables {
652                    all_records.extend(self.scan(target_table)?);
653                }
654
655                let mut rows_updated = 0_i64;
656
657                // Build column name → index mapping from schema
658                let column_index_map = {
659                    let schemas = self.table_schemas.read().unwrap();
660                    schemas.get(table.as_str()).map(|schema| {
661                        schema
662                            .fields()
663                            .iter()
664                            .enumerate()
665                            .map(|(i, field)| (field.name().clone(), i))
666                            .collect::<std::collections::HashMap<String, usize>>()
667                    })
668                };
669
670                for (key, value_bytes) in all_records {
671                    // Reconstruct full record: prepend key (col 0) to value
672                    // INSERT stores row_values[1..] as JSON, so key must be re-added
673                    let full_record = reconstruct_full_record(&key, &value_bytes);
674                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;
675
676                    if should_update {
677                        let mut current_values: Vec<serde_json::Value> =
678                            serde_json::from_slice(&full_record).unwrap_or_else(|_| vec![]);
679
680                        // Apply assignments using schema-based column mapping
681                        for (column_name, expr) in assignments.iter() {
682                            let target_idx = column_index_map
683                                .as_ref()
684                                .and_then(|map| map.get(column_name).copied())
685                                .unwrap_or_else(|| {
686                                    // Fallback: linear search by position
687                                    assignments
688                                        .iter()
689                                        .position(|(n, _)| n == column_name)
690                                        .unwrap_or(0)
691                                });
692
693                            if let PhysicalExpr::Literal(scalar) = expr {
694                                use crate::storage::columnar::ScalarValue;
695                                let new_value = match scalar {
696                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
697                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
698                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
699                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
700                                        .map(serde_json::Value::Number)
701                                        .unwrap_or(serde_json::Value::Null),
702                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
703                                    ScalarValue::Binary(b) => {
704                                        // Encode binary as base64 string
705                                        serde_json::Value::String(base64::Engine::encode(
706                                            &base64::engine::general_purpose::STANDARD,
707                                            b,
708                                        ))
709                                    }
710                                    ScalarValue::Null => serde_json::Value::Null,
711                                };
712
713                                if target_idx < current_values.len() {
714                                    current_values[target_idx] = new_value;
715                                }
716                            }
717                        }
718
719                        // Remove key (col 0) before serializing back to storage
720                        let storage_values = if current_values.len() > 1 {
721                            &current_values[1..]
722                        } else {
723                            &current_values[..]
724                        };
725                        let new_value_bytes = serde_json::to_vec(storage_values)
726                            .map_err(|e| DbxError::Serialization(e.to_string()))?;
727
728                        self.insert(table, &key, &new_value_bytes)?;
729                        rows_updated += 1;
730                    }
731                }
732
733                // Return result batch
734                use arrow::array::{Int64Array, RecordBatch};
735                use arrow::datatypes::{DataType, Field, Schema};
736
737                let schema = Arc::new(Schema::new(vec![Field::new(
738                    "rows_updated",
739                    DataType::Int64,
740                    false,
741                )]));
742                let array = Int64Array::from(vec![rows_updated]);
743                let batch =
744                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
745
746                Ok(vec![batch])
747            }
            PhysicalPlan::Delete { table, filter } => {
                // DELETE: full scan of the (possibly partitioned) table,
                // per-record filter evaluation, then point deletes.
                // Track OLTP Workload
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                // Execute DELETE: scan table (across partitions), evaluate filter, delete matching records

                // Step 1: Scan all records from the target partition(s).
                // get_tables_to_scan may expand `table` into partition
                // sub-tables based on the filter (partition pruning).
                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
                let mut all_records = Vec::new();
                for target_table in &target_tables {
                    all_records.extend(self.scan(target_table)?);
                }

                let mut rows_deleted = 0_i64;

                // Step 2: Process each record
                for (key, value_bytes) in all_records {
                    // Reconstruct full record: prepend key (col 0) to value,
                    // since stored values omit the key column.
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_delete {
                        // Delete from Delta Store.
                        // NOTE(review): records were scanned from each
                        // `target_table` partition, but the delete is issued
                        // against the base `table` name — confirm `delete`
                        // routes to the correct partition internally.
                        self.delete(table, &key)?;
                        rows_deleted += 1;
                    }
                }

                // Return a single-row, single-column batch reporting the
                // number of deleted rows.
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_deleted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_deleted]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
794            PhysicalPlan::DropTable { table, if_exists } => {
795                // DROP TABLE implementation
796                use arrow::array::{Int64Array, RecordBatch};
797                use arrow::datatypes::{DataType, Field, Schema};
798
799                // Check if table exists
800                let exists = self.table_schemas.read().unwrap().contains_key(table);
801
802                if !exists && !if_exists {
803                    return Err(DbxError::TableNotFound(table.clone()));
804                }
805
806                if exists {
807                    // Remove table schema from memory
808                    self.table_schemas.write().unwrap().remove(table);
809
810                    // Delete schema from persistent storage
811                    self.wos_for_metadata().delete_schema_metadata(table)?;
812
813                    // Note: Data deletion from Delta Store/WOS would require
814                    // scanning and deleting all keys with table prefix
815                    // For now, we just remove the schema metadata
816                }
817
818                // Return success
819                let schema = Arc::new(Schema::new(vec![Field::new(
820                    "rows_affected",
821                    DataType::Int64,
822                    false,
823                )]));
824                let array = Int64Array::from(vec![1]);
825                let batch =
826                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
827
828                Ok(vec![batch])
829            }
830            PhysicalPlan::CreateTable {
831                table,
832                columns,
833                if_not_exists,
834            } => {
835                // CREATE TABLE implementation
836                use arrow::array::{Int64Array, RecordBatch};
837                use arrow::datatypes::{DataType, Field, Schema};
838
839                // Check if table already exists
840                let exists = self.table_schemas.read().unwrap().contains_key(table);
841
842                if exists && !if_not_exists {
843                    return Err(DbxError::Schema(format!(
844                        "Table '{}' already exists",
845                        table
846                    )));
847                }
848
849                if !exists {
850                    // Create Arrow schema from column definitions
851                    let fields: Vec<Field> = columns
852                        .iter()
853                        .map(|(name, type_str)| {
854                            let data_type = match type_str.to_uppercase().as_str() {
855                                "INT" | "INTEGER" => DataType::Int64,
856                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
857                                "FLOAT" | "DOUBLE" => DataType::Float64,
858                                "BOOL" | "BOOLEAN" => DataType::Boolean,
859                                _ => DataType::Utf8, // Default to string
860                            };
861                            Field::new(name, data_type, true)
862                        })
863                        .collect();
864
865                    let schema = Arc::new(Schema::new(fields));
866
867                    // Store schema in memory
868                    self.table_schemas
869                        .write()
870                        .unwrap()
871                        .insert(table.clone(), schema.clone());
872
873                    // Persist schema to storage
874                    self.wos_for_metadata()
875                        .save_schema_metadata(table, &schema)?;
876                }
877
878                // Return success
879                let schema = Arc::new(Schema::new(vec![Field::new(
880                    "rows_affected",
881                    DataType::Int64,
882                    false,
883                )]));
884                let array = Int64Array::from(vec![1]);
885                let batch =
886                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
887
888                Ok(vec![batch])
889            }
            PhysicalPlan::CreateIndex {
                table,
                index_name,
                columns,
                if_not_exists,
            } => {
                // CREATE INDEX: build a secondary index on one column of an
                // existing table and register it under `index_name`.
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                // Validate: target table must exist
                {
                    let schemas = self.table_schemas.read().unwrap();
                    if !schemas.contains_key(table.as_str()) {
                        return Err(DbxError::Schema(format!(
                            "Table '{}' does not exist",
                            table
                        )));
                    }
                }

                // Only the first listed column is indexed.
                // NOTE(review): extra columns of a multi-column CREATE INDEX
                // are silently ignored here — confirm whether composite
                // indexes should be rejected with an error instead.
                let column = columns.first().ok_or_else(|| {
                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
                })?;

                let exists = self.index.has_index(table, column);

                // A duplicate index is an error unless IF NOT EXISTS was given.
                if exists && !if_not_exists {
                    return Err(DbxError::IndexAlreadyExists {
                        table: table.clone(),
                        column: column.clone(),
                    });
                }

                if !exists {
                    self.index.create_index(table, column)?;

                    // Register index_name → (table, column) mapping in memory
                    self.index_registry
                        .write()
                        .unwrap()
                        .insert(index_name.clone(), (table.clone(), column.clone()));

                    // Persist index metadata to storage
                    self.wos_for_metadata()
                        .save_index_metadata(index_name, table, column)?;
                }

                // DDL result: a single-row batch with rows_affected = 1.
                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
948            PhysicalPlan::DropIndex {
949                table,
950                index_name,
951                if_exists,
952            } => {
953                use arrow::array::{Int64Array, RecordBatch};
954                use arrow::datatypes::{DataType, Field, Schema};
955
956                // Resolve index_name → actual column via registry
957                let resolved_column = {
958                    let registry = self.index_registry.read().unwrap();
959                    registry
960                        .get(index_name.as_str())
961                        .map(|(_, col)| col.clone())
962                };
963
964                // Fallback: if not in registry, try index_name as column name
965                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());
966
967                let exists = self.index.has_index(table, column);
968
969                if !exists && !if_exists {
970                    return Err(DbxError::IndexNotFound {
971                        table: table.clone(),
972                        column: column.to_string(),
973                    });
974                }
975
976                if exists {
977                    self.index.drop_index(table, column)?;
978
979                    // Remove from registry in memory
980                    self.index_registry
981                        .write()
982                        .unwrap()
983                        .remove(index_name.as_str());
984
985                    // Delete index metadata from storage
986                    self.wos_for_metadata().delete_index_metadata(index_name)?;
987                }
988
989                let schema = Arc::new(Schema::new(vec![Field::new(
990                    "rows_affected",
991                    DataType::Int64,
992                    false,
993                )]));
994                let array = Int64Array::from(vec![1]);
995                let batch =
996                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
997
998                Ok(vec![batch])
999            }
1000            PhysicalPlan::AlterTable { table, operation } => {
1001                // ALTER TABLE implementation
1002                use crate::sql::planner::types::AlterTableOperation;
1003                use arrow::array::{Int64Array, RecordBatch};
1004                use arrow::datatypes::{DataType, Field, Schema};
1005
1006                match operation {
1007                    AlterTableOperation::AddColumn {
1008                        column_name,
1009                        data_type,
1010                    } => {
1011                        // Get current schema
1012                        let mut schemas = self.table_schemas.write().unwrap();
1013                        let current_schema = schemas.get(table).ok_or_else(|| {
1014                            DbxError::Schema(format!("Table '{}' not found", table))
1015                        })?;
1016
1017                        // Convert data type string to Arrow DataType
1018                        let arrow_type = match data_type.to_uppercase().as_str() {
1019                            "INT" | "INTEGER" => DataType::Int64,
1020                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
1021                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
1022                            "BOOL" | "BOOLEAN" => DataType::Boolean,
1023                            _ => DataType::Utf8, // Default to string
1024                        };
1025
1026                        // Create new field
1027                        let new_field = Field::new(column_name, arrow_type, true);
1028
1029                        // Create new schema with added column
1030                        let mut fields: Vec<Field> = current_schema
1031                            .fields()
1032                            .iter()
1033                            .map(|f| f.as_ref().clone())
1034                            .collect();
1035                        fields.push(new_field);
1036                        let new_schema = Arc::new(Schema::new(fields));
1037
1038                        // Update schema in memory
1039                        schemas.insert(table.clone(), new_schema.clone());
1040
1041                        // Persist updated schema
1042                        drop(schemas); // Release lock before calling wos
1043                        self.wos_for_metadata()
1044                            .save_schema_metadata(table, &new_schema)?;
1045                    }
1046                    AlterTableOperation::DropColumn { column_name } => {
1047                        // Get current schema
1048                        let mut schemas = self.table_schemas.write().unwrap();
1049                        let current_schema = schemas.get(table).ok_or_else(|| {
1050                            DbxError::Schema(format!("Table '{}' not found", table))
1051                        })?;
1052
1053                        // Find the column to drop
1054                        let fields: Vec<Field> = current_schema
1055                            .fields()
1056                            .iter()
1057                            .filter(|f| f.name() != column_name)
1058                            .map(|f| f.as_ref().clone())
1059                            .collect();
1060
1061                        // Check if column was found
1062                        if fields.len() == current_schema.fields().len() {
1063                            return Err(DbxError::Schema(format!(
1064                                "Column '{}' not found in table '{}'",
1065                                column_name, table
1066                            )));
1067                        }
1068
1069                        // Create new schema without the dropped column
1070                        let new_schema = Arc::new(Schema::new(fields));
1071
1072                        // Update schema in memory
1073                        schemas.insert(table.clone(), new_schema.clone());
1074
1075                        // Persist updated schema
1076                        drop(schemas); // Release lock
1077                        self.wos_for_metadata()
1078                            .save_schema_metadata(table, &new_schema)?;
1079                    }
1080                    AlterTableOperation::RenameColumn { old_name, new_name } => {
1081                        // Get current schema
1082                        let mut schemas = self.table_schemas.write().unwrap();
1083                        let current_schema = schemas.get(table).ok_or_else(|| {
1084                            DbxError::Schema(format!("Table '{}' not found", table))
1085                        })?;
1086
1087                        // Find and rename the column
1088                        let mut found = false;
1089                        let fields: Vec<Field> = current_schema
1090                            .fields()
1091                            .iter()
1092                            .map(|f| {
1093                                if f.name() == old_name {
1094                                    found = true;
1095                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
1096                                } else {
1097                                    f.as_ref().clone()
1098                                }
1099                            })
1100                            .collect();
1101
1102                        // Check if column was found
1103                        if !found {
1104                            return Err(DbxError::Schema(format!(
1105                                "Column '{}' not found in table '{}'",
1106                                old_name, table
1107                            )));
1108                        }
1109
1110                        // Create new schema with renamed column
1111                        let new_schema = Arc::new(Schema::new(fields));
1112
1113                        // Update schema in memory
1114                        schemas.insert(table.clone(), new_schema.clone());
1115
1116                        // Persist updated schema
1117                        drop(schemas); // Release lock
1118                        self.wos_for_metadata()
1119                            .save_schema_metadata(table, &new_schema)?;
1120                    }
1121                }
1122
1123                // Return success
1124                let schema = Arc::new(Schema::new(vec![Field::new(
1125                    "rows_affected",
1126                    DataType::Int64,
1127                    false,
1128                )]));
1129                let array = Int64Array::from(vec![1]);
1130                let batch =
1131                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
1132
1133                Ok(vec![batch])
1134            }
            _ => {
                // All remaining plan kinds are SELECT-style reads; route them
                // through the operator-based execution path.
                self.execute_select_plan(plan)
            }
1139        }
1140    }
1141
1142    /// Execute SELECT query plans (original logic)
1143    fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
1144        // Automatic Tier Selection & Loading
1145        // Ensure all tables involved in the query are synced to Columnar Cache (Tier 2).
1146        for table in plan.tables() {
1147            let target_tables = self.get_tables_to_scan(&table, None);
1148            for t in target_tables {
1149                if !self.columnar_cache.has_table(&t) {
1150                    let _ = self.sync_columnar_cache(&t);
1151                }
1152            }
1153        }
1154
1155        let tables = self.tables.read().unwrap();
1156        let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
1157        Self::drain_operator(&mut *operator)
1158    }
1159
1160    /// Build an operator tree from a physical plan.
1161    fn build_operator(
1162        &self,
1163        plan: &PhysicalPlan,
1164        tables: &HashMap<String, Vec<RecordBatch>>,
1165        columnar_cache: &ColumnarCache,
1166    ) -> DbxResult<Box<dyn PhysicalOperator>> {
1167        match plan {
1168            PhysicalPlan::TableScan {
1169                table,
1170                projection,
1171                filter,
1172            } => {
1173                // Track OLAP Workload
1174                self.workload_analyzer
1175                    .write()
1176                    .unwrap()
1177                    .record(crate::engine::workload_analyzer::QueryPattern::RangeScan);
1178
1179                let mut filter_pushed_down = false;
1180                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
1181
1182                let mut all_batches = Vec::new();
1183                let mut base_schema = None;
1184
1185                for t in target_tables {
1186                    // Try Columnar Cache first (with projection AND filter pushdown!)
1187                    let cached_results = if let Some(filter_expr) = filter {
1188                        let filter_expr_clone = filter_expr.clone();
1189                        // Use pushdown with filter
1190                        let result = columnar_cache.get_batches_with_filter(
1191                            &t,
1192                            if projection.is_empty() {
1193                                None
1194                            } else {
1195                                Some(projection)
1196                            },
1197                            move |batch| {
1198                                use crate::sql::executor::evaluate_expr;
1199                                use arrow::array::BooleanArray;
1200
1201                                let array = evaluate_expr(&filter_expr_clone, batch)?;
1202                                let boolean_array = array
1203                                    .as_any()
1204                                    .downcast_ref::<BooleanArray>()
1205                                    .ok_or_else(|| DbxError::TypeMismatch {
1206                                    expected: "BooleanArray".to_string(),
1207                                    actual: format!("{:?}", array.data_type()),
1208                                })?;
1209                                Ok(boolean_array.clone())
1210                            },
1211                        )?;
1212                        if result.is_some() {
1213                            filter_pushed_down = true;
1214                        }
1215                        result
1216                    } else {
1217                        columnar_cache.get_batches(
1218                            &t,
1219                            if projection.is_empty() {
1220                                None
1221                            } else {
1222                                Some(projection)
1223                            },
1224                        )?
1225                    };
1226
1227                    let (batches, schema, _) = if let Some(cached_batches) = cached_results {
1228                        if cached_batches.is_empty() {
1229                            // Table exists but is empty in cache.
1230                            // We need the schema to return an empty scan.
1231                            let schema = {
1232                                let schemas = self.table_schemas.read().unwrap();
1233                                schemas
1234                                    .get(table)
1235                                    .or_else(|| {
1236                                        let table_lower = table.to_lowercase();
1237                                        schemas
1238                                            .iter()
1239                                            .find(|(k, _)| k.to_lowercase() == table_lower)
1240                                            .map(|(_, v)| v)
1241                                    })
1242                                    .cloned()
1243                            }
1244                            .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1245                            (vec![], schema, projection.clone())
1246                        } else {
1247                            let schema = cached_batches[0].schema();
1248                            (cached_batches, schema, vec![])
1249                        }
1250                    } else {
1251                        // Try cache again
1252                        let cached_after_sync = columnar_cache.get_batches(
1253                            &t,
1254                            if projection.is_empty() {
1255                                None
1256                            } else {
1257                                Some(projection)
1258                            },
1259                        )?;
1260
1261                        if let Some(batches) = cached_after_sync {
1262                            if !batches.is_empty() {
1263                                let schema = batches[0].schema();
1264                                (batches, schema, vec![])
1265                            } else {
1266                                // Table exists but is empty
1267                                let schema = {
1268                                    let schemas = self.table_schemas.read().unwrap();
1269                                    schemas
1270                                        .get(table)
1271                                        .or_else(|| {
1272                                            let table_lower = table.to_lowercase();
1273                                            schemas
1274                                                .iter()
1275                                                .find(|(k, _)| k.to_lowercase() == table_lower)
1276                                                .map(|(_, v)| v)
1277                                        })
1278                                        .cloned()
1279                                }
1280                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1281                                (vec![], schema, projection.clone())
1282                            }
1283                        } else {
1284                            // Not in cache, check HashMap fallback
1285                            let batches_opt = tables.get(&t).or_else(|| {
1286                                let table_lower = t.to_lowercase();
1287                                tables
1288                                    .iter()
1289                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1290                                    .map(|(_, v)| v)
1291                            });
1292
1293                            if let Some(batches) = batches_opt {
1294                                if batches.is_empty() {
1295                                    let schema = {
1296                                        let schemas = self.table_schemas.read().unwrap();
1297                                        schemas
1298                                            .get(table)
1299                                            .or_else(|| {
1300                                                let table_lower = table.to_lowercase();
1301                                                schemas
1302                                                    .iter()
1303                                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1304                                                    .map(|(_, v)| v)
1305                                            })
1306                                            .cloned()
1307                                    }
1308                                    .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1309                                    (vec![], schema, projection.clone())
1310                                } else {
1311                                    let schema = batches[0].schema();
1312                                    (batches.clone(), schema, projection.clone())
1313                                }
1314                            } else {
1315                                // Skip missing sub-table if it has no data yet
1316                                let schema = {
1317                                    let schemas = self.table_schemas.read().unwrap();
1318                                    schemas.get(table).cloned()
1319                                }
1320                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1321                                (vec![], schema, projection.clone())
1322                            }
1323                        }
1324                    };
1325
1326                    if base_schema.is_none() {
1327                        base_schema = Some(schema);
1328                    }
1329                    all_batches.extend(batches);
1330                }
1331
1332                // Use the base schema from the loop, or try to look it up if all partitions were empty
1333                let final_schema = base_schema.unwrap_or_else(|| {
1334                    let schemas = self.table_schemas.read().unwrap();
1335                    schemas
1336                        .get(table)
1337                        .cloned()
1338                        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()))
1339                });
1340
1341                let projection_to_use = if projection.is_empty() {
1342                    vec![]
1343                } else {
1344                    projection.clone()
1345                };
1346                let mut scan =
1347                    TableScanOperator::new(table.clone(), final_schema, projection_to_use);
1348                scan.set_data(all_batches);
1349
1350                // Wrap with filter if needed AND NOT pushed down
1351                if let Some(filter_expr) = filter {
1352                    if !filter_pushed_down {
1353                        Ok(Box::new(FilterOperator::new(
1354                            Box::new(scan),
1355                            filter_expr.clone(),
1356                        )))
1357                    } else {
1358                        // Filter already applied in scan (via cache)
1359                        Ok(Box::new(scan))
1360                    }
1361                } else {
1362                    Ok(Box::new(scan))
1363                }
1364            }
1365
1366            PhysicalPlan::Projection {
1367                input,
1368                exprs,
1369                aliases,
1370            } => {
1371                let input_op = self.build_operator(input, tables, columnar_cache)?;
1372                use arrow::datatypes::Field;
1373
1374                let input_schema = input_op.schema();
1375                let fields: Vec<Field> = exprs
1376                    .iter()
1377                    .enumerate()
1378                    .map(|(i, expr)| {
1379                        let data_type = expr.get_type(input_schema);
1380                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
1381                            alias.clone()
1382                        } else {
1383                            format!("col_{}", i)
1384                        };
1385                        Field::new(&field_name, data_type, true)
1386                    })
1387                    .collect();
1388
1389                let output_schema = Arc::new(Schema::new(fields));
1390                Ok(Box::new(ProjectionOperator::new(
1391                    input_op,
1392                    output_schema,
1393                    exprs.clone(),
1394                )))
1395            }
1396
1397            PhysicalPlan::Limit {
1398                input,
1399                count,
1400                offset,
1401            } => {
1402                let input_op = self.build_operator(input, tables, columnar_cache)?;
1403                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
1404            }
1405
1406            PhysicalPlan::SortMerge { input, order_by } => {
1407                let input_op = self.build_operator(input, tables, columnar_cache)?;
1408                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
1409            }
1410
1411            PhysicalPlan::HashAggregate {
1412                input,
1413                group_by,
1414                aggregates,
1415            } => {
1416                // Track OLAP Workload
1417                self.workload_analyzer
1418                    .write()
1419                    .unwrap()
1420                    .record(crate::engine::workload_analyzer::QueryPattern::Aggregation);
1421
1422                let input_op = self.build_operator(input, tables, columnar_cache)?;
1423                let input_schema = input_op.schema();
1424                let mut fields = Vec::new();
1425
1426                for &idx in group_by {
1427                    fields.push(input_schema.field(idx).clone());
1428                }
1429
1430                for agg in aggregates {
1431                    use arrow::datatypes::DataType;
1432                    let data_type = match agg.function {
1433                        crate::sql::planner::AggregateFunction::Count => DataType::Int64,
1434                        crate::sql::planner::AggregateFunction::Sum
1435                        | crate::sql::planner::AggregateFunction::Avg
1436                        | crate::sql::planner::AggregateFunction::Min
1437                        | crate::sql::planner::AggregateFunction::Max => DataType::Float64,
1438                    };
1439                    let name = agg
1440                        .alias
1441                        .clone()
1442                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
1443                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
1444                }
1445
1446                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
1447                Ok(Box::new(
1448                    HashAggregateOperator::new(
1449                        input_op,
1450                        agg_schema,
1451                        group_by.clone(),
1452                        aggregates.clone(),
1453                    )
1454                    .with_gpu(self.gpu_manager.clone()),
1455                ))
1456            }
1457
1458            PhysicalPlan::HashJoin {
1459                left,
1460                right,
1461                on,
1462                join_type,
1463            } => {
1464                // Track OLAP Workload
1465                self.workload_analyzer
1466                    .write()
1467                    .unwrap()
1468                    .record(crate::engine::workload_analyzer::QueryPattern::Join);
1469
1470                use arrow::datatypes::Field;
1471
1472                let left_op = self.build_operator(left, tables, columnar_cache)?;
1473                let right_op = self.build_operator(right, tables, columnar_cache)?;
1474
1475                // Build joined schema: left columns + right columns
1476                let left_schema = left_op.schema();
1477                let right_schema = right_op.schema();
1478
1479                let mut joined_fields: Vec<Field> = Vec::new();
1480                for field in left_schema.fields().iter() {
1481                    let mut f = field.as_ref().clone();
1482                    // In RIGHT JOIN, left side can be null
1483                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
1484                        f = f.with_nullable(true);
1485                    }
1486                    joined_fields.push(f);
1487                }
1488                for field in right_schema.fields().iter() {
1489                    let mut f = field.as_ref().clone();
1490                    // In LEFT JOIN, right side can be null
1491                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
1492                        f = f.with_nullable(true);
1493                    }
1494                    joined_fields.push(f);
1495                }
1496
1497                let joined_schema = Arc::new(Schema::new(joined_fields));
1498
1499                Ok(Box::new(HashJoinOperator::new(
1500                    left_op,
1501                    right_op,
1502                    joined_schema,
1503                    on.clone(),
1504                    *join_type,
1505                )))
1506            }
1507
1508            PhysicalPlan::Insert { .. } => {
1509                // INSERT should be handled in execute_physical_plan, not here
1510                unreachable!("INSERT should not reach build_operator")
1511            }
1512            PhysicalPlan::Update { .. } => {
1513                // UPDATE should be handled in execute_physical_plan, not here
1514                unreachable!("UPDATE should not reach build_operator")
1515            }
1516            PhysicalPlan::Delete { .. } => {
1517                // DELETE should be handled in execute_physical_plan, not here
1518                unreachable!("DELETE should not reach build_operator")
1519            }
1520            PhysicalPlan::DropTable { .. } => {
1521                // DROP TABLE should be handled in execute_physical_plan, not here
1522                unreachable!("DROP TABLE should not reach build_operator")
1523            }
1524            PhysicalPlan::CreateTable { .. } => {
1525                // CREATE TABLE should be handled in execute_physical_plan, not here
1526                unreachable!("CREATE TABLE should not reach build_operator")
1527            }
1528            PhysicalPlan::CreateIndex { .. } => {
1529                // CREATE INDEX should be handled in execute_physical_plan, not here
1530                unreachable!("CREATE INDEX should not reach build_operator")
1531            }
1532            PhysicalPlan::DropIndex { .. } => {
1533                // DROP INDEX should be handled in execute_physical_plan, not here
1534                unreachable!("DROP INDEX should not reach build_operator")
1535            }
1536            PhysicalPlan::AlterTable { .. } => {
1537                // ALTER TABLE should be handled in execute_physical_plan, not here
1538                unreachable!("ALTER TABLE should not reach build_operator")
1539            }
1540            PhysicalPlan::CreateFunction { .. } => {
1541                unreachable!("CREATE FUNCTION should not reach build_operator")
1542            }
1543            PhysicalPlan::CreateTrigger { .. } => {
1544                unreachable!("CREATE TRIGGER should not reach build_operator")
1545            }
1546            PhysicalPlan::CreateJob { .. } => {
1547                unreachable!("CREATE JOB should not reach build_operator")
1548            }
1549            PhysicalPlan::DropFunction { .. } => {
1550                unreachable!("DROP FUNCTION should not reach build_operator")
1551            }
1552            PhysicalPlan::DropTrigger { .. } => {
1553                unreachable!("DROP TRIGGER should not reach build_operator")
1554            }
1555            PhysicalPlan::DropJob { .. } => {
1556                unreachable!("DROP JOB should not reach build_operator")
1557            }
1558        }
1559    }
1560
1561    /// Drain all batches from an operator.
1562    fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
1563        let mut results = Vec::new();
1564        while let Some(batch) = op.next()? {
1565            if batch.num_rows() > 0 {
1566                results.push(batch);
1567            }
1568        }
1569        Ok(results)
1570    }
1571}
1572
1573// ════════════════════════════════════════════
1574// DatabaseSql Trait Implementation
1575// ════════════════════════════════════════════
1576
1577impl crate::traits::DatabaseSql for Database {
1578    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
1579        // Reuse existing implementation
1580        Database::execute_sql(self, sql)
1581    }
1582
1583    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
1584        // Reuse existing implementation
1585        Database::register_table(self, name, batches)
1586    }
1587
1588    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
1589        // Reuse existing implementation
1590        Database::append_batch(self, table, batch);
1591        Ok(())
1592    }
1593}