Skip to main content

dbx_core/sql/
interface.rs

1//! SQL Execution Pipeline — SQL query execution methods
2
3use crate::engine::Database;
4use crate::sql::StringCaseExt;
5
6use crate::error::{DbxError, DbxResult};
7use crate::sql::executor::{
8    FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
9    ProjectionOperator, SortOperator, TableScanOperator,
10};
11use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
12use crate::storage::columnar_cache::ColumnarCache;
13use arrow::array::RecordBatch;
14use arrow::datatypes::Schema;
15use arrow::ipc::writer::StreamWriter;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19// ════════════════════════════════════════════
20// Arrow IPC Serialization
21// ════════════════════════════════════════════
22
23/// Infer Arrow schema from row values
24fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
25    use crate::storage::columnar::ScalarValue;
26    use arrow::datatypes::{DataType, Field};
27
28    let fields: Vec<Field> = values
29        .iter()
30        .enumerate()
31        .map(|(i, expr)| {
32            let name = format!("col_{}", i);
33            let data_type = match expr {
34                PhysicalExpr::Literal(scalar) => match scalar {
35                    ScalarValue::Int32(_) => DataType::Int32,
36                    ScalarValue::Int64(_) => DataType::Int64,
37                    ScalarValue::Float64(_) => DataType::Float64,
38                    ScalarValue::Utf8(_) => DataType::Utf8,
39                    ScalarValue::Boolean(_) => DataType::Boolean,
40                    ScalarValue::Binary(_) => DataType::Binary,
41                    ScalarValue::Null => DataType::Utf8, // Default to string for nulls
42                },
43                _ => DataType::Utf8, // Fallback to string
44            };
45            Field::new(name, data_type, true) // All fields nullable
46        })
47        .collect();
48
49    Ok(Schema::new(fields))
50}
51
/// Serialize row values to Arrow IPC format
///
/// Builds a one-row [`RecordBatch`] that matches `schema` — one
/// single-element array per field, zipped positionally with `row_values` —
/// then encodes the batch using the Arrow IPC *stream* format.
///
/// # Errors
/// - `DbxError::NotImplemented` when a value is not a literal, when a
///   (field type, scalar type) pair has no conversion, or when a NULL is
///   requested for a field type not listed below.
/// - `DbxError::Storage` if the batch does not satisfy the schema.
/// - `DbxError::Serialization` if the IPC writer fails.
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    // Build arrays for each field (each array holds exactly one value).
    let mut arrays: Vec<ArrayRef> = Vec::new();

    // NOTE(review): `zip` truncates to the shorter side, so extra row values
    // or extra schema fields are silently dropped — confirm this is intended.
    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => {
                // Dispatch on the (declared field type, runtime scalar) pair.
                match (field.data_type(), scalar) {
                    (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                        Arc::new(Int32Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                        Arc::new(Int64Array::from(vec![*v]))
                    }
                    // Auto-convert Int32 to Int64 (widening, always lossless).
                    // This is the only implicit numeric widening supported.
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                        Arc::new(Int64Array::from(vec![*v as i64]))
                    }
                    (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                        Arc::new(Float64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                        Arc::new(StringArray::from(vec![s.as_str()]))
                    }
                    (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                        Arc::new(BooleanArray::from(vec![*b]))
                    }
                    (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                        Arc::new(BinaryArray::from(vec![b.as_slice()]))
                    }
                    (_, ScalarValue::Null) => {
                        // Create null array of appropriate type: a NULL scalar
                        // adopts whatever type the schema declares for the field.
                        match field.data_type() {
                            arrow::datatypes::DataType::Int32 => {
                                Arc::new(Int32Array::from(vec![None as Option<i32>]))
                            }
                            arrow::datatypes::DataType::Int64 => {
                                Arc::new(Int64Array::from(vec![None as Option<i64>]))
                            }
                            arrow::datatypes::DataType::Float64 => {
                                Arc::new(Float64Array::from(vec![None as Option<f64>]))
                            }
                            arrow::datatypes::DataType::Utf8 => {
                                Arc::new(StringArray::from(vec![None as Option<&str>]))
                            }
                            arrow::datatypes::DataType::Boolean => {
                                Arc::new(BooleanArray::from(vec![None as Option<bool>]))
                            }
                            arrow::datatypes::DataType::Binary => {
                                Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
                            }
                            _ => {
                                return Err(DbxError::NotImplemented(format!(
                                    "Unsupported null type: {:?}",
                                    field.data_type()
                                )));
                            }
                        }
                    }
                    _ => {
                        // Any other combination is an unrecoverable mismatch
                        // between the declared schema and the literal value.
                        return Err(DbxError::NotImplemented(format!(
                            "Type mismatch: field {:?} vs scalar {:?}",
                            field.data_type(),
                            scalar
                        )));
                    }
                }
            }
            _ => {
                // Only literal values can be serialized here.
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    // Create RecordBatch
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    // Serialize to Arrow IPC Stream format. The inner scope ensures the
    // writer (which mutably borrows `buffer`) is dropped before returning.
    let mut buffer = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        // `finish` writes the end-of-stream marker; without it readers may
        // treat the stream as truncated.
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}
152
153// ════════════════════════════════════════════
154// Helper Functions for WHERE Clause Evaluation
155// ════════════════════════════════════════════
156
157/// Reconstruct a full record by prepending the key (col 0) to the stored value JSON.
158/// INSERT stores `row_values[1..]` as JSON, excluding the key column.
159/// This function re-adds the key as the first element so that schema column indices
160/// align correctly with the data during filter evaluation.
161fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
162    // Try to interpret the key as an integer (le_bytes from INSERT)
163    let key_json = if key.len() == 4 {
164        let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
165        serde_json::Value::Number(val.into())
166    } else if key.len() == 8 {
167        let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
168        serde_json::Value::Number(val.into())
169    } else {
170        // Treat as UTF-8 string key
171        let s = String::from_utf8_lossy(key).to_string();
172        serde_json::Value::String(s)
173    };
174
175    // Parse existing values and prepend key
176    let mut values: Vec<serde_json::Value> =
177        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
178    values.insert(0, key_json);
179    serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
180}
181
182/// Convert a single JSON record to a RecordBatch for filter evaluation
183pub fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
184    use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
185    use arrow::datatypes::{DataType, Field, Schema};
186
187    let current_values: Vec<serde_json::Value> =
188        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
189
190    let mut fields = Vec::new();
191    let mut columns: Vec<ArrayRef> = Vec::new();
192
193    for (i, val) in current_values.iter().enumerate() {
194        match val {
195            serde_json::Value::Number(n) => {
196                if n.is_i64() {
197                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
198                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
199                } else if n.is_f64() {
200                    fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
201                    columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
202                } else {
203                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
204                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
205                }
206            }
207            serde_json::Value::String(s) => {
208                fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
209                columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
210            }
211            serde_json::Value::Bool(b) => {
212                fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
213                columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
214            }
215            serde_json::Value::Null => {
216                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
217                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
218            }
219            _ => {
220                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
221                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
222            }
223        }
224    }
225
226    let schema = Arc::new(Schema::new(fields));
227    RecordBatch::try_new(schema, columns).map_err(DbxError::from)
228}
229
230/// Evaluate a filter expression for a single record
231/// Returns true if the record matches the filter (or if no filter is provided)
232fn evaluate_filter_for_record(
233    filter_expr: Option<&PhysicalExpr>,
234    value_bytes: &[u8],
235) -> DbxResult<bool> {
236    if let Some(expr) = filter_expr {
237        let batch = json_record_to_batch(value_bytes)?;
238
239        use crate::sql::executor::evaluate_expr;
240        let result = evaluate_expr(expr, &batch)?;
241
242        use arrow::array::BooleanArray;
243        let bool_array = result
244            .as_any()
245            .downcast_ref::<BooleanArray>()
246            .ok_or_else(|| DbxError::TypeMismatch {
247                expected: "BooleanArray".to_string(),
248                actual: format!("{:?}", result.data_type()),
249            })?;
250
251        Ok(bool_array.value(0))
252    } else {
253        Ok(true) // No filter, all records match
254    }
255}
256
257impl Database {
258    // ════════════════════════════════════════════
259    // Phase 3: 파티셔닝 (Partition Pruning Helper)
260    // ════════════════════════════════════════════
261
262    /// 파티셔닝 라우팅 리스트를 가져옵니다.
263    /// 조건이 없으면 모든 파티션을 스캔, 조건이 있으면 (향후) 필요한 파티션만 스캔합니다.
264    pub(crate) fn get_tables_to_scan(
265        &self,
266        table: &str,
267        _filter: Option<&PhysicalExpr>,
268    ) -> Vec<String> {
269        let maps = self.partition_maps.read().unwrap();
270        if let Some(map) = maps.get(table) {
271            // MVP: 조건절을 분석하여 특정 파티션만 추출하는 프루닝 로직 대신 전체 파티션을 반환.
272            // TODO: Extract PartitionValue from _filter and prune.
273            map.all_partitions()
274        } else {
275            // 파티션 매핑이 없으면 셔딩 라우터에 쓰인 모든 샤드 가상 테이블을 조회
276            // ════════════════════════════════════════════
277            // Phase 6: Sharding Scatter Read (Scatter-Gather)
278            // ════════════════════════════════════════════
279            self.sharding_router
280                .all_shards()
281                .iter()
282                .map(|shard| format!("{}__shard_{}", table, shard.id))
283                .collect()
284        }
285    }
286
287    // ════════════════════════════════════════════
288    // SQL Execution Pipeline
289    // ════════════════════════════════════════════
290
291    /// Register table data for SQL queries.
292    ///
293    /// Tables registered here can be queried via `execute_sql()`.
294    pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
295        // Store schema from first batch
296        if let Some(first_batch) = batches.first() {
297            let schema = first_batch.schema();
298            let mut schemas = self.table_schemas.write().unwrap();
299            schemas.insert(name.to_string(), schema);
300        }
301
302        // Store batches
303        let mut tables = self.tables.write().unwrap();
304        tables.insert(name.to_string(), batches);
305    }
306
307    /// Append a RecordBatch to an existing registered table.
308    pub fn append_batch(&self, table: &str, batch: RecordBatch) {
309        let mut tables = self.tables.write().unwrap();
310        tables.entry(table.to_string()).or_default().push(batch);
311    }
312
313    /// Execute a SQL query and return RecordBatch results.
314    ///
315    /// Full pipeline: Parse → LogicalPlan → Optimize → PhysicalPlan → Execute
316    ///
317    /// # Example
318    ///
319    /// ```rust
320    /// # use dbx_core::Database;
321    /// # fn main() -> dbx_core::DbxResult<()> {
322    /// let db = Database::open_in_memory()?;
323    /// // Register table data first, then:
324    /// // let batches = db.execute_sql("SELECT * FROM users WHERE age > 18")?;
325    /// # Ok(())
326    /// # }
327    /// ```
328    pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
329        let t0 = std::time::Instant::now();
330
331        // Step 0: Intercept CREATE VIEW / DROP VIEW (before parser)
332        let sql_trimmed = sql.trim();
333
334        if sql_trimmed.starts_with_ignore_ascii_case("CREATE VIEW") {
335            return self.handle_create_view(sql_trimmed);
336        }
337        if sql_trimmed.starts_with_ignore_ascii_case("DROP VIEW") {
338            return self.handle_drop_view(sql_trimmed);
339        }
340
341        // Materialized View 인터셀트 (CREATE/DROP/REFRESH)
342        if sql_trimmed.starts_with_ignore_ascii_case("CREATE MATERIALIZED VIEW") {
343            return self.handle_create_materialized_view(sql_trimmed);
344        }
345        if sql_trimmed.starts_with_ignore_ascii_case("DROP MATERIALIZED VIEW") {
346            return self.handle_drop_materialized_view(sql_trimmed);
347        }
348        if sql_trimmed.starts_with_ignore_ascii_case("REFRESH MATERIALIZED VIEW") {
349            return self.handle_refresh_materialized_view(sql_trimmed);
350        }
351
352        // SELECT 시 Materialized View 캐시 히트 확인
353        if sql_trimmed.starts_with_ignore_ascii_case("SELECT")
354            && let Some(cached) = self.try_matview_cache(sql_trimmed)
355        {
356            return Ok(cached);
357        }
358
359        // Step 0b: Expand views in FROM clauses
360        let expanded = self.view_registry.expand(sql_trimmed);
361        let sql = expanded.as_str();
362
363        // Step 1: Parse SQL → AST
364        let statements = self.sql_parser.parse(sql)?;
365        if statements.is_empty() {
366            return Ok(vec![]);
367        }
368
369        // Step 2: Logical Plan
370        let planner = LogicalPlanner::new();
371        let logical_plan = planner.plan(&statements[0])?;
372
373        // Step 3: Optimize
374        let optimized = self.sql_optimizer.optimize(logical_plan)?;
375
376        // Step 4: Physical Plan
377        let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
378        let physical_plan = physical_planner.plan(&optimized)?;
379
380        // Step 5: Execute
381        let result = self.execute_physical_plan(&physical_plan);
382
383        // ── Metrics ─────────────────────────────────────────────────
384        let elapsed_us = t0.elapsed().as_micros() as u64;
385        self.metrics.inc_sql_queries();
386        self.metrics.query_latency_us.observe(elapsed_us);
387
388        result
389    }
390
391    /// CREATE VIEW handler
392    fn handle_create_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
393        // Syntax: CREATE VIEW <name> AS <select_sql>
394        let upper = sql.to_uppercase();
395        let after_view = &sql[upper.find("VIEW").unwrap() + 4..]
396            .trim_start()
397            .to_owned();
398        let as_pos = after_view
399            .to_uppercase()
400            .find(" AS ")
401            .ok_or_else(|| DbxError::SqlParse {
402                message: "CREATE VIEW requires AS".to_string(),
403                sql: sql.to_string(),
404            })?;
405        let view_name = after_view[..as_pos].trim();
406        let view_sql = after_view[as_pos + 4..].trim();
407        self.view_registry.create(view_name, view_sql)?;
408        self.one_row_affected_batch()
409    }
410
411    /// DROP VIEW handler
412    fn handle_drop_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
413        // Syntax: DROP VIEW [IF EXISTS] <name>
414        let upper = sql.to_uppercase();
415        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
416            .trim_start()
417            .to_owned();
418        let (if_exists, name_part) = if after_view.starts_with_ignore_ascii_case("IF EXISTS") {
419            (true, after_view[9..].trim_start().to_string())
420        } else {
421            (false, after_view.clone())
422        };
423        let view_name = name_part.trim();
424
425        if if_exists && !self.view_registry.exists(view_name) {
426            return self.one_row_affected_batch();
427        }
428
429        self.view_registry.drop(view_name)?;
430        self.one_row_affected_batch()
431    }
432
433    // ────────────────────────────────────────────────────────────────────────
434    // Materialized View Handlers
435    // ────────────────────────────────────────────────────────────────────────
436
    /// CREATE MATERIALIZED VIEW <name> [REFRESH EVERY <secs>] AS <select_sql>
    ///
    /// Registers the view name, its defining SELECT, and an optional
    /// auto-refresh interval (in whole seconds) with the materialized-view
    /// registry, then returns a one-row `rows_affected = 1` batch.
    fn handle_create_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();

        // Extract the view name / SQL (everything after the VIEW keyword).
        // Keyword offsets are found on the upper-cased copy and applied to
        // the original string (byte offsets match for ASCII keywords).
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let after_upper = after_view.to_uppercase();

        // Detect the optional "REFRESH EVERY <n> AS ..." pattern.
        let (refresh_interval_secs, body) = if let Some(re_pos) = after_upper.find("REFRESH EVERY")
        {
            // The view name is everything before REFRESH EVERY.
            let name_part = after_view[..re_pos].trim().to_string();
            // 13 = len("REFRESH EVERY"); `rest` starts at the interval value.
            let rest = after_view[re_pos + 13..].trim_start().to_string();
            let as_pos = rest
                .to_uppercase()
                .find(" AS ")
                .ok_or_else(|| DbxError::SqlParse {
                    message: "CREATE MATERIALIZED VIEW ... REFRESH EVERY <n> AS <sql>".to_string(),
                    sql: sql.to_string(),
                })?;
            // Interval must parse as an unsigned integer number of seconds.
            let interval_str = rest[..as_pos].trim();
            let interval_secs: u64 = interval_str.parse().map_err(|_| DbxError::SqlParse {
                message: format!(
                    "REFRESH EVERY requires integer seconds, got '{}'",
                    interval_str
                ),
                sql: sql.to_string(),
            })?;
            let view_sql = rest[as_pos + 4..].trim().to_string();
            (Some(interval_secs), (name_part, view_sql))
        } else {
            // No refresh clause: plain "<name> AS <sql>".
            let as_pos = after_upper.find(" AS ").ok_or_else(|| DbxError::SqlParse {
                message: "CREATE MATERIALIZED VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
            let name = after_view[..as_pos].trim().to_string();
            let view_sql = after_view[as_pos + 4..].trim().to_string();
            (None, (name, view_sql))
        };

        // body = (view name, defining SELECT).
        self.mat_view_registry
            .create(&body.0, &body.1, refresh_interval_secs)?;
        self.one_row_affected_batch()
    }
484
485    /// DROP MATERIALIZED VIEW <name>
486    fn handle_drop_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
487        let upper = sql.to_uppercase();
488        let name = sql[upper.find("VIEW").unwrap() + 4..].trim();
489        self.mat_view_registry.remove(name)?;
490        self.one_row_affected_batch()
491    }
492
493    /// REFRESH MATERIALIZED VIEW <name>
494    fn handle_refresh_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
495        let upper = sql.to_uppercase();
496        let name = sql[upper.find("VIEW").unwrap() + 4..].trim().to_lowercase();
497        let view_sql = self.mat_view_registry.get_sql(&name).ok_or_else(|| {
498            DbxError::InvalidArguments(format!("'{}' 구체화된 뷰를 찾을 수 없음", name))
499        })?;
500        // 뷰 쿼리 실행 후 캐시 저장
501        let batches = self.execute_sql(&view_sql)?;
502        self.mat_view_registry.set_cache(&name, batches)?;
503        self.one_row_affected_batch()
504    }
505
506    /// SELECT 쿼리에서 Materialized View 캐시 히트 확인
507    ///
508    /// "SELECT ... FROM <mv_name>" 패턴에서 FROM 다음 토큰이 등록된 MV이고
509    /// 캐시가 fresh하면 캐시를 반환합니다.
510    fn try_matview_cache(&self, sql: &str) -> Option<Vec<RecordBatch>> {
511        let upper = sql.to_uppercase();
512        let from_pos = upper.find(" FROM ")?;
513        let after_from = sql[from_pos + 6..].trim();
514        // 첫 번째 토큰 (공백/세미콜론/닫는 괄호 기준)
515        let name = after_from
516            .split(|c: char| c.is_whitespace() || c == ';' || c == ')')
517            .next()?;
518        if self.mat_view_registry.is_fresh(name) {
519            self.mat_view_registry.get_cache(name)
520        } else {
521            None
522        }
523    }
524
525    /// Helper: single-row `rows_affected = 1` result batch
526    fn one_row_affected_batch(&self) -> DbxResult<Vec<RecordBatch>> {
527        use arrow::array::Int64Array;
528        use arrow::datatypes::{DataType, Field, Schema};
529        let schema = Arc::new(Schema::new(vec![Field::new(
530            "rows_affected",
531            DataType::Int64,
532            false,
533        )]));
534        let array = Int64Array::from(vec![1_i64]);
535        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
536        Ok(vec![batch])
537    }
538
539    /// Execute a physical plan against registered table data.
540    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
541        match plan {
542            PhysicalPlan::Insert {
543                table,
544                columns: _,
545                values,
546            } => {
547                // Track OLTP Workload
548                self.workload_analyzer
549                    .write()
550                    .unwrap()
551                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);
552
553                // Execute INSERT: convert PhysicalExpr values to bytes and insert into Delta Store
554                let mut rows_inserted = 0;
555
556                for row_values in values {
557                    // For simplicity, use first column as key, rest as value
558                    // TODO: Proper schema-based serialization
559                    if row_values.is_empty() {
560                        continue;
561                    }
562
563                    // Extract key from first value
564                    let key = match &row_values[0] {
565                        PhysicalExpr::Literal(scalar) => {
566                            use crate::storage::columnar::ScalarValue;
567                            match scalar {
568                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
569                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
570                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
571                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
572                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
573                                // The following cases are not expected here, but are added as per instruction
574                                // They would typically be handled in a `build_operator` method or similar
575                                // where DDL/DML plans are distinguished from query plans.
576                                // For now, we'll place them here as a placeholder for the instruction.
577                                _ => {
578                                    return Err(DbxError::NotImplemented(
579                                        "Non-literal key in INSERT".to_string(),
580                                    ));
581                                }
582                            }
583                        }
584                        _ => {
585                            return Err(DbxError::NotImplemented(
586                                "Non-literal key in INSERT".to_string(),
587                            ));
588                        }
589                    };
590
591                    // Get or infer schema for Arrow IPC serialization
592                    let schema = {
593                        let schemas = self.table_schemas.read().unwrap();
594                        schemas.get(table.as_str()).cloned()
595                    }
596                    .unwrap_or_else(|| {
597                        // No registered schema: infer from row values
598                        Arc::new(
599                            infer_schema_from_values(row_values)
600                                .expect("Failed to infer schema from values"),
601                        )
602                    });
603
604                    // Always use Arrow IPC serialization
605                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;
606
607                    // ════════════════════════════════════════════
608                    // Phase 6: Sharding Scatter Write
609                    // ════════════════════════════════════════════
610                    self.scatter_gather.scatter_write(
611                        &key,
612                        &value_bytes,
613                        |shard_id, sub_key, sub_val| {
614                            let shard_table = format!("{}__shard_{}", table, shard_id);
615                            // Write to the specific shard virtual table (which will then be partitioned by crud.rs if PartitionMap exists)
616                            let _ = self.insert(&shard_table, sub_key, sub_val);
617                        },
618                    );
619
620                    rows_inserted += 1;
621                }
622
623                // Return result batch indicating success
624                use arrow::array::{Int64Array, RecordBatch};
625                use arrow::datatypes::{DataType, Field, Schema};
626
627                let schema = Arc::new(Schema::new(vec![Field::new(
628                    "rows_inserted",
629                    DataType::Int64,
630                    false,
631                )]));
632                let array = Int64Array::from(vec![rows_inserted as i64]);
633                let batch =
634                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
635
636                Ok(vec![batch])
637            }
638            PhysicalPlan::Update {
639                table,
640                assignments,
641                filter,
642            } => {
643                // Track OLTP Workload
644                self.workload_analyzer
645                    .write()
646                    .unwrap()
647                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);
648
649                // Execute UPDATE: scan (across partitions), evaluate filter, update matching records
650                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
651                let mut all_records = Vec::new();
652                for target_table in &target_tables {
653                    all_records.extend(self.scan(target_table)?);
654                }
655
656                let mut rows_updated = 0_i64;
657
658                // Build column name → index mapping from schema
659                let column_index_map = {
660                    let schemas = self.table_schemas.read().unwrap();
661                    schemas.get(table.as_str()).map(|schema| {
662                        schema
663                            .fields()
664                            .iter()
665                            .enumerate()
666                            .map(|(i, field)| (field.name().clone(), i))
667                            .collect::<std::collections::HashMap<String, usize>>()
668                    })
669                };
670
671                for (key, value_bytes) in all_records {
672                    // Reconstruct full record: prepend key (col 0) to value
673                    // INSERT stores row_values[1..] as JSON, so key must be re-added
674                    let full_record = reconstruct_full_record(&key, &value_bytes);
675                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;
676
677                    if should_update {
678                        let mut current_values: Vec<serde_json::Value> =
679                            serde_json::from_slice(&full_record).unwrap_or_else(|_| vec![]);
680
681                        // Apply assignments using schema-based column mapping
682                        for (column_name, expr) in assignments.iter() {
683                            let target_idx = column_index_map
684                                .as_ref()
685                                .and_then(|map| map.get(column_name).copied())
686                                .unwrap_or_else(|| {
687                                    // Fallback: linear search by position
688                                    assignments
689                                        .iter()
690                                        .position(|(n, _)| n == column_name)
691                                        .unwrap_or(0)
692                                });
693
694                            if let PhysicalExpr::Literal(scalar) = expr {
695                                use crate::storage::columnar::ScalarValue;
696                                let new_value = match scalar {
697                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
698                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
699                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
700                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
701                                        .map(serde_json::Value::Number)
702                                        .unwrap_or(serde_json::Value::Null),
703                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
704                                    ScalarValue::Binary(b) => {
705                                        // Encode binary as base64 string
706                                        serde_json::Value::String(base64::Engine::encode(
707                                            &base64::engine::general_purpose::STANDARD,
708                                            b,
709                                        ))
710                                    }
711                                    ScalarValue::Null => serde_json::Value::Null,
712                                };
713
714                                if target_idx < current_values.len() {
715                                    current_values[target_idx] = new_value;
716                                }
717                            }
718                        }
719
720                        // Remove key (col 0) before serializing back to storage
721                        let storage_values = if current_values.len() > 1 {
722                            &current_values[1..]
723                        } else {
724                            &current_values[..]
725                        };
726                        let new_value_bytes = serde_json::to_vec(storage_values)
727                            .map_err(|e| DbxError::Serialization(e.to_string()))?;
728
729                        self.insert(table, &key, &new_value_bytes)?;
730                        rows_updated += 1;
731                    }
732                }
733
734                // Return result batch
735                use arrow::array::{Int64Array, RecordBatch};
736                use arrow::datatypes::{DataType, Field, Schema};
737
738                let schema = Arc::new(Schema::new(vec![Field::new(
739                    "rows_updated",
740                    DataType::Int64,
741                    false,
742                )]));
743                let array = Int64Array::from(vec![rows_updated]);
744                let batch =
745                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
746
747                Ok(vec![batch])
748            }
749            PhysicalPlan::Delete { table, filter } => {
750                // Track OLTP Workload
751                self.workload_analyzer
752                    .write()
753                    .unwrap()
754                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);
755
756                // Execute DELETE: scan table (across partitions), evaluate filter, delete matching records
757
758                // Step 1: Scan all records from the target partition(s)
759                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
760                let mut all_records = Vec::new();
761                for target_table in &target_tables {
762                    all_records.extend(self.scan(target_table)?);
763                }
764
765                let mut rows_deleted = 0_i64;
766
767                // Step 2: Process each record
768                for (key, value_bytes) in all_records {
769                    // Reconstruct full record: prepend key (col 0) to value
770                    let full_record = reconstruct_full_record(&key, &value_bytes);
771                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;
772
773                    if should_delete {
774                        // Delete from Delta Store
775                        self.delete(table, &key)?;
776                        rows_deleted += 1;
777                    }
778                }
779
780                // Return result batch
781                use arrow::array::{Int64Array, RecordBatch};
782                use arrow::datatypes::{DataType, Field, Schema};
783
784                let schema = Arc::new(Schema::new(vec![Field::new(
785                    "rows_deleted",
786                    DataType::Int64,
787                    false,
788                )]));
789                let array = Int64Array::from(vec![rows_deleted]);
790                let batch =
791                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
792
793                Ok(vec![batch])
794            }
795            PhysicalPlan::DropTable { table, if_exists } => {
796                // DROP TABLE implementation
797                use arrow::array::{Int64Array, RecordBatch};
798                use arrow::datatypes::{DataType, Field, Schema};
799
800                // Check if table exists
801                let exists = self.table_schemas.read().unwrap().contains_key(table);
802
803                if !exists && !if_exists {
804                    return Err(DbxError::TableNotFound(table.clone()));
805                }
806
807                if exists {
808                    // Remove table schema from memory
809                    self.table_schemas.write().unwrap().remove(table);
810
811                    // Delete schema from persistent storage
812                    self.wos_for_metadata().delete_schema_metadata(table)?;
813
814                    // Note: Data deletion from Delta Store/WOS would require
815                    // scanning and deleting all keys with table prefix
816                    // For now, we just remove the schema metadata
817                }
818
819                // Return success
820                let schema = Arc::new(Schema::new(vec![Field::new(
821                    "rows_affected",
822                    DataType::Int64,
823                    false,
824                )]));
825                let array = Int64Array::from(vec![1]);
826                let batch =
827                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
828
829                Ok(vec![batch])
830            }
831            PhysicalPlan::CreateTable {
832                table,
833                columns,
834                if_not_exists,
835                policy,
836            } => {
837                // CREATE TABLE implementation
838                use arrow::array::{Int64Array, RecordBatch};
839                use arrow::datatypes::{DataType, Field, Schema};
840
841                // Check if table already exists
842                let exists = self.table_schemas.read().unwrap().contains_key(table);
843
844                if exists && !if_not_exists {
845                    return Err(DbxError::Schema(format!(
846                        "Table '{}' already exists",
847                        table
848                    )));
849                }
850
851                if !exists {
852                    // Create Arrow schema from column definitions
853                    let fields: Vec<Field> = columns
854                        .iter()
855                        .map(|(name, type_str)| {
856                            let data_type = match type_str.to_uppercase().as_str() {
857                                "INT" | "INTEGER" => DataType::Int64,
858                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
859                                "FLOAT" | "DOUBLE" => DataType::Float64,
860                                "BOOL" | "BOOLEAN" => DataType::Boolean,
861                                _ => DataType::Utf8, // Default to string
862                            };
863                            Field::new(name, data_type, true)
864                        })
865                        .collect();
866
867                    let mut schema = Schema::new(fields);
868
869                    // IF Policy exists, embed as JSON into Arrow schema metadata
870                    if let Some(p) = policy
871                        && let Ok(json) = p.to_json()
872                    {
873                        let mut map = std::collections::HashMap::new();
874                        map.insert("dbx_table_policy".to_string(), json);
875                        schema = schema.with_metadata(map);
876                    }
877
878                    let schema = Arc::new(schema);
879
880                    // Store schema in memory
881                    self.table_schemas
882                        .write()
883                        .unwrap()
884                        .insert(table.clone(), schema.clone());
885
886                    // Persist schema to storage
887                    self.wos_for_metadata()
888                        .save_schema_metadata(table, &schema)?;
889                }
890
891                // Return success
892                let schema = Arc::new(Schema::new(vec![Field::new(
893                    "rows_affected",
894                    DataType::Int64,
895                    false,
896                )]));
897                let array = Int64Array::from(vec![1]);
898                let batch =
899                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
900
901                Ok(vec![batch])
902            }
903            PhysicalPlan::CreateIndex {
904                table,
905                index_name,
906                columns,
907                if_not_exists,
908            } => {
909                use arrow::array::{Int64Array, RecordBatch};
910                use arrow::datatypes::{DataType, Field, Schema};
911
912                // Validate: target table must exist
913                {
914                    let schemas = self.table_schemas.read().unwrap();
915                    if !schemas.contains_key(table.as_str()) {
916                        return Err(DbxError::Schema(format!(
917                            "Table '{}' does not exist",
918                            table
919                        )));
920                    }
921                }
922
923                let column = columns.first().ok_or_else(|| {
924                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
925                })?;
926
927                let exists = self.index.has_index(table, column);
928
929                if exists && !if_not_exists {
930                    return Err(DbxError::IndexAlreadyExists {
931                        table: table.clone(),
932                        column: column.clone(),
933                    });
934                }
935
936                if !exists {
937                    self.index.create_index(table, column)?;
938
939                    // Register index_name → (table, column) mapping in memory
940                    self.index_registry
941                        .write()
942                        .unwrap()
943                        .insert(index_name.clone(), (table.clone(), column.clone()));
944
945                    // Persist index metadata to storage
946                    self.wos_for_metadata()
947                        .save_index_metadata(index_name, table, column)?;
948                }
949
950                let schema = Arc::new(Schema::new(vec![Field::new(
951                    "rows_affected",
952                    DataType::Int64,
953                    false,
954                )]));
955                let array = Int64Array::from(vec![1]);
956                let batch =
957                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
958
959                Ok(vec![batch])
960            }
961            PhysicalPlan::DropIndex {
962                table,
963                index_name,
964                if_exists,
965            } => {
966                use arrow::array::{Int64Array, RecordBatch};
967                use arrow::datatypes::{DataType, Field, Schema};
968
969                // Resolve index_name → actual column via registry
970                let resolved_column = {
971                    let registry = self.index_registry.read().unwrap();
972                    registry
973                        .get(index_name.as_str())
974                        .map(|(_, col)| col.clone())
975                };
976
977                // Fallback: if not in registry, try index_name as column name
978                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());
979
980                let exists = self.index.has_index(table, column);
981
982                if !exists && !if_exists {
983                    return Err(DbxError::IndexNotFound {
984                        table: table.clone(),
985                        column: column.to_string(),
986                    });
987                }
988
989                if exists {
990                    self.index.drop_index(table, column)?;
991
992                    // Remove from registry in memory
993                    self.index_registry
994                        .write()
995                        .unwrap()
996                        .remove(index_name.as_str());
997
998                    // Delete index metadata from storage
999                    self.wos_for_metadata().delete_index_metadata(index_name)?;
1000                }
1001
1002                let schema = Arc::new(Schema::new(vec![Field::new(
1003                    "rows_affected",
1004                    DataType::Int64,
1005                    false,
1006                )]));
1007                let array = Int64Array::from(vec![1]);
1008                let batch =
1009                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
1010
1011                Ok(vec![batch])
1012            }
1013            PhysicalPlan::AlterTable { table, operation } => {
1014                // ALTER TABLE implementation
1015                use crate::sql::planner::types::AlterTableOperation;
1016                use arrow::array::{Int64Array, RecordBatch};
1017                use arrow::datatypes::{DataType, Field, Schema};
1018
1019                match operation {
1020                    AlterTableOperation::AddColumn {
1021                        column_name,
1022                        data_type,
1023                    } => {
1024                        // Get current schema
1025                        let mut schemas = self.table_schemas.write().unwrap();
1026                        let current_schema = schemas.get(table).ok_or_else(|| {
1027                            DbxError::Schema(format!("Table '{}' not found", table))
1028                        })?;
1029
1030                        // Convert data type string to Arrow DataType
1031                        let arrow_type = match data_type.to_uppercase().as_str() {
1032                            "INT" | "INTEGER" => DataType::Int64,
1033                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
1034                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
1035                            "BOOL" | "BOOLEAN" => DataType::Boolean,
1036                            _ => DataType::Utf8, // Default to string
1037                        };
1038
1039                        // Create new field
1040                        let new_field = Field::new(column_name, arrow_type, true);
1041
1042                        // Create new schema with added column
1043                        let mut fields: Vec<Field> = current_schema
1044                            .fields()
1045                            .iter()
1046                            .map(|f| f.as_ref().clone())
1047                            .collect();
1048                        fields.push(new_field);
1049                        let new_schema = Arc::new(Schema::new(fields));
1050
1051                        // Update schema in memory
1052                        schemas.insert(table.clone(), new_schema.clone());
1053
1054                        // Persist updated schema
1055                        drop(schemas); // Release lock before calling wos
1056                        self.wos_for_metadata()
1057                            .save_schema_metadata(table, &new_schema)?;
1058                    }
1059                    AlterTableOperation::DropColumn { column_name } => {
1060                        // Get current schema
1061                        let mut schemas = self.table_schemas.write().unwrap();
1062                        let current_schema = schemas.get(table).ok_or_else(|| {
1063                            DbxError::Schema(format!("Table '{}' not found", table))
1064                        })?;
1065
1066                        // Find the column to drop
1067                        let fields: Vec<Field> = current_schema
1068                            .fields()
1069                            .iter()
1070                            .filter(|f| f.name() != column_name)
1071                            .map(|f| f.as_ref().clone())
1072                            .collect();
1073
1074                        // Check if column was found
1075                        if fields.len() == current_schema.fields().len() {
1076                            return Err(DbxError::Schema(format!(
1077                                "Column '{}' not found in table '{}'",
1078                                column_name, table
1079                            )));
1080                        }
1081
1082                        // Create new schema without the dropped column
1083                        let new_schema = Arc::new(Schema::new(fields));
1084
1085                        // Update schema in memory
1086                        schemas.insert(table.clone(), new_schema.clone());
1087
1088                        // Persist updated schema
1089                        drop(schemas); // Release lock
1090                        self.wos_for_metadata()
1091                            .save_schema_metadata(table, &new_schema)?;
1092                    }
1093                    AlterTableOperation::RenameColumn { old_name, new_name } => {
1094                        // Get current schema
1095                        let mut schemas = self.table_schemas.write().unwrap();
1096                        let current_schema = schemas.get(table).ok_or_else(|| {
1097                            DbxError::Schema(format!("Table '{}' not found", table))
1098                        })?;
1099
1100                        // Find and rename the column
1101                        let mut found = false;
1102                        let fields: Vec<Field> = current_schema
1103                            .fields()
1104                            .iter()
1105                            .map(|f| {
1106                                if f.name() == old_name {
1107                                    found = true;
1108                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
1109                                } else {
1110                                    f.as_ref().clone()
1111                                }
1112                            })
1113                            .collect();
1114
1115                        // Check if column was found
1116                        if !found {
1117                            return Err(DbxError::Schema(format!(
1118                                "Column '{}' not found in table '{}'",
1119                                old_name, table
1120                            )));
1121                        }
1122
1123                        // Create new schema with renamed column
1124                        let new_schema = Arc::new(Schema::new(fields));
1125
1126                        // Update schema in memory
1127                        schemas.insert(table.clone(), new_schema.clone());
1128
1129                        // Persist updated schema
1130                        drop(schemas); // Release lock
1131                        self.wos_for_metadata()
1132                            .save_schema_metadata(table, &new_schema)?;
1133                    }
1134                }
1135
1136                // Return success
1137                let schema = Arc::new(Schema::new(vec![Field::new(
1138                    "rows_affected",
1139                    DataType::Int64,
1140                    false,
1141                )]));
1142                let array = Int64Array::from(vec![1]);
1143                let batch =
1144                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
1145
1146                Ok(vec![batch])
1147            }
1148            _ => {
1149                // Original logic for SELECT queries
1150                self.execute_select_plan(plan)
1151            }
1152        }
1153    }
1154
1155    /// Execute SELECT query plans (original logic)
1156    fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
1157        // Automatic Tier Selection & Loading
1158        // Ensure all tables involved in the query are synced to Columnar Cache (Tier 2).
1159        for table in plan.tables() {
1160            let target_tables = self.get_tables_to_scan(&table, None);
1161            for t in target_tables {
1162                if !self.columnar_cache.has_table(&t) {
1163                    let _ = self.sync_columnar_cache(&t);
1164                }
1165            }
1166        }
1167
1168        let tables = self.tables.read().unwrap();
1169        let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
1170        Self::drain_operator(&mut *operator)
1171    }
1172
1173    /// Build an operator tree from a physical plan.
1174    fn build_operator(
1175        &self,
1176        plan: &PhysicalPlan,
1177        tables: &HashMap<String, Vec<RecordBatch>>,
1178        columnar_cache: &ColumnarCache,
1179    ) -> DbxResult<Box<dyn PhysicalOperator>> {
1180        match plan {
1181            PhysicalPlan::TableScan {
1182                table,
1183                projection,
1184                filter,
1185                ros_files,
1186            } => {
1187                // Track OLAP Workload
1188                self.workload_analyzer
1189                    .write()
1190                    .unwrap()
1191                    .record(crate::engine::workload_analyzer::QueryPattern::RangeScan);
1192
1193                let mut filter_pushed_down = false;
1194                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
1195
1196                let mut all_batches = Vec::new();
1197                let mut base_schema = None;
1198
1199                for t in target_tables {
1200                    // Try Columnar Cache first (with projection AND filter pushdown!)
1201                    let cached_results = if let Some(filter_expr) = filter {
1202                        let filter_expr_clone = filter_expr.clone();
1203                        // Use pushdown with filter
1204                        let result = columnar_cache.get_batches_with_filter(
1205                            &t,
1206                            if projection.is_empty() {
1207                                None
1208                            } else {
1209                                Some(projection)
1210                            },
1211                            move |batch| {
1212                                use crate::sql::executor::evaluate_expr;
1213                                use arrow::array::BooleanArray;
1214
1215                                let array = evaluate_expr(&filter_expr_clone, batch)?;
1216                                let boolean_array = array
1217                                    .as_any()
1218                                    .downcast_ref::<BooleanArray>()
1219                                    .ok_or_else(|| DbxError::TypeMismatch {
1220                                    expected: "BooleanArray".to_string(),
1221                                    actual: format!("{:?}", array.data_type()),
1222                                })?;
1223                                Ok(boolean_array.clone())
1224                            },
1225                        )?;
1226                        if result.is_some() {
1227                            filter_pushed_down = true;
1228                        }
1229                        result
1230                    } else {
1231                        columnar_cache.get_batches(
1232                            &t,
1233                            if projection.is_empty() {
1234                                None
1235                            } else {
1236                                Some(projection)
1237                            },
1238                        )?
1239                    };
1240
1241                    let (batches, schema, _) = if let Some(cached_batches) = cached_results {
1242                        if cached_batches.is_empty() {
1243                            // Table exists but is empty in cache.
1244                            // We need the schema to return an empty scan.
1245                            let schema = {
1246                                let schemas = self.table_schemas.read().unwrap();
1247                                schemas
1248                                    .get(table)
1249                                    .or_else(|| {
1250                                        let table_lower = table.to_lowercase();
1251                                        schemas
1252                                            .iter()
1253                                            .find(|(k, _)| k.to_lowercase() == table_lower)
1254                                            .map(|(_, v)| v)
1255                                    })
1256                                    .cloned()
1257                            }
1258                            .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1259                            (vec![], schema, projection.clone())
1260                        } else {
1261                            let schema = cached_batches[0].schema();
1262                            (cached_batches, schema, vec![])
1263                        }
1264                    } else {
1265                        // Try cache again
1266                        let cached_after_sync = columnar_cache.get_batches(
1267                            &t,
1268                            if projection.is_empty() {
1269                                None
1270                            } else {
1271                                Some(projection)
1272                            },
1273                        )?;
1274
1275                        if let Some(batches) = cached_after_sync {
1276                            if !batches.is_empty() {
1277                                let schema = batches[0].schema();
1278                                (batches, schema, vec![])
1279                            } else {
1280                                // Table exists but is empty
1281                                let schema = {
1282                                    let schemas = self.table_schemas.read().unwrap();
1283                                    schemas
1284                                        .get(table)
1285                                        .or_else(|| {
1286                                            let table_lower = table.to_lowercase();
1287                                            schemas
1288                                                .iter()
1289                                                .find(|(k, _)| k.to_lowercase() == table_lower)
1290                                                .map(|(_, v)| v)
1291                                        })
1292                                        .cloned()
1293                                }
1294                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1295                                (vec![], schema, projection.clone())
1296                            }
1297                        } else {
1298                            // Not in cache, check HashMap fallback
1299                            let batches_opt = tables.get(&t).or_else(|| {
1300                                let table_lower = t.to_lowercase();
1301                                tables
1302                                    .iter()
1303                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1304                                    .map(|(_, v)| v)
1305                            });
1306
1307                            if let Some(batches) = batches_opt {
1308                                if batches.is_empty() {
1309                                    let schema = {
1310                                        let schemas = self.table_schemas.read().unwrap();
1311                                        schemas
1312                                            .get(table)
1313                                            .or_else(|| {
1314                                                let table_lower = table.to_lowercase();
1315                                                schemas
1316                                                    .iter()
1317                                                    .find(|(k, _)| k.to_lowercase() == table_lower)
1318                                                    .map(|(_, v)| v)
1319                                            })
1320                                            .cloned()
1321                                    }
1322                                    .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1323                                    (vec![], schema, projection.clone())
1324                                } else {
1325                                    let schema = batches[0].schema();
1326                                    (batches.clone(), schema, projection.clone())
1327                                }
1328                            } else {
1329                                // Skip missing sub-table if it has no data yet
1330                                let schema = {
1331                                    let schemas = self.table_schemas.read().unwrap();
1332                                    schemas.get(table).cloned()
1333                                }
1334                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
1335                                (vec![], schema, projection.clone())
1336                            }
1337                        }
1338                    };
1339
1340                    if base_schema.is_none() {
1341                        base_schema = Some(schema);
1342                    }
1343                    all_batches.extend(batches);
1344                }
1345
1346                // ---------------------- PHASE 3: ROS (Parquet) Union (Optimized) ----------------------
1347                // [Phase 9] 최적화: 매 쿼리마다 read_dir을 수행하는 대신 PhysicalPlan에 전달된 ros_files 정보를 TableScanOperator가 처리하도록 위임함.
1348                // --------------------------------------------------------------------------
1349
1350                // Use the base schema from the loop, or try to look it up if all partitions were empty
1351                let final_schema = base_schema.unwrap_or_else(|| {
1352                    let schemas = self.table_schemas.read().unwrap();
1353                    schemas
1354                        .get(table)
1355                        .cloned()
1356                        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()))
1357                });
1358
1359                let projection_to_use = if projection.is_empty() {
1360                    vec![]
1361                } else {
1362                    projection.clone()
1363                };
1364                let mut scan =
1365                    TableScanOperator::new(table.clone(), final_schema, projection_to_use);
1366
1367                // ros_files가 채워져 있으면 파이프라인 티어 스캔,
1368                // 비어있으면 기존의 all_batches 통째 주입
1369                if !ros_files.is_empty() {
1370                    scan.start_tier_scan(all_batches, ros_files.clone());
1371                } else {
1372                    scan.set_data(all_batches);
1373                }
1374
1375                // Wrap with filter if needed AND NOT pushed down
1376                if let Some(filter_expr) = filter {
1377                    if !filter_pushed_down {
1378                        Ok(Box::new(FilterOperator::new(
1379                            Box::new(scan),
1380                            filter_expr.clone(),
1381                        )))
1382                    } else {
1383                        // Filter already applied in scan (via cache)
1384                        Ok(Box::new(scan))
1385                    }
1386                } else {
1387                    Ok(Box::new(scan))
1388                }
1389            }
1390
1391            PhysicalPlan::Projection {
1392                input,
1393                exprs,
1394                aliases,
1395            } => {
1396                let input_op = self.build_operator(input, tables, columnar_cache)?;
1397                use arrow::datatypes::Field;
1398
1399                let input_schema = input_op.schema();
1400                let fields: Vec<Field> = exprs
1401                    .iter()
1402                    .enumerate()
1403                    .map(|(i, expr)| {
1404                        let data_type = expr.get_type(input_schema);
1405                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
1406                            alias.clone()
1407                        } else {
1408                            format!("col_{}", i)
1409                        };
1410                        Field::new(&field_name, data_type, true)
1411                    })
1412                    .collect();
1413
1414                let output_schema = Arc::new(Schema::new(fields));
1415                Ok(Box::new(ProjectionOperator::new(
1416                    input_op,
1417                    output_schema,
1418                    exprs.clone(),
1419                )))
1420            }
1421
1422            PhysicalPlan::Limit {
1423                input,
1424                count,
1425                offset,
1426            } => {
1427                let input_op = self.build_operator(input, tables, columnar_cache)?;
1428                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
1429            }
1430
1431            PhysicalPlan::SortMerge { input, order_by } => {
1432                let input_op = self.build_operator(input, tables, columnar_cache)?;
1433                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
1434            }
1435
1436            PhysicalPlan::HashAggregate {
1437                input,
1438                group_by,
1439                aggregates,
1440                mode,
1441            } => {
1442                // Track OLAP Workload
1443                self.workload_analyzer
1444                    .write()
1445                    .unwrap()
1446                    .record(crate::engine::workload_analyzer::QueryPattern::Aggregation);
1447
1448                let input_op = self.build_operator(input, tables, columnar_cache)?;
1449                let input_schema = input_op.schema();
1450                let mut fields = Vec::new();
1451
1452                for &idx in group_by {
1453                    fields.push(input_schema.field(idx).clone());
1454                }
1455
1456                for agg in aggregates {
1457                    use arrow::datatypes::DataType;
1458                    let data_type = match agg.function {
1459                        crate::sql::planner::AggregateFunction::Count => DataType::Int64,
1460                        crate::sql::planner::AggregateFunction::Sum
1461                        | crate::sql::planner::AggregateFunction::Avg
1462                        | crate::sql::planner::AggregateFunction::Min
1463                        | crate::sql::planner::AggregateFunction::Max => DataType::Float64,
1464                    };
1465                    let name = agg
1466                        .alias
1467                        .clone()
1468                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
1469                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
1470                }
1471
1472                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
1473                Ok(Box::new(
1474                    HashAggregateOperator::new(
1475                        input_op,
1476                        agg_schema,
1477                        group_by.clone(),
1478                        aggregates.clone(),
1479                        *mode,
1480                    )
1481                    .with_gpu(self.gpu_manager.clone()),
1482                ))
1483            }
1484
1485            PhysicalPlan::HashJoin {
1486                left,
1487                right,
1488                on,
1489                join_type,
1490            } => {
1491                // Track OLAP Workload
1492                self.workload_analyzer
1493                    .write()
1494                    .unwrap()
1495                    .record(crate::engine::workload_analyzer::QueryPattern::Join);
1496
1497                use arrow::datatypes::Field;
1498
1499                let left_op = self.build_operator(left, tables, columnar_cache)?;
1500                let right_op = self.build_operator(right, tables, columnar_cache)?;
1501
1502                // Build joined schema: left columns + right columns
1503                let left_schema = left_op.schema();
1504                let right_schema = right_op.schema();
1505
1506                let mut joined_fields: Vec<Field> = Vec::new();
1507                for field in left_schema.fields().iter() {
1508                    let mut f = field.as_ref().clone();
1509                    // In RIGHT JOIN, left side can be null
1510                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
1511                        f = f.with_nullable(true);
1512                    }
1513                    joined_fields.push(f);
1514                }
1515                for field in right_schema.fields().iter() {
1516                    let mut f = field.as_ref().clone();
1517                    // In LEFT JOIN, right side can be null
1518                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
1519                        f = f.with_nullable(true);
1520                    }
1521                    joined_fields.push(f);
1522                }
1523
1524                let joined_schema = Arc::new(Schema::new(joined_fields));
1525
1526                Ok(Box::new(HashJoinOperator::new(
1527                    left_op,
1528                    right_op,
1529                    joined_schema,
1530                    on.clone(),
1531                    *join_type,
1532                )))
1533            }
1534
1535            PhysicalPlan::Insert { .. } => {
1536                // INSERT should be handled in execute_physical_plan, not here
1537                unreachable!("INSERT should not reach build_operator")
1538            }
1539            PhysicalPlan::Update { .. } => {
1540                // UPDATE should be handled in execute_physical_plan, not here
1541                unreachable!("UPDATE should not reach build_operator")
1542            }
1543            PhysicalPlan::Delete { .. } => {
1544                // DELETE should be handled in execute_physical_plan, not here
1545                unreachable!("DELETE should not reach build_operator")
1546            }
1547            PhysicalPlan::DropTable { .. } => {
1548                // DROP TABLE should be handled in execute_physical_plan, not here
1549                unreachable!("DROP TABLE should not reach build_operator")
1550            }
1551            PhysicalPlan::CreateTable { .. } => {
1552                // CREATE TABLE should be handled in execute_physical_plan, not here
1553                unreachable!("CREATE TABLE should not reach build_operator")
1554            }
1555            PhysicalPlan::CreateIndex { .. } => {
1556                // CREATE INDEX should be handled in execute_physical_plan, not here
1557                unreachable!("CREATE INDEX should not reach build_operator")
1558            }
1559            PhysicalPlan::DropIndex { .. } => {
1560                // DROP INDEX should be handled in execute_physical_plan, not here
1561                unreachable!("DROP INDEX should not reach build_operator")
1562            }
1563            PhysicalPlan::AlterTable { .. } => {
1564                // ALTER TABLE should be handled in execute_physical_plan, not here
1565                unreachable!("ALTER TABLE should not reach build_operator")
1566            }
1567            PhysicalPlan::CreateFunction { .. } => {
1568                unreachable!("CREATE FUNCTION should not reach build_operator")
1569            }
1570            PhysicalPlan::CreateTrigger { .. } => {
1571                unreachable!("CREATE TRIGGER should not reach build_operator")
1572            }
1573            PhysicalPlan::CreateJob { .. } => {
1574                unreachable!("CREATE JOB should not reach build_operator")
1575            }
1576            PhysicalPlan::DropFunction { .. } => {
1577                unreachable!("DROP FUNCTION should not reach build_operator")
1578            }
1579            PhysicalPlan::DropTrigger { .. } => {
1580                unreachable!("DROP TRIGGER should not reach build_operator")
1581            }
1582            PhysicalPlan::DropJob { .. } => {
1583                unreachable!("DROP JOB should not reach build_operator")
1584            }
1585            PhysicalPlan::GridExchange { .. } => {
1586                // GridExchange 플레이스홀더 — DistributedExecutor가 런타임에 교체하므로
1587                // interface의 build_operator에 도달하면 설계 오류
1588                unreachable!(
1589                    "GridExchange placeholder must be replaced by DistributedExecutor before build_operator is called"
1590                )
1591            }
1592            PhysicalPlan::ShuffleWriter { .. } => {
1593                unreachable!(
1594                    "ShuffleWriter is a distributed operator and not supported in singleton Database"
1595                )
1596            }
1597        }
1598    }
1599
1600    /// Drain all batches from an operator.
1601    fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
1602        let mut results = Vec::new();
1603        while let Some(batch) = op.next()? {
1604            if batch.num_rows() > 0 {
1605                results.push(batch);
1606            }
1607        }
1608        Ok(results)
1609    }
1610}
1611
1612// ════════════════════════════════════════════
1613// DatabaseSql Trait Implementation
1614// ════════════════════════════════════════════
1615
1616impl crate::traits::DatabaseSql for Database {
1617    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
1618        // Reuse existing implementation
1619        Database::execute_sql(self, sql)
1620    }
1621
1622    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
1623        // Reuse existing implementation
1624        Database::register_table(self, name, batches)
1625    }
1626
1627    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
1628        // Reuse existing implementation
1629        Database::append_batch(self, table, batch);
1630        Ok(())
1631    }
1632}