use crate::engine::Database;
use crate::sql::StringCaseExt;

use crate::error::{DbxError, DbxResult};
use crate::sql::executor::{
    FilterOperator, HashAggregateOperator, HashJoinOperator, LimitOperator, PhysicalOperator,
    ProjectionOperator, SortOperator, TableScanOperator,
};
use crate::sql::planner::{LogicalPlanner, PhysicalExpr, PhysicalPlan, PhysicalPlanner};
use crate::storage::columnar_cache::ColumnarCache;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use std::collections::HashMap;
use std::sync::Arc;

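/// Infers an Arrow schema from a single row of literal INSERT values when the
/// target table has no registered schema. Columns are named `col_0`, `col_1`,
/// ... and both NULL literals and non-literal expressions fall back to `Utf8`.
///
/// A minimal sketch of the mapping (the literal values are hypothetical):
///
/// ```ignore
/// // INSERT ... VALUES (1, 'alice') infers (col_0: Int32, col_1: Utf8).
/// let row = vec![
///     PhysicalExpr::Literal(ScalarValue::Int32(1)),
///     PhysicalExpr::Literal(ScalarValue::Utf8("alice".to_string())),
/// ];
/// let schema = infer_schema_from_values(&row)?;
/// assert_eq!(schema.fields().len(), 2);
/// ```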
fn infer_schema_from_values(values: &[PhysicalExpr]) -> DbxResult<Schema> {
    use crate::storage::columnar::ScalarValue;
    use arrow::datatypes::{DataType, Field};

    let fields: Vec<Field> = values
        .iter()
        .enumerate()
        .map(|(i, expr)| {
            let name = format!("col_{}", i);
            let data_type = match expr {
                PhysicalExpr::Literal(scalar) => match scalar {
                    ScalarValue::Int32(_) => DataType::Int32,
                    ScalarValue::Int64(_) => DataType::Int64,
                    ScalarValue::Float64(_) => DataType::Float64,
                    ScalarValue::Utf8(_) => DataType::Utf8,
                    ScalarValue::Boolean(_) => DataType::Boolean,
                    ScalarValue::Binary(_) => DataType::Binary,
                    ScalarValue::Null => DataType::Utf8,
                },
                _ => DataType::Utf8,
            };
            Field::new(name, data_type, true)
        })
        .collect();

    Ok(Schema::new(fields))
}

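/// Serializes one row of literal values into a complete, one-row Arrow IPC
/// stream (schema message included, so the bytes decode without out-of-band
/// schema information). `Int32` literals are widened when the schema expects
/// `Int64`; any other field/scalar mismatch or non-literal value is an error.
///
/// A sketch pairing it with `infer_schema_from_values` (values hypothetical):
///
/// ```ignore
/// let row = vec![
///     PhysicalExpr::Literal(ScalarValue::Int32(7)),
///     PhysicalExpr::Literal(ScalarValue::Utf8("x".to_string())),
/// ];
/// let schema = infer_schema_from_values(&row)?;
/// let ipc_bytes = serialize_to_arrow_ipc(&schema, &row)?;
/// ```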
fn serialize_to_arrow_ipc(schema: &Schema, row_values: &[PhysicalExpr]) -> DbxResult<Vec<u8>> {
    use crate::storage::columnar::ScalarValue;
    use arrow::array::*;

    let mut arrays: Vec<ArrayRef> = Vec::new();

    for (field, expr) in schema.fields().iter().zip(row_values) {
        let array: ArrayRef = match expr {
            PhysicalExpr::Literal(scalar) => {
                match (field.data_type(), scalar) {
                    (arrow::datatypes::DataType::Int32, ScalarValue::Int32(v)) => {
                        Arc::new(Int32Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int64(v)) => {
                        Arc::new(Int64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Int64, ScalarValue::Int32(v)) => {
                        Arc::new(Int64Array::from(vec![*v as i64]))
                    }
                    (arrow::datatypes::DataType::Float64, ScalarValue::Float64(v)) => {
                        Arc::new(Float64Array::from(vec![*v]))
                    }
                    (arrow::datatypes::DataType::Utf8, ScalarValue::Utf8(s)) => {
                        Arc::new(StringArray::from(vec![s.as_str()]))
                    }
                    (arrow::datatypes::DataType::Boolean, ScalarValue::Boolean(b)) => {
                        Arc::new(BooleanArray::from(vec![*b]))
                    }
                    (arrow::datatypes::DataType::Binary, ScalarValue::Binary(b)) => {
                        Arc::new(BinaryArray::from(vec![b.as_slice()]))
                    }
                    (_, ScalarValue::Null) => {
                        match field.data_type() {
                            arrow::datatypes::DataType::Int32 => {
                                Arc::new(Int32Array::from(vec![None as Option<i32>]))
                            }
                            arrow::datatypes::DataType::Int64 => {
                                Arc::new(Int64Array::from(vec![None as Option<i64>]))
                            }
                            arrow::datatypes::DataType::Float64 => {
                                Arc::new(Float64Array::from(vec![None as Option<f64>]))
                            }
                            arrow::datatypes::DataType::Utf8 => {
                                Arc::new(StringArray::from(vec![None as Option<&str>]))
                            }
                            arrow::datatypes::DataType::Boolean => {
                                Arc::new(BooleanArray::from(vec![None as Option<bool>]))
                            }
                            arrow::datatypes::DataType::Binary => {
                                Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
                            }
                            _ => {
                                return Err(DbxError::NotImplemented(format!(
                                    "Unsupported null type: {:?}",
                                    field.data_type()
                                )));
                            }
                        }
                    }
                    _ => {
                        return Err(DbxError::NotImplemented(format!(
                            "Type mismatch: field {:?} vs scalar {:?}",
                            field.data_type(),
                            scalar
                        )));
                    }
                }
            }
            _ => {
                return Err(DbxError::NotImplemented(
                    "Non-literal value in INSERT".to_string(),
                ));
            }
        };
        arrays.push(array);
    }

    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays)
        .map_err(|e| DbxError::Storage(e.to_string()))?;

    let mut buffer = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buffer, schema)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .write(&batch)
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
        writer
            .finish()
            .map_err(|e| DbxError::Serialization(e.to_string()))?;
    }

    Ok(buffer)
}

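/// Rebuilds the full JSON row for a stored record by prepending the key back
/// onto the stored value columns. Keys are decoded by width: 4 bytes as
/// little-endian `i32`, 8 bytes as little-endian `i64`, anything else as a
/// (lossy) UTF-8 string.
///
/// A sketch of the round trip, assuming the key was written with
/// `i32::to_le_bytes` as in the INSERT path below:
///
/// ```ignore
/// let key = 42_i32.to_le_bytes();
/// let value = br#"["alice",true]"#;
/// let full = reconstruct_full_record(&key, value);
/// assert_eq!(full, br#"[42,"alice",true]"#.to_vec());
/// ```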
fn reconstruct_full_record(key: &[u8], value_bytes: &[u8]) -> Vec<u8> {
    let key_json = if key.len() == 4 {
        let val = i32::from_le_bytes(key.try_into().unwrap_or([0; 4]));
        serde_json::Value::Number(val.into())
    } else if key.len() == 8 {
        let val = i64::from_le_bytes(key.try_into().unwrap_or([0; 8]));
        serde_json::Value::Number(val.into())
    } else {
        let s = String::from_utf8_lossy(key).to_string();
        serde_json::Value::String(s)
    };

    let mut values: Vec<serde_json::Value> =
        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);
    values.insert(0, key_json);
    serde_json::to_vec(&values).unwrap_or_else(|_| value_bytes.to_vec())
}

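/// Converts a JSON-encoded row (an array of scalars) into a single-row
/// [`RecordBatch`] with positional column names `col_0`, `col_1`, ....
/// Integers map to `Int64`, floats to `Float64`, strings to `Utf8`, booleans
/// to `Boolean`; nulls and nested values become null `Int64` columns.
///
/// ```ignore
/// let batch = json_record_to_batch(br#"[1,"a",true]"#)?;
/// assert_eq!(batch.num_rows(), 1);
/// assert_eq!(batch.num_columns(), 3);
/// ```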
pub fn json_record_to_batch(value_bytes: &[u8]) -> DbxResult<RecordBatch> {
    use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    let current_values: Vec<serde_json::Value> =
        serde_json::from_slice(value_bytes).unwrap_or_else(|_| vec![]);

    let mut fields = Vec::new();
    let mut columns: Vec<ArrayRef> = Vec::new();

    for (i, val) in current_values.iter().enumerate() {
        match val {
            serde_json::Value::Number(n) => {
                if n.is_i64() {
                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
                } else if n.is_f64() {
                    fields.push(Field::new(format!("col_{}", i), DataType::Float64, true));
                    columns.push(Arc::new(Float64Array::from(vec![n.as_f64()])));
                } else {
                    fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![n.as_i64()])));
                }
            }
            serde_json::Value::String(s) => {
                fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true));
                columns.push(Arc::new(StringArray::from(vec![Some(s.as_str())])));
            }
            serde_json::Value::Bool(b) => {
                fields.push(Field::new(format!("col_{}", i), DataType::Boolean, true));
                columns.push(Arc::new(BooleanArray::from(vec![Some(*b)])));
            }
            serde_json::Value::Null => {
                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
            }
            _ => {
                fields.push(Field::new(format!("col_{}", i), DataType::Int64, true));
                columns.push(Arc::new(Int64Array::from(vec![None::<i64>])));
            }
        }
    }

    let schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(schema, columns).map_err(DbxError::from)
}

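/// Evaluates an optional filter expression against a single JSON-encoded row.
/// Returns `Ok(true)` when no filter is supplied. The expression must
/// evaluate to a `BooleanArray`; anything else is a type-mismatch error.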
fn evaluate_filter_for_record(
    filter_expr: Option<&PhysicalExpr>,
    value_bytes: &[u8],
) -> DbxResult<bool> {
    if let Some(expr) = filter_expr {
        let batch = json_record_to_batch(value_bytes)?;

        use crate::sql::executor::evaluate_expr;
        let result = evaluate_expr(expr, &batch)?;

        use arrow::array::BooleanArray;
        let bool_array = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| DbxError::TypeMismatch {
                expected: "BooleanArray".to_string(),
                actual: format!("{:?}", result.data_type()),
            })?;

        Ok(bool_array.value(0))
    } else {
        Ok(true)
    }
}

impl Database {
    /// Resolves the physical tables behind a logical table name: explicit
    /// partitions when a partition map exists, otherwise one
    /// `{table}__shard_{id}` table per shard.
    pub(crate) fn get_tables_to_scan(
        &self,
        table: &str,
        _filter: Option<&PhysicalExpr>,
    ) -> Vec<String> {
        let maps = self.partition_maps.read().unwrap();
        if let Some(map) = maps.get(table) {
            map.all_partitions()
        } else {
            self.sharding_router
                .all_shards()
                .iter()
                .map(|shard| format!("{}__shard_{}", table, shard.id))
                .collect()
        }
    }

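    /// Registers in-memory batches under a table name, recording the schema
    /// of the first batch (if any) so later queries can resolve column types.
    ///
    /// A usage sketch; `batch` is any hypothetical pre-built `RecordBatch`:
    ///
    /// ```ignore
    /// db.register_table("users", vec![batch]);
    /// let rows = db.execute_sql("SELECT * FROM users")?;
    /// ```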
    pub fn register_table(&self, name: &str, batches: Vec<RecordBatch>) {
        if let Some(first_batch) = batches.first() {
            let schema = first_batch.schema();
            let mut schemas = self.table_schemas.write().unwrap();
            schemas.insert(name.to_string(), schema);
        }

        let mut tables = self.tables.write().unwrap();
        tables.insert(name.to_string(), batches);
    }

    pub fn append_batch(&self, table: &str, batch: RecordBatch) {
        let mut tables = self.tables.write().unwrap();
        tables.entry(table.to_string()).or_default().push(batch);
    }

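    /// Parses, plans, optimizes, and executes a SQL string, returning the
    /// result batches. View DDL (`CREATE`/`DROP [MATERIALIZED] VIEW`,
    /// `REFRESH MATERIALIZED VIEW`) is intercepted before the parser runs,
    /// fresh materialized-view caches short-circuit matching SELECTs, and
    /// query latency is recorded in microseconds on every call.
    ///
    /// A sketch of typical use (table and column names are hypothetical):
    ///
    /// ```ignore
    /// db.execute_sql("CREATE TABLE users (id INT, name TEXT)")?;
    /// db.execute_sql("INSERT INTO users VALUES (1, 'alice')")?;
    /// let batches = db.execute_sql("SELECT name FROM users WHERE id = 1")?;
    /// ```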
    pub fn execute_sql(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let t0 = std::time::Instant::now();

        let sql_trimmed = sql.trim();

        if sql_trimmed.starts_with_ignore_ascii_case("CREATE VIEW") {
            return self.handle_create_view(sql_trimmed);
        }
        if sql_trimmed.starts_with_ignore_ascii_case("DROP VIEW") {
            return self.handle_drop_view(sql_trimmed);
        }

        if sql_trimmed.starts_with_ignore_ascii_case("CREATE MATERIALIZED VIEW") {
            return self.handle_create_materialized_view(sql_trimmed);
        }
        if sql_trimmed.starts_with_ignore_ascii_case("DROP MATERIALIZED VIEW") {
            return self.handle_drop_materialized_view(sql_trimmed);
        }
        if sql_trimmed.starts_with_ignore_ascii_case("REFRESH MATERIALIZED VIEW") {
            return self.handle_refresh_materialized_view(sql_trimmed);
        }

        if sql_trimmed.starts_with_ignore_ascii_case("SELECT")
            && let Some(cached) = self.try_matview_cache(sql_trimmed)
        {
            return Ok(cached);
        }

        let expanded = self.view_registry.expand(sql_trimmed);
        let sql = expanded.as_str();

        let statements = self.sql_parser.parse(sql)?;
        if statements.is_empty() {
            return Ok(vec![]);
        }

        let planner = LogicalPlanner::new();
        let logical_plan = planner.plan(&statements[0])?;

        let optimized = self.sql_optimizer.optimize(logical_plan)?;

        let physical_planner = PhysicalPlanner::new(Arc::clone(&self.table_schemas));
        let physical_plan = physical_planner.plan(&optimized)?;

        let result = self.execute_physical_plan(&physical_plan);

        let elapsed_us = t0.elapsed().as_micros() as u64;
        self.metrics.inc_sql_queries();
        self.metrics.query_latency_us.observe(elapsed_us);

        result
    }

    fn handle_create_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let as_pos = after_view
            .to_uppercase()
            .find(" AS ")
            .ok_or_else(|| DbxError::SqlParse {
                message: "CREATE VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
        let view_name = after_view[..as_pos].trim();
        let view_sql = after_view[as_pos + 4..].trim();
        self.view_registry.create(view_name, view_sql)?;
        self.one_row_affected_batch()
    }

    fn handle_drop_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let (if_exists, name_part) = if after_view.starts_with_ignore_ascii_case("IF EXISTS") {
            (true, after_view[9..].trim_start().to_string())
        } else {
            (false, after_view.clone())
        };
        let view_name = name_part.trim();

        if if_exists && !self.view_registry.exists(view_name) {
            return self.one_row_affected_batch();
        }

        self.view_registry.drop(view_name)?;
        self.one_row_affected_batch()
    }

    fn handle_create_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();

        let after_view = sql[upper.find("VIEW").unwrap() + 4..]
            .trim_start()
            .to_owned();
        let after_upper = after_view.to_uppercase();

        let (refresh_interval_secs, body) = if let Some(re_pos) = after_upper.find("REFRESH EVERY")
        {
            let name_part = after_view[..re_pos].trim().to_string();
            let rest = after_view[re_pos + 13..].trim_start().to_string();
            let as_pos = rest
                .to_uppercase()
                .find(" AS ")
                .ok_or_else(|| DbxError::SqlParse {
                    message: "CREATE MATERIALIZED VIEW ... REFRESH EVERY <n> AS <sql>".to_string(),
                    sql: sql.to_string(),
                })?;
            let interval_str = rest[..as_pos].trim();
            let interval_secs: u64 = interval_str.parse().map_err(|_| DbxError::SqlParse {
                message: format!(
                    "REFRESH EVERY requires integer seconds, got '{}'",
                    interval_str
                ),
                sql: sql.to_string(),
            })?;
            let view_sql = rest[as_pos + 4..].trim().to_string();
            (Some(interval_secs), (name_part, view_sql))
        } else {
            let as_pos = after_upper.find(" AS ").ok_or_else(|| DbxError::SqlParse {
                message: "CREATE MATERIALIZED VIEW requires AS".to_string(),
                sql: sql.to_string(),
            })?;
            let name = after_view[..as_pos].trim().to_string();
            let view_sql = after_view[as_pos + 4..].trim().to_string();
            (None, (name, view_sql))
        };

        self.mat_view_registry
            .create(&body.0, &body.1, refresh_interval_secs)?;
        self.one_row_affected_batch()
    }

    fn handle_drop_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let name = sql[upper.find("VIEW").unwrap() + 4..].trim();
        self.mat_view_registry.remove(name)?;
        self.one_row_affected_batch()
    }

    fn handle_refresh_materialized_view(&self, sql: &str) -> DbxResult<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let name = sql[upper.find("VIEW").unwrap() + 4..].trim().to_lowercase();
        let view_sql = self.mat_view_registry.get_sql(&name).ok_or_else(|| {
            DbxError::InvalidArguments(format!("Materialized view '{}' not found", name))
        })?;
        let batches = self.execute_sql(&view_sql)?;
        self.mat_view_registry.set_cache(&name, batches)?;
        self.one_row_affected_batch()
    }

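    /// Returns cached batches when the query's first FROM target names a
    /// materialized view whose cache is still fresh. This is a lightweight
    /// textual match on the table name after ` FROM `, not a plan-level
    /// rewrite, so only simple single-table SELECTs benefit.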
    fn try_matview_cache(&self, sql: &str) -> Option<Vec<RecordBatch>> {
        let upper = sql.to_uppercase();
        let from_pos = upper.find(" FROM ")?;
        let after_from = sql[from_pos + 6..].trim();
        let name = after_from
            .split(|c: char| c.is_whitespace() || c == ';' || c == ')')
            .next()?;
        if self.mat_view_registry.is_fresh(name) {
            self.mat_view_registry.get_cache(name)
        } else {
            None
        }
    }

    fn one_row_affected_batch(&self) -> DbxResult<Vec<RecordBatch>> {
        use arrow::array::Int64Array;
        use arrow::datatypes::{DataType, Field, Schema};
        let schema = Arc::new(Schema::new(vec![Field::new(
            "rows_affected",
            DataType::Int64,
            false,
        )]));
        let array = Int64Array::from(vec![1_i64]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;
        Ok(vec![batch])
    }

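    /// Dispatches a physical plan: DML (INSERT/UPDATE/DELETE) and DDL
    /// (CREATE/DROP/ALTER) are handled inline and report a row count, while
    /// everything else is routed to the operator-based SELECT pipeline.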
    fn execute_physical_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
        match plan {
            PhysicalPlan::Insert {
                table,
                columns: _,
                values,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let mut rows_inserted = 0;

                for row_values in values {
                    if row_values.is_empty() {
                        continue;
                    }

                    // The first VALUES expression becomes the record key.
                    let key = match &row_values[0] {
                        PhysicalExpr::Literal(scalar) => {
                            use crate::storage::columnar::ScalarValue;
                            match scalar {
                                ScalarValue::Utf8(s) => s.as_bytes().to_vec(),
                                ScalarValue::Int32(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Int64(i) => i.to_le_bytes().to_vec(),
                                ScalarValue::Float64(f) => f.to_le_bytes().to_vec(),
                                ScalarValue::Boolean(b) => vec![if *b { 1 } else { 0 }],
                                _ => {
                                    return Err(DbxError::NotImplemented(
                                        "Non-literal key in INSERT".to_string(),
                                    ));
                                }
                            }
                        }
                        _ => {
                            return Err(DbxError::NotImplemented(
                                "Non-literal key in INSERT".to_string(),
                            ));
                        }
                    };

                    let schema = {
                        let schemas = self.table_schemas.read().unwrap();
                        schemas.get(table.as_str()).cloned()
                    }
                    .unwrap_or_else(|| {
                        Arc::new(
                            infer_schema_from_values(row_values)
                                .expect("Failed to infer schema from values"),
                        )
                    });

                    let value_bytes = serialize_to_arrow_ipc(&schema, row_values)?;

                    self.scatter_gather.scatter_write(
                        &key,
                        &value_bytes,
                        |shard_id, sub_key, sub_val| {
                            let shard_table = format!("{}__shard_{}", table, shard_id);
                            let _ = self.insert(&shard_table, sub_key, sub_val);
                        },
                    );

                    rows_inserted += 1;
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_inserted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_inserted as i64]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Update {
                table,
                assignments,
                filter,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
                let mut all_records = Vec::new();
                for target_table in &target_tables {
                    all_records.extend(self.scan(target_table)?);
                }

                let mut rows_updated = 0_i64;

                let column_index_map = {
                    let schemas = self.table_schemas.read().unwrap();
                    schemas.get(table.as_str()).map(|schema| {
                        schema
                            .fields()
                            .iter()
                            .enumerate()
                            .map(|(i, field)| (field.name().clone(), i))
                            .collect::<std::collections::HashMap<String, usize>>()
                    })
                };

                for (key, value_bytes) in all_records {
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_update = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_update {
                        let mut current_values: Vec<serde_json::Value> =
                            serde_json::from_slice(&full_record).unwrap_or_else(|_| vec![]);

                        for (column_name, expr) in assignments.iter() {
                            // Resolve the column's position from the table schema,
                            // falling back to the assignment's own position.
                            let target_idx = column_index_map
                                .as_ref()
                                .and_then(|map| map.get(column_name).copied())
                                .unwrap_or_else(|| {
                                    assignments
                                        .iter()
                                        .position(|(n, _)| n == column_name)
                                        .unwrap_or(0)
                                });

                            if let PhysicalExpr::Literal(scalar) = expr {
                                use crate::storage::columnar::ScalarValue;
                                let new_value = match scalar {
                                    ScalarValue::Utf8(s) => serde_json::Value::String(s.clone()),
                                    ScalarValue::Int32(v) => serde_json::Value::Number((*v).into()),
                                    ScalarValue::Int64(v) => serde_json::Value::Number((*v).into()),
                                    ScalarValue::Float64(f) => serde_json::Number::from_f64(*f)
                                        .map(serde_json::Value::Number)
                                        .unwrap_or(serde_json::Value::Null),
                                    ScalarValue::Boolean(b) => serde_json::Value::Bool(*b),
                                    ScalarValue::Binary(b) => {
                                        serde_json::Value::String(base64::Engine::encode(
                                            &base64::engine::general_purpose::STANDARD,
                                            b,
                                        ))
                                    }
                                    ScalarValue::Null => serde_json::Value::Null,
                                };

                                if target_idx < current_values.len() {
                                    current_values[target_idx] = new_value;
                                }
                            }
                        }

                        let storage_values = if current_values.len() > 1 {
                            &current_values[1..]
                        } else {
                            &current_values[..]
                        };
                        let new_value_bytes = serde_json::to_vec(storage_values)
                            .map_err(|e| DbxError::Serialization(e.to_string()))?;

                        self.insert(table, &key, &new_value_bytes)?;
                        rows_updated += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_updated",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_updated]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::Delete { table, filter } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::PointQuery);

                let target_tables = self.get_tables_to_scan(table, filter.as_ref());
                let mut all_records = Vec::new();
                for target_table in &target_tables {
                    all_records.extend(self.scan(target_table)?);
                }

                let mut rows_deleted = 0_i64;

                for (key, value_bytes) in all_records {
                    let full_record = reconstruct_full_record(&key, &value_bytes);
                    let should_delete = evaluate_filter_for_record(filter.as_ref(), &full_record)?;

                    if should_delete {
                        self.delete(table, &key)?;
                        rows_deleted += 1;
                    }
                }

                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_deleted",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![rows_deleted]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropTable { table, if_exists } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if !exists && !if_exists {
                    return Err(DbxError::TableNotFound(table.clone()));
                }

                if exists {
                    self.table_schemas.write().unwrap().remove(table);

                    self.wos_for_metadata().delete_schema_metadata(table)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateTable {
                table,
                columns,
                if_not_exists,
                policy,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let exists = self.table_schemas.read().unwrap().contains_key(table);

                if exists && !if_not_exists {
                    return Err(DbxError::Schema(format!(
                        "Table '{}' already exists",
                        table
                    )));
                }

                if !exists {
                    let fields: Vec<Field> = columns
                        .iter()
                        .map(|(name, type_str)| {
                            let data_type = match type_str.to_uppercase().as_str() {
                                "INT" | "INTEGER" => DataType::Int64,
                                "TEXT" | "STRING" | "VARCHAR" => DataType::Utf8,
                                "FLOAT" | "DOUBLE" => DataType::Float64,
                                "BOOL" | "BOOLEAN" => DataType::Boolean,
                                _ => DataType::Utf8,
                            };
                            Field::new(name, data_type, true)
                        })
                        .collect();

                    let mut schema = Schema::new(fields);

                    if let Some(p) = policy
                        && let Ok(json) = p.to_json()
                    {
                        let mut map = std::collections::HashMap::new();
                        map.insert("dbx_table_policy".to_string(), json);
                        schema = schema.with_metadata(map);
                    }

                    let schema = Arc::new(schema);

                    self.table_schemas
                        .write()
                        .unwrap()
                        .insert(table.clone(), schema.clone());

                    self.wos_for_metadata()
                        .save_schema_metadata(table, &schema)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::CreateIndex {
                table,
                index_name,
                columns,
                if_not_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                {
                    let schemas = self.table_schemas.read().unwrap();
                    if !schemas.contains_key(table.as_str()) {
                        return Err(DbxError::Schema(format!(
                            "Table '{}' does not exist",
                            table
                        )));
                    }
                }

                let column = columns.first().ok_or_else(|| {
                    DbxError::Schema("CREATE INDEX requires at least one column".to_string())
                })?;

                let exists = self.index.has_index(table, column);

                if exists && !if_not_exists {
                    return Err(DbxError::IndexAlreadyExists {
                        table: table.clone(),
                        column: column.clone(),
                    });
                }

                if !exists {
                    self.index.create_index(table, column)?;

                    self.index_registry
                        .write()
                        .unwrap()
                        .insert(index_name.clone(), (table.clone(), column.clone()));

                    self.wos_for_metadata()
                        .save_index_metadata(index_name, table, column)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::DropIndex {
                table,
                index_name,
                if_exists,
            } => {
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                let resolved_column = {
                    let registry = self.index_registry.read().unwrap();
                    registry
                        .get(index_name.as_str())
                        .map(|(_, col)| col.clone())
                };

                let column = resolved_column.as_deref().unwrap_or(index_name.as_str());

                let exists = self.index.has_index(table, column);

                if !exists && !if_exists {
                    return Err(DbxError::IndexNotFound {
                        table: table.clone(),
                        column: column.to_string(),
                    });
                }

                if exists {
                    self.index.drop_index(table, column)?;

                    self.index_registry
                        .write()
                        .unwrap()
                        .remove(index_name.as_str());

                    self.wos_for_metadata().delete_index_metadata(index_name)?;
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            PhysicalPlan::AlterTable { table, operation } => {
                use crate::sql::planner::types::AlterTableOperation;
                use arrow::array::{Int64Array, RecordBatch};
                use arrow::datatypes::{DataType, Field, Schema};

                match operation {
                    AlterTableOperation::AddColumn {
                        column_name,
                        data_type,
                    } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let arrow_type = match data_type.to_uppercase().as_str() {
                            "INT" | "INTEGER" => DataType::Int64,
                            "TEXT" | "VARCHAR" | "STRING" => DataType::Utf8,
                            "FLOAT" | "DOUBLE" | "REAL" => DataType::Float64,
                            "BOOL" | "BOOLEAN" => DataType::Boolean,
                            _ => DataType::Utf8,
                        };

                        let new_field = Field::new(column_name, arrow_type, true);

                        let mut fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| f.as_ref().clone())
                            .collect();
                        fields.push(new_field);
                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::DropColumn { column_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .filter(|f| f.name() != column_name)
                            .map(|f| f.as_ref().clone())
                            .collect();

                        if fields.len() == current_schema.fields().len() {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                column_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                    AlterTableOperation::RenameColumn { old_name, new_name } => {
                        let mut schemas = self.table_schemas.write().unwrap();
                        let current_schema = schemas.get(table).ok_or_else(|| {
                            DbxError::Schema(format!("Table '{}' not found", table))
                        })?;

                        let mut found = false;
                        let fields: Vec<Field> = current_schema
                            .fields()
                            .iter()
                            .map(|f| {
                                if f.name() == old_name {
                                    found = true;
                                    Field::new(new_name, f.data_type().clone(), f.is_nullable())
                                } else {
                                    f.as_ref().clone()
                                }
                            })
                            .collect();

                        if !found {
                            return Err(DbxError::Schema(format!(
                                "Column '{}' not found in table '{}'",
                                old_name, table
                            )));
                        }

                        let new_schema = Arc::new(Schema::new(fields));

                        schemas.insert(table.clone(), new_schema.clone());

                        drop(schemas);
                        self.wos_for_metadata()
                            .save_schema_metadata(table, &new_schema)?;
                    }
                }

                let schema = Arc::new(Schema::new(vec![Field::new(
                    "rows_affected",
                    DataType::Int64,
                    false,
                )]));
                let array = Int64Array::from(vec![1]);
                let batch =
                    RecordBatch::try_new(schema, vec![Arc::new(array)]).map_err(DbxError::from)?;

                Ok(vec![batch])
            }
            _ => self.execute_select_plan(plan),
        }
    }

    fn execute_select_plan(&self, plan: &PhysicalPlan) -> DbxResult<Vec<RecordBatch>> {
        for table in plan.tables() {
            let target_tables = self.get_tables_to_scan(&table, None);
            for t in target_tables {
                if !self.columnar_cache.has_table(&t) {
                    let _ = self.sync_columnar_cache(&t);
                }
            }
        }

        let tables = self.tables.read().unwrap();
        let mut operator = self.build_operator(plan, &tables, &self.columnar_cache)?;
        Self::drain_operator(&mut *operator)
    }

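    /// Recursively lowers a [`PhysicalPlan`] tree into a tree of
    /// [`PhysicalOperator`]s. Table scans prefer the columnar cache (with
    /// filter and projection pushdown when the cache can apply them) and fall
    /// back to registered in-memory batches; a `FilterOperator` is layered on
    /// top only when the filter could not be pushed down. DML/DDL variants
    /// are unreachable here because `execute_physical_plan` intercepts them.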
    fn build_operator(
        &self,
        plan: &PhysicalPlan,
        tables: &HashMap<String, Vec<RecordBatch>>,
        columnar_cache: &ColumnarCache,
    ) -> DbxResult<Box<dyn PhysicalOperator>> {
        match plan {
            PhysicalPlan::TableScan {
                table,
                projection,
                filter,
                ros_files,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::RangeScan);

                let mut filter_pushed_down = false;
                let target_tables = self.get_tables_to_scan(table, filter.as_ref());

                let mut all_batches = Vec::new();
                let mut base_schema = None;

                for t in target_tables {
                    let cached_results = if let Some(filter_expr) = filter {
                        let filter_expr_clone = filter_expr.clone();
                        let result = columnar_cache.get_batches_with_filter(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                            move |batch| {
                                use crate::sql::executor::evaluate_expr;
                                use arrow::array::BooleanArray;

                                let array = evaluate_expr(&filter_expr_clone, batch)?;
                                let boolean_array = array
                                    .as_any()
                                    .downcast_ref::<BooleanArray>()
                                    .ok_or_else(|| DbxError::TypeMismatch {
                                        expected: "BooleanArray".to_string(),
                                        actual: format!("{:?}", array.data_type()),
                                    })?;
                                Ok(boolean_array.clone())
                            },
                        )?;
                        if result.is_some() {
                            filter_pushed_down = true;
                        }
                        result
                    } else {
                        columnar_cache.get_batches(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                        )?
                    };

                    let (batches, schema, _) = if let Some(cached_batches) = cached_results {
                        if cached_batches.is_empty() {
                            let schema = {
                                let schemas = self.table_schemas.read().unwrap();
                                schemas
                                    .get(table)
                                    .or_else(|| {
                                        let table_lower = table.to_lowercase();
                                        schemas
                                            .iter()
                                            .find(|(k, _)| k.to_lowercase() == table_lower)
                                            .map(|(_, v)| v)
                                    })
                                    .cloned()
                            }
                            .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
                            (vec![], schema, projection.clone())
                        } else {
                            let schema = cached_batches[0].schema();
                            (cached_batches, schema, vec![])
                        }
                    } else {
                        let cached_after_sync = columnar_cache.get_batches(
                            &t,
                            if projection.is_empty() {
                                None
                            } else {
                                Some(projection)
                            },
                        )?;

                        if let Some(batches) = cached_after_sync {
                            if !batches.is_empty() {
                                let schema = batches[0].schema();
                                (batches, schema, vec![])
                            } else {
                                let schema = {
                                    let schemas = self.table_schemas.read().unwrap();
                                    schemas
                                        .get(table)
                                        .or_else(|| {
                                            let table_lower = table.to_lowercase();
                                            schemas
                                                .iter()
                                                .find(|(k, _)| k.to_lowercase() == table_lower)
                                                .map(|(_, v)| v)
                                        })
                                        .cloned()
                                }
                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
                                (vec![], schema, projection.clone())
                            }
                        } else {
                            let batches_opt = tables.get(&t).or_else(|| {
                                let table_lower = t.to_lowercase();
                                tables
                                    .iter()
                                    .find(|(k, _)| k.to_lowercase() == table_lower)
                                    .map(|(_, v)| v)
                            });

                            if let Some(batches) = batches_opt {
                                if batches.is_empty() {
                                    let schema = {
                                        let schemas = self.table_schemas.read().unwrap();
                                        schemas
                                            .get(table)
                                            .or_else(|| {
                                                let table_lower = table.to_lowercase();
                                                schemas
                                                    .iter()
                                                    .find(|(k, _)| k.to_lowercase() == table_lower)
                                                    .map(|(_, v)| v)
                                            })
                                            .cloned()
                                    }
                                    .unwrap_or_else(|| {
                                        Arc::new(arrow::datatypes::Schema::empty())
                                    });
                                    (vec![], schema, projection.clone())
                                } else {
                                    let schema = batches[0].schema();
                                    (batches.clone(), schema, projection.clone())
                                }
                            } else {
                                let schema = {
                                    let schemas = self.table_schemas.read().unwrap();
                                    schemas.get(table).cloned()
                                }
                                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
                                (vec![], schema, projection.clone())
                            }
                        }
                    };

                    if base_schema.is_none() {
                        base_schema = Some(schema);
                    }
                    all_batches.extend(batches);
                }

                let final_schema = base_schema.unwrap_or_else(|| {
                    let schemas = self.table_schemas.read().unwrap();
                    schemas
                        .get(table)
                        .cloned()
                        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()))
                });

                let projection_to_use = if projection.is_empty() {
                    vec![]
                } else {
                    projection.clone()
                };
                let mut scan =
                    TableScanOperator::new(table.clone(), final_schema, projection_to_use);

                if !ros_files.is_empty() {
                    scan.start_tier_scan(all_batches, ros_files.clone());
                } else {
                    scan.set_data(all_batches);
                }

                if let Some(filter_expr) = filter {
                    if !filter_pushed_down {
                        Ok(Box::new(FilterOperator::new(
                            Box::new(scan),
                            filter_expr.clone(),
                        )))
                    } else {
                        Ok(Box::new(scan))
                    }
                } else {
                    Ok(Box::new(scan))
                }
            }

            PhysicalPlan::Projection {
                input,
                exprs,
                aliases,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                use arrow::datatypes::Field;

                let input_schema = input_op.schema();
                let fields: Vec<Field> = exprs
                    .iter()
                    .enumerate()
                    .map(|(i, expr)| {
                        let data_type = expr.get_type(input_schema);
                        let field_name = if let Some(Some(alias)) = aliases.get(i) {
                            alias.clone()
                        } else {
                            format!("col_{}", i)
                        };
                        Field::new(&field_name, data_type, true)
                    })
                    .collect();

                let output_schema = Arc::new(Schema::new(fields));
                Ok(Box::new(ProjectionOperator::new(
                    input_op,
                    output_schema,
                    exprs.clone(),
                )))
            }

            PhysicalPlan::Limit {
                input,
                count,
                offset,
            } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(LimitOperator::new(input_op, *count, *offset)))
            }

            PhysicalPlan::SortMerge { input, order_by } => {
                let input_op = self.build_operator(input, tables, columnar_cache)?;
                Ok(Box::new(SortOperator::new(input_op, order_by.clone())))
            }

            PhysicalPlan::HashAggregate {
                input,
                group_by,
                aggregates,
                mode,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::Aggregation);

                let input_op = self.build_operator(input, tables, columnar_cache)?;
                let input_schema = input_op.schema();
                let mut fields = Vec::new();

                for &idx in group_by {
                    fields.push(input_schema.field(idx).clone());
                }

                for agg in aggregates {
                    use arrow::datatypes::DataType;
                    let data_type = match agg.function {
                        crate::sql::planner::AggregateFunction::Count => DataType::Int64,
                        crate::sql::planner::AggregateFunction::Sum
                        | crate::sql::planner::AggregateFunction::Avg
                        | crate::sql::planner::AggregateFunction::Min
                        | crate::sql::planner::AggregateFunction::Max => DataType::Float64,
                    };
                    let name = agg
                        .alias
                        .clone()
                        .unwrap_or_else(|| format!("agg_{:?}", agg.function));
                    fields.push(arrow::datatypes::Field::new(&name, data_type, true));
                }

                let agg_schema = Arc::new(arrow::datatypes::Schema::new(fields));
                Ok(Box::new(
                    HashAggregateOperator::new(
                        input_op,
                        agg_schema,
                        group_by.clone(),
                        aggregates.clone(),
                        *mode,
                    )
                    .with_gpu(self.gpu_manager.clone()),
                ))
            }

            PhysicalPlan::HashJoin {
                left,
                right,
                on,
                join_type,
            } => {
                self.workload_analyzer
                    .write()
                    .unwrap()
                    .record(crate::engine::workload_analyzer::QueryPattern::Join);

                use arrow::datatypes::Field;

                let left_op = self.build_operator(left, tables, columnar_cache)?;
                let right_op = self.build_operator(right, tables, columnar_cache)?;

                let left_schema = left_op.schema();
                let right_schema = right_op.schema();

                let mut joined_fields: Vec<Field> = Vec::new();
                for field in left_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Right) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }
                for field in right_schema.fields().iter() {
                    let mut f = field.as_ref().clone();
                    if matches!(join_type, crate::sql::planner::JoinType::Left) {
                        f = f.with_nullable(true);
                    }
                    joined_fields.push(f);
                }

                let joined_schema = Arc::new(Schema::new(joined_fields));

                Ok(Box::new(HashJoinOperator::new(
                    left_op,
                    right_op,
                    joined_schema,
                    on.clone(),
                    *join_type,
                )))
            }

            PhysicalPlan::Insert { .. } => {
                unreachable!("INSERT should not reach build_operator")
            }
            PhysicalPlan::Update { .. } => {
                unreachable!("UPDATE should not reach build_operator")
            }
            PhysicalPlan::Delete { .. } => {
                unreachable!("DELETE should not reach build_operator")
            }
            PhysicalPlan::DropTable { .. } => {
                unreachable!("DROP TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateTable { .. } => {
                unreachable!("CREATE TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateIndex { .. } => {
                unreachable!("CREATE INDEX should not reach build_operator")
            }
            PhysicalPlan::DropIndex { .. } => {
                unreachable!("DROP INDEX should not reach build_operator")
            }
            PhysicalPlan::AlterTable { .. } => {
                unreachable!("ALTER TABLE should not reach build_operator")
            }
            PhysicalPlan::CreateFunction { .. } => {
                unreachable!("CREATE FUNCTION should not reach build_operator")
            }
            PhysicalPlan::CreateTrigger { .. } => {
                unreachable!("CREATE TRIGGER should not reach build_operator")
            }
            PhysicalPlan::CreateJob { .. } => {
                unreachable!("CREATE JOB should not reach build_operator")
            }
            PhysicalPlan::DropFunction { .. } => {
                unreachable!("DROP FUNCTION should not reach build_operator")
            }
            PhysicalPlan::DropTrigger { .. } => {
                unreachable!("DROP TRIGGER should not reach build_operator")
            }
            PhysicalPlan::DropJob { .. } => {
                unreachable!("DROP JOB should not reach build_operator")
            }
            PhysicalPlan::GridExchange { .. } => {
                unreachable!(
                    "GridExchange placeholder must be replaced by DistributedExecutor before build_operator is called"
                )
            }
            PhysicalPlan::ShuffleWriter { .. } => {
                unreachable!(
                    "ShuffleWriter is a distributed operator and not supported in singleton Database"
                )
            }
        }
    }

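    /// Pulls an operator tree to exhaustion, keeping only non-empty batches.
    ///
    /// A sketch of the pull loop, assuming an operator built by
    /// `build_operator` (both are private, so this only applies in-crate):
    ///
    /// ```ignore
    /// let mut op = db.build_operator(&plan, &tables, &db.columnar_cache)?;
    /// let batches = Database::drain_operator(&mut *op)?;
    /// ```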
    fn drain_operator(op: &mut dyn PhysicalOperator) -> DbxResult<Vec<RecordBatch>> {
        let mut results = Vec::new();
        while let Some(batch) = op.next()? {
            if batch.num_rows() > 0 {
                results.push(batch);
            }
        }
        Ok(results)
    }
}

impl crate::traits::DatabaseSql for Database {
    fn execute_sql(&self, sql: &str) -> DbxResult<Vec<arrow::record_batch::RecordBatch>> {
        Database::execute_sql(self, sql)
    }

    fn register_table(&self, name: &str, batches: Vec<arrow::record_batch::RecordBatch>) {
        Database::register_table(self, name, batches)
    }

    fn append_batch(&self, table: &str, batch: arrow::record_batch::RecordBatch) -> DbxResult<()> {
        Database::append_batch(self, table, batch);
        Ok(())
    }
}