1use crate::engine::Database;
4use crate::error::{DbxError, DbxResult};
5use crate::sql::executor::{
6 FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
7 ProjectionOperator, SortOperator, TableScanOperator,
8};
9use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
10use crate::storage::columnar_cache::ColumnarCache;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use arrow::ipc::writer::StreamWriter;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
23 use crate::storage::columnar::ScalarValue;
24 use arrow::datatypes::{DataType, Field};
25
26 let fields: Vec<Field> = values
27 .iter()
28 .enumerate()
29 .map(|(i, expr)| {
30 let name = format!("col_{}", i);
31 let data_type = match expr {
32 PhysicalExpr::Literal(scalar) => match scalar {
33 ScalarValue::Int32(_) => DataType::Int32,
34 ScalarValue::Int64(_) => DataType::Int64,
35 ScalarValue::Float64(_) => DataType::Float64,
36 ScalarValue::Utf8(_) => DataType::Utf8,
37 ScalarValue::Boolean(_) => DataType::Boolean,
38 ScalarValue::Binary(_) => DataType::Binary,
39 ScalarValue::Null => DataType::Utf8, },
41 _ => DataType::Utf8, };
43 Field::new(name, data_type, true) })
45 .collect();
46
47 Ok(Schema::new(fields))
48}
49
/// Serialize one row of literal values into an Arrow IPC stream buffer.
///
/// Each value becomes a single-element array whose concrete type is chosen by
/// matching the target field's data type against the literal's scalar variant
/// (with one widening special case: an `Int32` literal into an `Int64` field).
/// Returns the raw IPC bytes, or `NotImplemented` for non-literal values and
/// for field/scalar type combinations not listed below.
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    let mut arrays: Vec<ArrayRef> = Vec::new();

    // Pair each schema field with its positional value; extra values beyond
    // the schema (or vice versa) are silently dropped by `zip`.
    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => {
                match (field.data_type(), scalar) {
                    (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                        Arc::new(Int32Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                        Arc::new(Int64Array::from(vec![*v]))
                    }
                    // Widening: an i32 literal may fill an Int64 column.
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                        Arc::new(Int64Array::from(vec![*v as i64]))
                    }
                    (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                        Arc::new(Float64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                        Arc::new(StringArray::from(vec![s.as_str()]))
                    }
                    (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                        Arc::new(BooleanArray::from(vec![*b]))
                    }
                    (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                        Arc::new(BinaryArray::from(vec![b.as_slice()]))
                    }
                    // NULL adapts to whatever type the target field declares.
                    (_, ScalarValue::Null) => {
                        match field.data_type() {
                            arrow::datatypes::DataType::Int32 => {
                                Arc::new(Int32Array::from(vec![None as Option<i32>]))
                            }
                            arrow::datatypes::DataType::Int64 => {
                                Arc::new(Int64Array::from(vec![None as Option<i64>]))
                            }
                            arrow::datatypes::DataType::Float64 => {
                                Arc::new(Float64Array::from(vec![None as Option<f64>]))
                            }
                            arrow::datatypes::DataType::Utf8 => {
                                Arc::new(StringArray::from(vec![None as Option<&str>]))
                            }
                            arrow::datatypes::DataType::Boolean => {
                                Arc::new(BooleanArray::from(vec![None as Option<bool>]))
                            }
                            arrow::datatypes::DataType::Binary => {
                                Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
                            }
                            _ => {
                                return Err(DbxError::NotImplemented(format!(
                                    "Unsupported null type: {:?}",
                                    field.data_type()
                                )));
                            }
                        }
                    }
                    // Any other field/scalar pairing is a hard error rather
                    // than a silent coercion.
                    _ => {
                        return Err(DbxError::NotImplemented(format!(
                            "Type mismatch: field {:?} vs scalar {:?}",
                            field.data_type(),
                            scalar
                        )));
                    }
                }
            }
            _ => {
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    let mut buffer = Vec::new();
    // Inner scope so the writer's mutable borrow of `buffer` ends before we
    // return it.
    {
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}
150
151fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
160 let key_json = if key.len() == 4 {
162 let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
163 serde_json::Value::Number(val.into())
164 } else if key.len() == 8 {
165 let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
166 serde_json::Value::Number(val.into())
167 } else {
168 let s = String::from_utf8_lossy(key).to_string();
170 serde_json::Value::String(s)
171 };
172
173 let mut values: Vec<serde_json::Value> =
175 serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
176 values.insert(0, key_json);
177 serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
178}
179
180fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
182 use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
183 use arrow::datatypes::{DataType, Field, Schema};
184
185 let current_values: Vec<serde_json::Value> =
186 serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
187
188 let mut fields = Vec::new();
189 let mut columns: Vec<ArrayRef> = Vec::new();
190
191 for (i, val) in current_values.iter().enumerate() {
192 match val {
193 serde_json::Value::Number(n) => {
194 if n.is_i64() {
195 fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
196 columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
197 } else if n.is_f64() {
198 fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
199 columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
200 } else {
201 fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
202 columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
203 }
204 }
205 serde_json::Value::String(s) => {
206 fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
207 columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
208 }
209 serde_json::Value::Bool(b) => {
210 fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
211 columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
212 }
213 serde_json::Value::Null => {
214 fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
215 columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
216 }
217 _ => {
218 fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
219 columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
220 }
221 }
222 }
223
224 let schema = Arc::new(Schema::new(fields));
225 RecordBatch::try_new(schema, columns).map_err(DbxError::from)
226}
227
228fn evaluate_filter_for_record(
231 filter_expr: Option<&PhysicalExpr>,
232 value_bytes: &[u8],
233) -> DbxResult<bool> {
234 if let Some(expr) = filter_expr {
235 let batch = json_record_to_batch(value_bytes)?;
236
237 use crate::sql::executor::evaluate_expr;
238 let result = evaluate_expr(expr, &batch)?;
239
240 use arrow::array::BooleanArray;
241 let bool_array = result
242 .as_any()
243 .downcast_ref::<BooleanArray>()
244 .ok_or_else(|| DbxError::TypeMismatch {
245 expected: "BooleanArray".to_string(),
246 actual: format!("{:?}", result.data_type()),
247 })?;
248
249 Ok(bool_array.value(0))
250 } else {
251 Ok(true) }
253}
254
255impl Database {
256 pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
264 if let Some(first_batch) = batches.first() {
266 let schema = first_batch.schema();
267 let mut schemas = self.table_schemas.write().unwrap();
268 schemas.insert(name.to_string(), schema);
269 }
270
271 let mut tables = self.tables.write().unwrap();
273 tables.insert(name.to_string(), batches);
274 }
275
276 pub fn append_batch(&self, table: &str, batch: RecordBatch) {
278 let mut tables = self.tables.write().unwrap();
279 tables.entry(table.to_string()).or_default().push(batch);
280 }
281
282 pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
298 let statements = self.sql_parser.parse(sql)?;
300 if statements.is_empty() {
301 return Ok(vec![]);
302 }
303
304 let planner = LogicalPlanner::new();
306 let logical_plan = planner.plan(&statements[0])?;
307
308 let optimized = self.sql_optimizer.optimize(logical_plan)?;
310
311 let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
313 let physical_plan = physical_planner.plan(&optimized)?;
314
315 self.execute_physical_plan(&physical_plan)
317 }
318
    /// Execute a physical plan, handling DML/DDL variants inline and
    /// delegating everything else (SELECT-style plans) to the operator tree
    /// via `execute_select_plan`.
    ///
    /// Every DML/DDL arm returns a single one-row batch reporting the number
    /// of affected rows (`rows_inserted` / `rows_updated` / `rows_deleted` /
    /// `rows_affected`).
    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
        match plan {
            PhysicalPlan::Insert {
                table,
                columns: _,
                values,
            } => {
                let mut rows_inserted = 0;

                for row_values in values {
                    // Nothing to store for an empty row.
                    if row_values.is_empty() {
                        continue;
                    }

                    // The first value doubles as the primary key; numeric
                    // literals are encoded as raw little-endian bytes,
                    // strings as UTF-8, booleans as a single 0/1 byte.
                    let key = match &row_values[0] {
                        PhysicalExpr::Literal(scalar) => {
                            use crate::storage::columnar::ScalarValue;
                            match scalar {
                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
                                _ => {
                                    return Err(DbxError::NotImplemented(
                                        "Non-literal key in INSERT".to_string(),
                                    ));
                                }
                            }
                        }
                        _ => {
                            return Err(DbxError::NotImplemented(
                                "Non-literal key in INSERT".to_string(),
                            ));
                        }
                    };

                    // Prefer the registered schema; otherwise infer one from
                    // the literal values. The read lock is scoped so it is
                    // released before the fallback runs.
                    // NOTE(review): inference failure panics via `expect`
                    // rather than propagating a DbxResult error — consider
                    // returning the error to the caller.
                    let schema = {
                        let schemas = self.table_schemas.read().unwrap();
                        schemas.get(table.as_str()).cloned()
                    }
                    .unwrap_or_else(|| {
                        Arc::new(
                            infer_schema_from_values(row_values)
                                .expect("Failed to infer schema from values"),
                        )
                    });

                    // NOTE(review): INSERT persists rows as Arrow IPC bytes,
                    // while UPDATE below rewrites them as JSON arrays and the
                    // read path (`reconstruct_full_record`) parses JSON —
                    // confirm which encoding the storage layer expects.
                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;

                    self.insert(table, &key, &value_bytes)?;
                    rows_inserted += 1;
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                // Result: one row reporting the insert count.
                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_inserted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_inserted as i64]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Update {
                table,
                assignments,
                filter,
            } => {
                let all_records = self.scan(table)?;
                let mut rows_updated = 0_i64;

                // Map column name -> positional index from the registered
                // schema (if any) so assignments can target the right slot.
                let column_index_map = {
                    let schemas = self.table_schemas.read().unwrap();
                    schemas.get(table.as_str()).map(|schema| {
                        schema
                            .fields()
                            .iter()
                            .enumerate()
                            .map(|(i, field)| (field.name().clone(), i))
                            .collect::<std::collections::HashMap<String, usize>>()
                    })
                };

                for (key, value_bytes) in all_records {
                    // Rebuild the full row (key + values) so the filter can
                    // reference the key column too.
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_update {
                        let mut current_values: Vec<serde_json::Value> =
                            serde_json::from_slice(&full_record).unwrap_or_else(|_| vec![]);

                        for (column_name, expr) in assignments.iter() {
                            // Resolve the target index via the schema map;
                            // without a schema, fall back to the assignment's
                            // own position in the list (0 if not found).
                            let target_idx = column_index_map
                                .as_ref()
                                .and_then(|map| map.get(column_name).copied())
                                .unwrap_or_else(|| {
                                    assignments
                                        .iter()
                                        .position(|(n, _)| n == column_name)
                                        .unwrap_or(0)
                                });

                            // Only literal right-hand sides are supported;
                            // other expressions are silently skipped.
                            if let PhysicalExpr::Literal(scalar) = expr {
                                use crate::storage::columnar::ScalarValue;
                                let new_value = match scalar {
                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
                                    // NaN/infinity cannot be represented in
                                    // JSON; they degrade to null.
                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
                                        .map(serde_json::Value::Number)
                                        .unwrap_or(serde_json::Value::Null),
                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
                                    ScalarValue::Binary(b) => {
                                        serde_json::Value::String(base64::Engine::encode(
                                            &base64::engine::general_purpose::STANDARD,
                                            b,
                                        ))
                                    }
                                    ScalarValue::Null => serde_json::Value::Null,
                                };

                                // Out-of-range targets are ignored rather
                                // than growing the row.
                                if target_idx < current_values.len() {
                                    current_values[target_idx] = new_value;
                                }
                            }
                        }

                        // Strip the key column back off before persisting —
                        // storage holds only the non-key values.
                        let storage_values = if current_values.len() > 1 {
                            &current_values[1..]
                        } else {
                            &current_values[..]
                        };
                        let new_value_bytes = serde_json::to_vec(storage_values)
                            .map_err(|e| DbxError::Serialization(e.to_string()))?;

                        self.insert(table, &key, &new_value_bytes)?;
                        rows_updated += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_updated",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_updated]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Delete { table, filter } => {
                let all_records = self.scan(table)?;

                let mut rows_deleted = 0_i64;

                for (key, value_bytes) in all_records {
                    // Same key-prepending trick as UPDATE so the filter can
                    // match against the key column.
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_delete {
                        self.delete(table, &key)?;
                        rows_deleted += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_deleted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_deleted]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropTable { table, if_exists } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if !exists && !if_exists {
                    return Err(DbxError::TableNotFound(table.clone()));
                }

                if exists {
                    // Drop the schema registration and its persisted metadata.
                    // NOTE(review): in-memory batches (`self.tables`) and the
                    // columnar cache are not cleared here — confirm whether
                    // that is handled elsewhere.
                    self.table_schemas.write().unwrap().remove(table);

                    self.wos.delete_schema_metadata(table)?;
                }

                // Reports 1 even under IF EXISTS when nothing was dropped.
                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateTable {
                table,
                columns,
                if_not_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if exists && !if_not_exists {
                    return Err(DbxError::Schema(format!(
                        "Table '{}' already exists",
                        table
                    )));
                }

                if !exists {
                    // Map SQL type names onto Arrow types; unknown names
                    // default to Utf8. All columns are nullable.
                    let fields: Vec<Field> = columns
                        .iter()
                        .map(|(name, type_str)| {
                            let data_type = match type_str.to_uppercase().as_str() {
                                "INT" | "INTEGER" => DataType::Int64,
                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
                                "FLOAT" | "DOUBLE" => DataType::Float64,
                                "BOOL" | "BOOLEAN" => DataType::Boolean,
                                _ => DataType::Utf8,
                            };
                            Field::new(name, data_type, true)
                        })
                        .collect();

                    let schema = Arc::new(Schema::new(fields));

                    self.table_schemas
                        .write()
                        .unwrap()
                        .insert(table.clone(), schema.clone());

                    // Persist the schema so it survives restarts.
                    self.wos.save_schema_metadata(table, &schema)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateIndex {
                table,
                index_name,
                columns,
                if_not_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                // Scoped read lock: verify the table exists before touching
                // the index structures.
                {
                    let schemas = self.table_schemas.read().unwrap();
                    if !schemas.contains_key(table.as_str()) {
                        return Err(DbxError::Schema(format!(
                            "Table '{}' does not exist",
                            table
                        )));
                    }
                }

                // Only single-column indexes are supported: extra columns
                // beyond the first are ignored.
                let column = columns.first().ok_or_else(|| {
                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
                })?;

                let exists = self.index.has_index(table, column);

                if exists && !if_not_exists {
                    return Err(DbxError::IndexAlreadyExists {
                        table: table.clone(),
                        column: column.clone(),
                    });
                }

                if !exists {
                    self.index.create_index(table, column)?;

                    // Remember the name -> (table, column) mapping so
                    // DROP INDEX can resolve it later.
                    self.index_registry
                        .write()
                        .unwrap()
                        .insert(index_name.clone(), (table.clone(), column.clone()));

                    self.wos.save_index_metadata(index_name, table, column)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropIndex {
                table,
                index_name,
                if_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                // Resolve the index name to its column via the registry;
                // if unknown, treat the given name as the column itself.
                let resolved_column = {
                    let registry = self.index_registry.read().unwrap();
                    registry
                        .get(index_name.as_str())
                        .map(|(_, col)| col.clone())
                };

                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());

                let exists = self.index.has_index(table, column);

                if !exists && !if_exists {
                    return Err(DbxError::IndexNotFound {
                        table: table.clone(),
                        column: column.to_string(),
                    });
                }

                if exists {
                    self.index.drop_index(table, column)?;

                    self.index_registry
                        .write()
                        .unwrap()
                        .remove(index_name.as_str());

                    self.wos.delete_index_metadata(index_name)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::AlterTable { table, operation } => {
                use crate::sql::planner::types::AlterTableOperation;
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                // Each operation rewrites the registered schema; existing row
                // data is not migrated here.
                match operation {
                    AlterTableOperation::AddColumn {
                        column_name,
                        data_type,
                    } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        // Same SQL-name -> Arrow-type mapping as CREATE TABLE
                        // (plus REAL); unknown names default to Utf8.
                        let arrow_type = match data_type.to_uppercase().as_str() {
                            "INT" | "INTEGER" => DataType::Int64,
                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
                            "BOOL" | "BOOLEAN" => DataType::Boolean,
                            _ => DataType::Utf8,
                        };

                        // New columns are always nullable (existing rows have
                        // no value for them).
                        let new_field = Field::new(column_name, arrow_type, true);

                        let mut fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| f.as_ref().clone())
                            .collect();
                        fields.push(new_field);
                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        // Release the write lock before persisting metadata.
                        drop(schemas);
                        self.wos.save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::DropColumn { column_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .filter(|f| f.name() != column_name)
                            .map(|f| f.as_ref().clone())
                            .collect();

                        // Unchanged length means the filter removed nothing,
                        // i.e. the column does not exist.
                        if fields.len() == current_schema.fields().len() {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                column_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        // Release the write lock before persisting metadata.
                        drop(schemas);
                        self.wos.save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::RenameColumn { old_name, new_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let mut found = false;
                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| {
                                if f.name() == old_name {
                                    found = true;
                                    // Preserve type and nullability, swap name.
                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
                                } else {
                                    f.as_ref().clone()
                                }
                            })
                            .collect();

                        if !found {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                old_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        // Release the write lock before persisting metadata.
                        drop(schemas);
                        self.wos.save_schema_metadata(table, &new_schema)?;
                    }
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            // Everything else is a read-only plan executed by the operator
            // pipeline.
            _ => {
                self.execute_select_plan(plan)
            }
        }
    }
879
880 fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
882 for table in plan.tables() {
885 if !self.columnar_cache.has_table(&table) {
886 let _ = self.sync_columnar_cache(&table);
887 }
888 }
889
890 let tables = self.tables.read().unwrap();
891 let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
892 Self::drain_operator(&mut *operator)
893 }
894
    /// Recursively lower a physical plan node into an executable operator
    /// tree.
    ///
    /// `TableScan` resolves its data in priority order: columnar cache (with
    /// filter push-down when a predicate is present), a second cache lookup,
    /// then the in-memory row tables; empty tables fall back to the
    /// registered schema (case-insensitively) so the scan still has a valid
    /// output schema. DML/DDL variants are handled by
    /// `execute_physical_plan` and are unreachable here.
    fn build_operator(
        &self,
        plan: &PhysicalPlan,
        tables: &HashMap<String, Vec<RecordBatch>>,
        columnar_cache: &ColumnarCache,
    ) -> DbxResult<Box<dyn PhysicalOperator>> {
        match plan {
            PhysicalPlan::TableScan {
                table,
                projection,
                filter,
            } => {
                // True when the cache evaluated the predicate during the
                // scan, making a separate FilterOperator redundant.
                let mut filter_pushed_down = false;

                let cached_results = if let Some(filter_expr) = filter {
                    // Push the predicate into the cache scan; the closure
                    // evaluates it per batch and returns the selection mask.
                    let filter_expr_clone = filter_expr.clone();
                    let result = columnar_cache.get_batches_with_filter(
                        table,
                        if projection.is_empty() {
                            None
                        } else {
                            Some(projection)
                        },
                        move |batch| {
                            use crate::sql::executor::evaluate_expr;
                            use arrow::array::BooleanArray;

                            let array = evaluate_expr(&filter_expr_clone, batch)?;
                            let boolean_array = array
                                .as_any()
                                .downcast_ref::<BooleanArray>()
                                .ok_or_else(|| DbxError::TypeMismatch {
                                    expected: "BooleanArray".to_string(),
                                    actual: format!("{:?}", array.data_type()),
                                })?;
                            Ok(boolean_array.clone())
                        },
                    )?;
                    if result.is_some() {
                        filter_pushed_down = true;
                    }
                    result
                } else {
                    columnar_cache.get_batches(
                        table,
                        if projection.is_empty() {
                            None
                        } else {
                            Some(projection)
                        },
                    )?
                };

                // Resolve (data, schema, projection-for-the-scan). When the
                // cache already applied the projection, the scan gets an
                // empty projection so it is not applied twice.
                let (batches, schema, projection_to_use) =
                    if let Some(cached_batches) = cached_results {
                        if cached_batches.is_empty() {
                            // Cache knows the table but holds no rows: fetch
                            // the schema (case-insensitive fallback lookup).
                            let schema = {
                                let schemas = self.table_schemas.read().unwrap();
                                schemas
                                    .get(table)
                                    .or_else(|| {
                                        let table_lower = table.to_lowercase();
                                        schemas
                                            .iter()
                                            .find(|(k, _)| k.to_lowercase() == table_lower)
                                            .map(|(_, v)| v)
                                    })
                                    .cloned()
                            }
                            .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
                            (vec![], schema, projection.clone())
                        } else {
                            let schema = cached_batches[0].schema();
                            (cached_batches, schema, vec![])
                        }
                    } else {
                        // NOTE(review): this second cache lookup repeats the
                        // first one with the same arguments — confirm whether
                        // something can populate the cache in between.
                        let cached_after_sync = columnar_cache.get_batches(
                            table,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                        )?;

                        if let Some(batches) = cached_after_sync {
                            if !batches.is_empty() {
                                let schema = batches[0].schema();
                                (batches, schema, vec![])
                            } else {
                                // Same empty-table schema fallback as above.
                                let schema = {
                                    let schemas = self.table_schemas.read().unwrap();
                                    schemas
                                        .get(table)
                                        .or_else(|| {
                                            let table_lower = table.to_lowercase();
                                            schemas
                                                .iter()
                                                .find(|(k, _)| k.to_lowercase() == table_lower)
                                                .map(|(_, v)| v)
                                        })
                                        .cloned()
                                }
                                .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
                                (vec![], schema, projection.clone())
                            }
                        } else {
                            // Cache miss entirely: fall back to the in-memory
                            // row tables (case-insensitive name match).
                            let batches_opt = tables.get(table).or_else(|| {
                                let table_lower = table.to_lowercase();
                                tables
                                    .iter()
                                    .find(|(k, _)| k.to_lowercase() == table_lower)
                                    .map(|(_, v)| v)
                            });

                            if let Some(batches) = batches_opt {
                                if batches.is_empty() {
                                    let schema = {
                                        let schemas = self.table_schemas.read().unwrap();
                                        schemas
                                            .get(table)
                                            .or_else(|| {
                                                let table_lower = table.to_lowercase();
                                                schemas
                                                    .iter()
                                                    .find(|(k, _)| k.to_lowercase() == table_lower)
                                                    .map(|(_, v)| v)
                                            })
                                            .cloned()
                                    }
                                    .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
                                    (vec![], schema, projection.clone())
                                } else {
                                    // Row-store batches are unprojected, so
                                    // the scan must apply the projection.
                                    let schema = batches[0].schema();
                                    (batches.clone(), schema, projection.clone())
                                }
                            } else {
                                return Err(DbxError::TableNotFound(table.clone()));
                            }
                        }
                    };

                let mut scan =
                    TableScanOperator::new(table.clone(), Arc::clone(&schema), projection_to_use);
                scan.set_data(batches);

                // Wrap in a FilterOperator only when the cache did not
                // already apply the predicate.
                if let Some(filter_expr) = filter {
                    if !filter_pushed_down {
                        Ok(Box::new(FilterOperator::new(
                            Box::new(scan),
                            filter_expr.clone(),
                        )))
                    } else {
                        Ok(Box::new(scan))
                    }
                } else {
                    Ok(Box::new(scan))
                }
            }

            PhysicalPlan::Projection {
                input,
                exprs,
                aliases,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                use arrow::datatypes::Field;

                // Output field names come from the aliases when present,
                // otherwise positional `col_<i>` placeholders.
                let input_schema = input_op.schema();
                let fields: Vec<Field> = exprs
                    .iter()
                    .enumerate()
                    .map(|(i, expr)| {
                        let data_type = expr.get_type(input_schema);
                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
                            alias.clone()
                        } else {
                            format!("col_{}", i)
                        };
                        Field::new(&field_name, data_type, true)
                    })
                    .collect();

                let output_schema = Arc::new(Schema::new(fields));
                Ok(Box::new(ProjectionOperator::new(
                    input_op,
                    output_schema,
                    exprs.clone(),
                )))
            }

            PhysicalPlan::Limit {
                input,
                count,
                offset,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
            }

            PhysicalPlan::SortMerge { input, order_by } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
            }

            PhysicalPlan::HashAggregate {
                input,
                group_by,
                aggregates,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                let input_schema = input_op.schema();
                let mut fields = Vec::new();

                // Output schema: the group-by keys first...
                for &idx in group_by {
                    fields.push(input_schema.field(idx).clone());
                }

                // ...then one column per aggregate. COUNT always yields
                // Int64; other functions keep their input column's type.
                for agg in aggregates {
                    use arrow::datatypes::DataType;
                    let data_type =
                        if matches!(agg.function, crate::sql::planner::AggregateFunction::Count) {
                            DataType::Int64
                        } else {
                            input_schema.field(agg.input).data_type().clone()
                        };
                    let name = agg
                        .alias
                        .clone()
                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
                }

                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
                Ok(Box::new(
                    HashAggregateOperator::new(
                        input_op,
                        agg_schema,
                        group_by.clone(),
                        aggregates.clone(),
                    )
                    .with_gpu(self.gpu_manager.clone()),
                ))
            }

            PhysicalPlan::HashJoin {
                left,
                right,
                on,
                join_type,
            } => {
                use arrow::datatypes::Field;

                let left_op = self.build_operator(left, tables, columnar_cache)?;
                let right_op = self.build_operator(right, tables, columnar_cache)?;

                let left_schema = left_op.schema();
                let right_schema = right_op.schema();

                // Joined schema = left fields then right fields. Outer joins
                // force the non-preserved side's fields nullable, since
                // unmatched rows fill them with nulls.
                let mut joined_fields: Vec<Field> = Vec::new();
                for field in left_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }
                for field in right_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }

                let joined_schema = Arc::new(Schema::new(joined_fields));

                Ok(Box::new(HashJoinOperator::new(
                    left_op,
                    right_op,
                    joined_schema,
                    on.clone(),
                    *join_type,
                )))
            }

            // DML/DDL plans are dispatched by `execute_physical_plan` and
            // must never be lowered to operators.
            PhysicalPlan::Insert { .. } => {
                unreachable!("INSERT should not reach build_operator")
            }
            PhysicalPlan::Update { .. } => {
                unreachable!("UPDATE should not reach build_operator")
            }
            PhysicalPlan::Delete { .. } => {
                unreachable!("DELETE should not reach build_operator")
            }
            PhysicalPlan::DropTable { .. } => {
                unreachable!("DROP TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateTable { .. } => {
                unreachable!("CREATE TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateIndex { .. } => {
                unreachable!("CREATE INDEX should not reach build_operator")
            }
            PhysicalPlan::DropIndex { .. } => {
                unreachable!("DROP INDEX should not reach build_operator")
            }
            PhysicalPlan::AlterTable { .. } => {
                unreachable!("ALTER TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateFunction { .. } => {
                unreachable!("CREATE FUNCTION should not reach build_operator")
            }
            PhysicalPlan::CreateTrigger { .. } => {
                unreachable!("CREATE TRIGGER should not reach build_operator")
            }
            PhysicalPlan::CreateJob { .. } => {
                unreachable!("CREATE JOB should not reach build_operator")
            }
            PhysicalPlan::DropFunction { .. } => {
                unreachable!("DROP FUNCTION should not reach build_operator")
            }
            PhysicalPlan::DropTrigger { .. } => {
                unreachable!("DROP TRIGGER should not reach build_operator")
            }
            PhysicalPlan::DropJob { .. } => {
                unreachable!("DROP JOB should not reach build_operator")
            }
        }
    }
1247
1248 fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
1250 let mut results = Vec::new();
1251 while let Some(batch) = op.next()? {
1252 if batch.num_rows() > 0 {
1253 results.push(batch);
1254 }
1255 }
1256 Ok(results)
1257 }
1258}
1259
/// Trait facade: forwards the `DatabaseSql` API onto the inherent
/// `Database` implementations defined above.
impl crate::traits::DatabaseSql for Database {
    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
        Database::execute_sql(self, sql)
    }

    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
        Database::register_table(self, name, batches)
    }

    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
        // The inherent `append_batch` is infallible; adapt it to the trait's
        // Result-returning signature.
        Database::append_batch(self, table, batch);
        Ok(())
    }
}