use crate::engine::Database;
use crate::error::{DbxError, DbxResult};
use crate::sql::executor::{
    FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
    ProjectionOperator, SortOperator, TableScanOperator,
};
use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
use crate::storage::columnar_cache::ColumnarCache;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use std::collections::HashMap;
use std::sync::Arc;

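/// Infers an Arrow [`Schema`] from a single row of INSERT values, naming the
/// columns `col_0`, `col_1`, ... in order. Literals map to their natural
/// Arrow types; NULLs and non-literal expressions fall back to `Utf8`, since
/// a bare NULL carries no type information.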
fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
    use crate::storage::columnar::ScalarValue;
    use arrow::datatypes::{DataType, Field};

    let fields: Vec<Field> = values
        .iter()
        .enumerate()
        .map(|(i, expr)| {
            let name = format!("col_{}", i);
            let data_type = match expr {
                PhysicalExpr::Literal(scalar) => match scalar {
                    ScalarValue::Int32(_) => DataType::Int32,
                    ScalarValue::Int64(_) => DataType::Int64,
                    ScalarValue::Float64(_) => DataType::Float64,
                    ScalarValue::Utf8(_) => DataType::Utf8,
                    ScalarValue::Boolean(_) => DataType::Boolean,
                    ScalarValue::Binary(_) => DataType::Binary,
                    ScalarValue::Null => DataType::Utf8,
                },
                _ => DataType::Utf8,
            };
            Field::new(name, data_type, true)
        })
        .collect();

    Ok(Schema::new(fields))
}

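/// Serializes one row of literal values into an Arrow IPC stream: a
/// single-row [`RecordBatch`] framed by [`StreamWriter`]. Int32 literals are
/// widened when the schema asks for Int64; any other field/scalar mismatch
/// is rejected rather than silently coerced.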
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    let mut arrays: Vec<ArrayRef> = Vec::new();

    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => match (field.data_type(), scalar) {
                (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                    Arc::new(Int32Array::from(vec![*v]))
                }
                (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                    Arc::new(Int64Array::from(vec![*v]))
                }
                (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                    Arc::new(Int64Array::from(vec![*v as i64]))
                }
                (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                    Arc::new(Float64Array::from(vec![*v]))
                }
                (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                    Arc::new(StringArray::from(vec![s.as_str()]))
                }
                (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                    Arc::new(BooleanArray::from(vec![*b]))
                }
                (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                    Arc::new(BinaryArray::from(vec![b.as_slice()]))
                }
                (_, ScalarValue::Null) => match field.data_type() {
                    arrow::datatypes::DataType::Int32 => {
                        Arc::new(Int32Array::from(vec![None::<i32>]))
                    }
                    arrow::datatypes::DataType::Int64 => {
                        Arc::new(Int64Array::from(vec![None::<i64>]))
                    }
                    arrow::datatypes::DataType::Float64 => {
                        Arc::new(Float64Array::from(vec![None::<f64>]))
                    }
                    arrow::datatypes::DataType::Utf8 => {
                        Arc::new(StringArray::from(vec![None::<&str>]))
                    }
                    arrow::datatypes::DataType::Boolean => {
                        Arc::new(BooleanArray::from(vec![None::<bool>]))
                    }
                    arrow::datatypes::DataType::Binary => {
                        Arc::new(BinaryArray::from(vec![None::<&[u8]>]))
                    }
                    _ => {
                        return Err(DbxError::NotImplemented(format!(
                            "Unsupported null type: {:?}",
                            field.data_type()
                        )));
                    }
                },
                _ => {
                    return Err(DbxError::NotImplemented(format!(
                        "Type mismatch: field {:?} vs scalar {:?}",
                        field.data_type(),
                        scalar
                    )));
                }
            },
            _ => {
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    let mut buffer = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}

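/// Rebuilds the full JSON row for a stored record by decoding the key and
/// prepending it to the stored value columns. The key encoding is guessed
/// from its length: 4 bytes → little-endian i32, 8 bytes → little-endian i64,
/// anything else → UTF-8 string; a 4- or 8-byte string key is therefore
/// indistinguishable from an integer key.
///
/// A minimal sketch of the shape (hypothetical values, not from this crate):
///
/// ```ignore
/// // i32 key 7 plus stored columns ["alice"] becomes [7,"alice"].
/// let rec = reconstruct_full_record(&7_i32.to_le_bytes(), br#"["alice"]"#);
/// assert_eq!(rec, br#"[7,"alice"]"#.to_vec());
/// ```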
fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
    let key_json = if key.len() == 4 {
        let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
        serde_json::Value::Number(val.into())
    } else if key.len() == 8 {
        let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
        serde_json::Value::Number(val.into())
    } else {
        let s = String::from_utf8_lossy(key).to_string();
        serde_json::Value::String(s)
    };

    let mut values: Vec<serde_json::Value> =
        serde_json::from_slice(value_bytes).unwrap_or_default();
    values.insert(0, key_json);
    serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
}

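/// Converts one JSON-encoded row into a single-row [`RecordBatch`] so that
/// filter expressions can be evaluated with the regular Arrow kernels.
/// Column types are inferred per value; nulls and unsupported JSON shapes
/// (arrays, objects) become null Int64 columns.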
fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
    use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    let current_values: Vec<serde_json::Value> =
        serde_json::from_slice(value_bytes).unwrap_or_default();

    let mut fields = Vec::new();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (i, val) in current_values.iter().enumerate() {
        match val {
            serde_json::Value::Number(n) => {
                if n.is_f64() {
                    fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
                    columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
                } else {
                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
                }
            }
            serde_json::Value::String(s) => {
                fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
                columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
            }
            serde_json::Value::Bool(b) => {
                fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
                columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
            }
            _ => {
                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
            }
        }
    }

    let schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(schema, columns).map_err(DbxError::from)
}

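/// Evaluates an optional filter expression against one stored record.
/// Returns `Ok(true)` when no filter is present; otherwise the record is
/// lifted into a one-row batch and the expression must evaluate to a
/// `BooleanArray`.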
fn evaluate_filter_for_record(
    filter_expr: Option<&PhysicalExpr>,
    value_bytes: &[u8],
) -> DbxResult<bool> {
    if let Some(expr) = filter_expr {
        let batch = json_record_to_batch(value_bytes)?;

        use crate::sql::executor::evaluate_expr;
        let result = evaluate_expr(expr, &batch)?;

        use arrow::array::BooleanArray;
        let bool_array = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| DbxError::TypeMismatch {
                expected: "BooleanArray".to_string(),
                actual: format!("{:?}", result.data_type()),
            })?;

        Ok(bool_array.value(0))
    } else {
        Ok(true)
    }
}

impl Database {
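    /// Resolves the physical tables behind a logical table name: explicit
    /// partitions when a partition map is registered, otherwise one
    /// `<table>__shard_<id>` table per shard from the sharding router.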
    pub(crate) fn get_tables_to_scan(
        &self,
        table: &str,
        _filter: Option<&PhysicalExpr>,
    ) -> Vec<String> {
        let maps = self.partition_maps.read().unwrap();
        if let Some(map) = maps.get(table) {
            map.all_partitions()
        } else {
            self.sharding_router
                .all_shards()
                .iter()
                .map(|shard| format!("{}__shard_{}", table, shard.id))
                .collect()
        }
    }

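    /// Registers in-memory batches under `name`, adopting the first batch's
    /// schema as the table schema. Replaces any previously registered data.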
    pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
        if let Some(first_batch) = batches.first() {
            let schema = first_batch.schema();
            let mut schemas = self.table_schemas.write().unwrap();
            schemas.insert(name.to_string(), schema);
        }

        let mut tables = self.tables.write().unwrap();
        tables.insert(name.to_string(), batches);
    }

    pub fn append_batch(&self, table: &str, batch: RecordBatch) {
        let mut tables = self.tables.write().unwrap();
        tables.entry(table.to_string()).or_default().push(batch);
    }

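    /// Executes a SQL string end to end: view expansion, parsing, logical
    /// planning, optimization, physical planning, and execution, recording
    /// latency metrics along the way. DDL for (materialized) views is
    /// intercepted textually before the parser runs.
    ///
    /// A minimal usage sketch (`Database::open` here is assumed for
    /// illustration and may differ from the real constructor):
    ///
    /// ```ignore
    /// let db = Database::open("/tmp/dbx")?;
    /// db.execute_sql("CREATE TABLE users (id INT, name TEXT)")?;
    /// db.execute_sql("INSERT INTO users VALUES (1, 'alice')")?;
    /// let batches = db.execute_sql("SELECT name FROM users WHERE id = 1")?;
    /// ```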
    pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let t0 = std::time::Instant::now();

        let sql_trimmed = sql.trim();
        let sql_upper = sql_trimmed.to_uppercase();

        if sql_upper.starts_with("CREATE VIEW") {
            return self.handle_create_view(sql_trimmed);
        }
        if sql_upper.starts_with("DROP VIEW") {
            return self.handle_drop_view(sql_trimmed);
        }

        if sql_upper.starts_with("CREATE MATERIALIZED VIEW") {
            return self.handle_create_materialized_view(sql_trimmed);
        }
        if sql_upper.starts_with("DROP MATERIALIZED VIEW") {
            return self.handle_drop_materialized_view(sql_trimmed);
        }
        if sql_upper.starts_with("REFRESH MATERIALIZED VIEW") {
            return self.handle_refresh_materialized_view(sql_trimmed);
        }

        if sql_upper.starts_with("SELECT")
            && let Some(cached) = self.try_matview_cache(sql_trimmed)
        {
            return Ok(cached);
        }

        let expanded = self.view_registry.expand(sql_trimmed);
        let sql = expanded.as_str();

        let statements = self.sql_parser.parse(sql)?;
        if statements.is_empty() {
            return Ok(vec![]);
        }

        let planner = LogicalPlanner::new();
        let logical_plan = planner.plan(&statements[0])?;

        let optimized = self.sql_optimizer.optimize(logical_plan)?;

        let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
        let physical_plan = physical_planner.plan(&optimized)?;

        let result = self.execute_physical_plan(&physical_plan);

        let elapsed_us = t0.elapsed().as_micros() as u64;
        self.metrics.inc_sql_queries();
        self.metrics.query_latency_us.observe(elapsed_us);

        result
    }

    fn handle_create_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let as_pos = after_view
            .to_uppercase()
            .find(" AS ")
            .ok_or_else(|| DbxError::SqlParse {
                message: "CREATE VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
        let view_name = after_view[..as_pos].trim();
        let view_sql = after_view[as_pos + 4..].trim();
        self.view_registry.create(view_name, view_sql)?;
        self.one_row_affected_batch()
    }

    fn handle_drop_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let (if_exists, name_part) = if after_view.to_uppercase().starts_with("IF EXISTS") {
            (true, after_view[9..].trim_start().to_string())
        } else {
            (false, after_view.clone())
        };
        let view_name = name_part.trim();

        if if_exists && !self.view_registry.exists(view_name) {
            return self.one_row_affected_batch();
        }

        self.view_registry.drop(view_name)?;
        self.one_row_affected_batch()
    }

    fn handle_create_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();

        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let after_upper = after_view.to_uppercase();

        let (refresh_interval_secs, body) = if let Some(re_pos) = after_upper.find("REFRESH EVERY")
        {
            let name_part = after_view[..re_pos].trim().to_string();
            let rest = after_view[re_pos + 13..].trim_start().to_string();
            let as_pos = rest
                .to_uppercase()
                .find(" AS ")
                .ok_or_else(|| DbxError::SqlParse {
                    message: "CREATE MATERIALIZED VIEW ... REFRESH EVERY <n> AS <sql>".to_string(),
                    sql: sql.to_string(),
                })?;
            let interval_str = rest[..as_pos].trim();
            let interval_secs: u64 = interval_str.parse().map_err(|_| DbxError::SqlParse {
                message: format!(
                    "REFRESH EVERY requires integer seconds, got '{}'",
                    interval_str
                ),
                sql: sql.to_string(),
            })?;
            let view_sql = rest[as_pos + 4..].trim().to_string();
            (Some(interval_secs), (name_part, view_sql))
        } else {
            let as_pos = after_upper.find(" AS ").ok_or_else(|| DbxError::SqlParse {
                message: "CREATE MATERIALIZED VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
            let name = after_view[..as_pos].trim().to_string();
            let view_sql = after_view[as_pos + 4..].trim().to_string();
            (None, (name, view_sql))
        };

        self.mat_view_registry
            .create(&body.0, &body.1, refresh_interval_secs)?;
        self.one_row_affected_batch()
    }

    fn handle_drop_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let name = sql[upper.find("VIEW").unwrap() + 4..].trim();
        self.mat_view_registry.remove(name)?;
        self.one_row_affected_batch()
    }

    fn handle_refresh_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let name = sql[upper.find("VIEW").unwrap() + 4..].trim().to_lowercase();
        let view_sql = self.mat_view_registry.get_sql(&name).ok_or_else(|| {
            DbxError::InvalidArguments(format!("Materialized view '{}' not found", name))
        })?;
        let batches = self.execute_sql(&view_sql)?;
        self.mat_view_registry.set_cache(&name, batches)?;
        self.one_row_affected_batch()
    }

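    /// Best-effort materialized-view cache probe for a SELECT: extracts the
    /// first identifier after `FROM` and returns the cached batches if a
    /// fresh materialized view by that name exists. This is a textual
    /// heuristic, not a full parse, so it only matches simple queries.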
    fn try_matview_cache(&self, sql: &str) -> Option<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let from_pos = upper.find(" FROM ")?;
        let after_from = sql[from_pos + 6..].trim();
        let name = after_from
            .split(|c: char| c.is_whitespace() || c == ';' || c == ')')
            .next()?;
        if self.mat_view_registry.is_fresh(name) {
            self.mat_view_registry.get_cache(name)
        } else {
            None
        }
    }

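    /// Builds the conventional single-row `rows_affected = 1` result batch
    /// returned by view DDL statements.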
    fn one_row_affected_batch(&self) -> DbxResult<Vec<RecordBatch>> {
        use arrow::array::Int64Array;
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(Schema::new(vec![Field::new(
            "rows_affected",
            DataType::Int64,
            false,
        )]));
        let array = Int64Array::from(vec![1_i64]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
        Ok(vec![batch])
    }

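    /// Dispatches a physical plan: DML (INSERT/UPDATE/DELETE) and DDL are
    /// handled inline here, while read plans fall through to
    /// [`Self::execute_select_plan`] and the operator tree.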
    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
        match plan {
            PhysicalPlan::Insert {
                table,
                columns: _,
                values,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let mut rows_inserted = 0;

                for row_values in values {
                    if row_values.is_empty() {
                        continue;
                    }

                    // The first column is the row key.
                    let key = match &row_values[0] {
                        PhysicalExpr::Literal(scalar) => {
                            use crate::storage::columnar::ScalarValue;
                            match scalar {
                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
                                _ => {
                                    return Err(DbxError::NotImplemented(
                                        "Non-literal key in INSERT".to_string(),
                                    ));
                                }
                            }
                        }
                        _ => {
                            return Err(DbxError::NotImplemented(
                                "Non-literal key in INSERT".to_string(),
                            ));
                        }
                    };

                    let schema = {
                        let schemas = self.table_schemas.read().unwrap();
                        schemas.get(table.as_str()).cloned()
                    };
                    // Infer a schema from the values when none is registered,
                    // propagating failure instead of panicking.
                    let schema = match schema {
                        Some(schema) => schema,
                        None => Arc::new(infer_schema_from_values(row_values)?),
                    };

                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;

                    self.scatter_gather.scatter_write(
                        &key,
                        &value_bytes,
                        |shard_id, sub_key, sub_val| {
                            let shard_table = format!("{}__shard_{}", table, shard_id);
                            let _ = self.insert(&shard_table, sub_key, sub_val);
                        },
                    );

                    rows_inserted += 1;
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_inserted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_inserted as i64]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Update {
                table,
                assignments,
                filter,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
                let mut all_records = Vec::new();
                for target_table in &target_tables {
                    all_records.extend(self.scan(target_table)?);
                }

                let mut rows_updated = 0_i64;

                let column_index_map = {
                    let schemas = self.table_schemas.read().unwrap();
                    schemas.get(table.as_str()).map(|schema| {
                        schema
                            .fields()
                            .iter()
                            .enumerate()
                            .map(|(i, field)| (field.name().clone(), i))
                            .collect::<std::collections::HashMap<String, usize>>()
                    })
                };

                for (key, value_bytes) in all_records {
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_update {
                        let mut current_values: Vec<serde_json::Value> =
                            serde_json::from_slice(&full_record).unwrap_or_default();

                        for (column_name, expr) in assignments.iter() {
                            let target_idx = column_index_map
                                .as_ref()
                                .and_then(|map| map.get(column_name).copied())
                                .unwrap_or_else(|| {
                                    assignments
                                        .iter()
                                        .position(|(n, _)| n == column_name)
                                        .unwrap_or(0)
                                });

                            if let PhysicalExpr::Literal(scalar) = expr {
                                use crate::storage::columnar::ScalarValue;
                                let new_value = match scalar {
                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
                                        .map(serde_json::Value::Number)
                                        .unwrap_or(serde_json::Value::Null),
                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
                                    ScalarValue::Binary(b) => {
                                        serde_json::Value::String(base64::Engine::encode(
                                            &base64::engine::general_purpose::STANDARD,
                                            b,
                                        ))
                                    }
                                    ScalarValue::Null => serde_json::Value::Null,
                                };

                                if target_idx < current_values.len() {
                                    current_values[target_idx] = new_value;
                                }
                            }
                        }

                        let storage_values = if current_values.len() > 1 {
                            &current_values[1..]
                        } else {
                            &current_values[..]
                        };
                        let new_value_bytes = serde_json::to_vec(storage_values)
                            .map_err(|e| DbxError::Serialization(e.to_string()))?;

                        self.insert(table, &key, &new_value_bytes)?;
                        rows_updated += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_updated",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_updated]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Delete { table, filter } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
                let mut all_records = Vec::new();
                for target_table in &target_tables {
                    all_records.extend(self.scan(target_table)?);
                }

                let mut rows_deleted = 0_i64;

                for (key, value_bytes) in all_records {
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_delete {
                        self.delete(table, &key)?;
                        rows_deleted += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_deleted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_deleted]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropTable { table, if_exists } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if !exists && !if_exists {
                    return Err(DbxError::TableNotFound(table.clone()));
                }

                if exists {
                    self.table_schemas.write().unwrap().remove(table);

                    self.wos_for_metadata().delete_schema_metadata(table)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateTable {
                table,
                columns,
                if_not_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if exists && !if_not_exists {
                    return Err(DbxError::Schema(format!(
                        "Table '{}' already exists",
                        table
                    )));
                }

                if !exists {
                    let fields: Vec<Field> = columns
                        .iter()
                        .map(|(name, type_str)| {
                            let data_type = match type_str.to_uppercase().as_str() {
                                "INT" | "INTEGER" => DataType::Int64,
                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
                                "FLOAT" | "DOUBLE" => DataType::Float64,
                                "BOOL" | "BOOLEAN" => DataType::Boolean,
                                _ => DataType::Utf8,
                            };
                            Field::new(name, data_type, true)
                        })
                        .collect();

                    let schema = Arc::new(Schema::new(fields));

                    self.table_schemas
                        .write()
                        .unwrap()
                        .insert(table.clone(), schema.clone());

                    self.wos_for_metadata()
                        .save_schema_metadata(table, &schema)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateIndex {
                table,
                index_name,
                columns,
                if_not_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                {
                    let schemas = self.table_schemas.read().unwrap();
                    if !schemas.contains_key(table.as_str()) {
                        return Err(DbxError::Schema(format!(
                            "Table '{}' does not exist",
                            table
                        )));
                    }
                }

                let column = columns.first().ok_or_else(|| {
                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
                })?;

                let exists = self.index.has_index(table, column);

                if exists && !if_not_exists {
                    return Err(DbxError::IndexAlreadyExists {
                        table: table.clone(),
                        column: column.clone(),
                    });
                }

                if !exists {
                    self.index.create_index(table, column)?;

                    self.index_registry
                        .write()
                        .unwrap()
                        .insert(index_name.clone(), (table.clone(), column.clone()));

                    self.wos_for_metadata()
                        .save_index_metadata(index_name, table, column)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropIndex {
                table,
                index_name,
                if_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let resolved_column = {
                    let registry = self.index_registry.read().unwrap();
                    registry
                        .get(index_name.as_str())
                        .map(|(_, col)| col.clone())
                };

                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());

                let exists = self.index.has_index(table, column);

                if !exists && !if_exists {
                    return Err(DbxError::IndexNotFound {
                        table: table.clone(),
                        column: column.to_string(),
                    });
                }

                if exists {
                    self.index.drop_index(table, column)?;

                    self.index_registry
                        .write()
                        .unwrap()
                        .remove(index_name.as_str());

                    self.wos_for_metadata().delete_index_metadata(index_name)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::AlterTable { table, operation } => {
                use crate::sql::planner::types::AlterTableOperation;
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                match operation {
                    AlterTableOperation::AddColumn {
                        column_name,
                        data_type,
                    } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let arrow_type = match data_type.to_uppercase().as_str() {
                            "INT" | "INTEGER" => DataType::Int64,
                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
                            "BOOL" | "BOOLEAN" => DataType::Boolean,
                            _ => DataType::Utf8,
                        };

                        let new_field = Field::new(column_name, arrow_type, true);

                        let mut fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| f.as_ref().clone())
                            .collect();
                        fields.push(new_field);
                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::DropColumn { column_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .filter(|f| f.name() != column_name)
                            .map(|f| f.as_ref().clone())
                            .collect();

                        if fields.len() == current_schema.fields().len() {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                column_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::RenameColumn { old_name, new_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let mut found = false;
                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| {
                                if f.name() == old_name {
                                    found = true;
                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
                                } else {
                                    f.as_ref().clone()
                                }
                            })
                            .collect();

                        if !found {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                old_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            _ => self.execute_select_plan(plan),
        }
    }

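    /// Executes a read plan: warms the columnar cache for every table the
    /// plan touches, builds the operator tree, and drains it to completion.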
    fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
        for table in plan.tables() {
            let target_tables = self.get_tables_to_scan(&table, None);
            for t in target_tables {
                if !self.columnar_cache.has_table(&t) {
                    let _ = self.sync_columnar_cache(&t);
                }
            }
        }

        let tables = self.tables.read().unwrap();
        let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
        Self::drain_operator(&mut *operator)
    }

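    /// Recursively lowers a [`PhysicalPlan`] node into a pull-based
    /// [`PhysicalOperator`] tree. Table scans prefer the columnar cache,
    /// pushing the filter down when the cache can apply it, and fall back to
    /// registered in-memory batches. DML/DDL variants are unreachable here
    /// because [`Self::execute_physical_plan`] intercepts them first.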
    fn build_operator(
        &self,
        plan: &PhysicalPlan,
        tables: &HashMap<String, Vec<RecordBatch>>,
        columnar_cache: &ColumnarCache,
    ) -> DbxResult<Box<dyn PhysicalOperator>> {
        match plan {
            PhysicalPlan::TableScan {
                table,
                projection,
                filter,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::RangeScan);

                // Shared schema lookup: exact name first, then a
                // case-insensitive match, then an empty schema.
                let lookup_schema = |table: &str| {
                    let schemas = self.table_schemas.read().unwrap();
                    schemas
                        .get(table)
                        .or_else(|| {
                            let table_lower = table.to_lowercase();
                            schemas
                                .iter()
                                .find(|(k, _)| k.to_lowercase() == table_lower)
                                .map(|(_, v)| v)
                        })
                        .cloned()
                        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()))
                };

                let mut filter_pushed_down = false;
                let target_tables = self.get_tables_to_scan(table, filter.as_ref());

                let mut all_batches = Vec::new();
                let mut base_schema = None;

                for t in target_tables {
                    let cached_results = if let Some(filter_expr) = filter {
                        let filter_expr_clone = filter_expr.clone();
                        let result = columnar_cache.get_batches_with_filter(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                            move |batch| {
                                use crate::sql::executor::evaluate_expr;
                                use arrow::array::BooleanArray;

                                let array = evaluate_expr(&filter_expr_clone, batch)?;
                                let boolean_array = array
                                    .as_any()
                                    .downcast_ref::<BooleanArray>()
                                    .ok_or_else(|| DbxError::TypeMismatch {
                                        expected: "BooleanArray".to_string(),
                                        actual: format!("{:?}", array.data_type()),
                                    })?;
                                Ok(boolean_array.clone())
                            },
                        )?;
                        if result.is_some() {
                            filter_pushed_down = true;
                        }
                        result
                    } else {
                        columnar_cache.get_batches(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                        )?
                    };

                    let (batches, schema) = if let Some(cached_batches) = cached_results {
                        if cached_batches.is_empty() {
                            (vec![], lookup_schema(table))
                        } else {
                            let schema = cached_batches[0].schema();
                            (cached_batches, schema)
                        }
                    } else {
                        let cached_after_sync = columnar_cache.get_batches(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                        )?;

                        if let Some(batches) = cached_after_sync {
                            if batches.is_empty() {
                                (vec![], lookup_schema(table))
                            } else {
                                let schema = batches[0].schema();
                                (batches, schema)
                            }
                        } else {
                            let batches_opt = tables.get(&t).or_else(|| {
                                let table_lower = t.to_lowercase();
                                tables
                                    .iter()
                                    .find(|(k, _)| k.to_lowercase() == table_lower)
                                    .map(|(_, v)| v)
                            });

                            match batches_opt {
                                Some(batches) if !batches.is_empty() => {
                                    let schema = batches[0].schema();
                                    (batches.clone(), schema)
                                }
                                _ => (vec![], lookup_schema(table)),
                            }
                        }
                    };

                    if base_schema.is_none() {
                        base_schema = Some(schema);
                    }
                    all_batches.extend(batches);
                }

                let final_schema = base_schema.unwrap_or_else(|| lookup_schema(table));

                let mut scan =
                    TableScanOperator::new(table.clone(), final_schema, projection.clone());
                scan.set_data(all_batches);

                if let Some(filter_expr) = filter
                    && !filter_pushed_down
                {
                    Ok(Box::new(FilterOperator::new(
                        Box::new(scan),
                        filter_expr.clone(),
                    )))
                } else {
                    Ok(Box::new(scan))
                }
            }

            PhysicalPlan::Projection {
                input,
                exprs,
                aliases,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                use arrow::datatypes::Field;

                let input_schema = input_op.schema();
                let fields: Vec<Field> = exprs
                    .iter()
                    .enumerate()
                    .map(|(i, expr)| {
                        let data_type = expr.get_type(input_schema);
                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
                            alias.clone()
                        } else {
                            format!("col_{}", i)
                        };
                        Field::new(&field_name, data_type, true)
                    })
                    .collect();

                let output_schema = Arc::new(Schema::new(fields));
                Ok(Box::new(ProjectionOperator::new(
                    input_op,
                    output_schema,
                    exprs.clone(),
                )))
            }

            PhysicalPlan::Limit {
                input,
                count,
                offset,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
            }

            PhysicalPlan::SortMerge { input, order_by } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
            }

            PhysicalPlan::HashAggregate {
                input,
                group_by,
                aggregates,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::Aggregation);

                let input_op = self.build_operator(input, tables, columnar_cache)?;
                let input_schema = input_op.schema();
                let mut fields = Vec::new();

                for &idx in group_by {
                    fields.push(input_schema.field(idx).clone());
                }

                for agg in aggregates {
                    use arrow::datatypes::DataType;
                    let data_type = match agg.function {
                        crate::sql::planner::AggregateFunction::Count => DataType::Int64,
                        crate::sql::planner::AggregateFunction::Sum
                        | crate::sql::planner::AggregateFunction::Avg
                        | crate::sql::planner::AggregateFunction::Min
                        | crate::sql::planner::AggregateFunction::Max => DataType::Float64,
                    };
                    let name = agg
                        .alias
                        .clone()
                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
                }

                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
                Ok(Box::new(
                    HashAggregateOperator::new(
                        input_op,
                        agg_schema,
                        group_by.clone(),
                        aggregates.clone(),
                    )
                    .with_gpu(self.gpu_manager.clone()),
                ))
            }

            PhysicalPlan::HashJoin {
                left,
                right,
                on,
                join_type,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::Join);

                use arrow::datatypes::Field;

                let left_op = self.build_operator(left, tables, columnar_cache)?;
                let right_op = self.build_operator(right, tables, columnar_cache)?;

                let left_schema = left_op.schema();
                let right_schema = right_op.schema();

                let mut joined_fields: Vec<Field> = Vec::new();
                for field in left_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }
                for field in right_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }

                let joined_schema = Arc::new(Schema::new(joined_fields));

                Ok(Box::new(HashJoinOperator::new(
                    left_op,
                    right_op,
                    joined_schema,
                    on.clone(),
                    *join_type,
                )))
            }

            PhysicalPlan::Insert { .. } => {
                unreachable!("INSERT should not reach build_operator")
            }
            PhysicalPlan::Update { .. } => {
                unreachable!("UPDATE should not reach build_operator")
            }
            PhysicalPlan::Delete { .. } => {
                unreachable!("DELETE should not reach build_operator")
            }
            PhysicalPlan::DropTable { .. } => {
                unreachable!("DROP TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateTable { .. } => {
                unreachable!("CREATE TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateIndex { .. } => {
                unreachable!("CREATE INDEX should not reach build_operator")
            }
            PhysicalPlan::DropIndex { .. } => {
                unreachable!("DROP INDEX should not reach build_operator")
            }
            PhysicalPlan::AlterTable { .. } => {
                unreachable!("ALTER TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateFunction { .. } => {
                unreachable!("CREATE FUNCTION should not reach build_operator")
            }
            PhysicalPlan::CreateTrigger { .. } => {
                unreachable!("CREATE TRIGGER should not reach build_operator")
            }
            PhysicalPlan::CreateJob { .. } => {
                unreachable!("CREATE JOB should not reach build_operator")
            }
            PhysicalPlan::DropFunction { .. } => {
                unreachable!("DROP FUNCTION should not reach build_operator")
            }
            PhysicalPlan::DropTrigger { .. } => {
                unreachable!("DROP TRIGGER should not reach build_operator")
            }
            PhysicalPlan::DropJob { .. } => {
                unreachable!("DROP JOB should not reach build_operator")
            }
        }
    }

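    /// Pulls batches from an operator until it is exhausted, dropping empty
    /// batches from the result.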
    fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
        let mut results = Vec::new();
        while let Some(batch) = op.next()? {
            if batch.num_rows() > 0 {
                results.push(batch);
            }
        }
        Ok(results)
    }
}

impl crate::traits::DatabaseSql for Database {
    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
        Database::execute_sql(self, sql)
    }

    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
        Database::register_table(self, name, batches)
    }

    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
        Database::append_batch(self, table, batch);
        Ok(())
    }
}