1use std::collections::HashMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
23use arrow::array::{Array, AsArray, BooleanBuilder, RecordBatch};
24use arrow::datatypes::{
25 DataType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
26 UInt16Type, UInt32Type, UInt64Type, UInt8Type,
27};
28use datafusion::common::GetExt;
29use datafusion::datasource::listing::{
30 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
31};
32use datafusion::datasource::provider::DefaultTableFactory;
33use datafusion::datasource::MemTable;
34use datafusion::execution::SessionStateBuilder;
35use datafusion::prelude::*;
36use sqlparser::ast::{
37 AssignmentTarget, BinaryOperator, Expr, FromTable, SetExpr, Statement, TableFactor,
38 TableObject, UnaryOperator, Value,
39};
40use sqlparser::dialect::SQLiteDialect;
41use sqlparser::parser::Parser;
42use tokio::sync::RwLock;
43use tracing::debug;
44use vortex::session::VortexSession;
45use vortex::VortexSessionDefault;
46use vortex_datafusion::VortexFormat;
47use vortex_datafusion::VortexFormatFactory;
48
49use crate::error::DfOlapError;
50use crate::storage::{StorageMode, VortexLocation};
51
52#[cfg(feature = "cloud-storage")]
54use url::Url;
55
56struct TableData {
58 schema: SchemaRef,
59 batches: Vec<RecordBatch>,
61}
62
63struct VortexTableMeta {
68 schema: SchemaRef,
69 table_url: String,
71}
72
73fn build_vortex_session_context(location: &VortexLocation) -> Result<SessionContext, DfOlapError> {
78 let factory = Arc::new(VortexFormatFactory::new());
79
80 let mut state_builder = SessionStateBuilder::new()
81 .with_default_features()
82 .with_table_factory(
83 factory.get_ext().to_uppercase(),
84 Arc::new(DefaultTableFactory::new()),
85 );
86
87 if let Some(file_formats) = state_builder.file_formats() {
88 file_formats.push(factory.clone() as _);
89 }
90
91 let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
92
93 #[cfg(feature = "cloud-storage")]
95 if let VortexLocation::S3 { url } = location {
96 let bucket = parse_s3_bucket(url)?;
97 let store: Arc<dyn object_store::ObjectStore> = Arc::new(
98 object_store::aws::AmazonS3Builder::from_env()
99 .with_bucket_name(&bucket)
100 .build()
101 .map_err(DfOlapError::ObjectStore)?,
102 );
103 let base_url = Url::parse(&format!("s3://{bucket}")).map_err(DfOlapError::UrlParse)?;
104 ctx.runtime_env().register_object_store(&base_url, store);
105 tracing::info!(bucket, "registered S3 object store for Vortex");
106 }
107
108 let _ = location; Ok(ctx)
110}
111
/// Extracts the bucket name (the URL host) from an `s3://bucket/...` URL.
///
/// # Errors
/// * `DfOlapError::UrlParse` if `url` is not a valid URL.
/// * `DfOlapError::StorageConfig` if the scheme is not `s3` or no host is present.
#[cfg(feature = "cloud-storage")]
fn parse_s3_bucket(url: &str) -> Result<String, DfOlapError> {
    let parsed = Url::parse(url).map_err(DfOlapError::UrlParse)?;

    if parsed.scheme() != "s3" {
        let message = format!("expected s3:// URL, got '{url}'");
        return Err(DfOlapError::StorageConfig(message));
    }

    match parsed.host_str() {
        Some(host) => Ok(host.to_string()),
        None => Err(DfOlapError::StorageConfig(format!(
            "missing bucket name in URL '{url}'"
        ))),
    }
}
126
127fn table_listing_url(location: &VortexLocation, table_name: &str) -> String {
132 match location {
133 VortexLocation::Local { base_path } => {
134 let dir = base_path.join(table_name);
135 format!("file://{}/", dir.to_string_lossy())
137 }
138 #[cfg(feature = "cloud-storage")]
139 VortexLocation::S3 { url } => {
140 let base = url.trim_end_matches('/');
141 format!("{base}/{table_name}/")
142 }
143 }
144}
145
146async fn register_vortex_listing_table(
148 ctx: &SessionContext,
149 table_name: &str,
150 schema: &SchemaRef,
151 listing_url: &str,
152) -> Result<(), DfOlapError> {
153 let vortex_format = Arc::new(VortexFormat::new(
154 <VortexSession as VortexSessionDefault>::default(),
155 ));
156 let listing_options = ListingOptions::new(vortex_format as _)
157 .with_file_extension("vortex")
158 .with_session_config_options(ctx.state().config());
159
160 let table_url = ListingTableUrl::parse(listing_url)?;
161
162 let config = ListingTableConfig::new(table_url)
163 .with_listing_options(listing_options)
164 .with_schema(schema.clone());
165
166 let listing_table = ListingTable::try_new(config)?;
167
168 let _ = ctx.deregister_table(table_name);
169 ctx.register_table(table_name, Arc::new(listing_table))?;
170 Ok(())
171}
172
173pub struct DataFusionEngine {
180 ctx: RwLock<SessionContext>,
181 tables: RwLock<HashMap<String, TableData>>,
183 vortex_tables: RwLock<HashMap<String, VortexTableMeta>>,
185 vortex_location: Option<VortexLocation>,
187 storage_mode: StorageMode,
189 tmp_counter: AtomicU64,
191}
192
193impl DataFusionEngine {
194 pub fn with_storage(mode: StorageMode) -> Result<Self, DfOlapError> {
199 let vortex_location = match mode.classify() {
200 Ok(Some(loc)) => {
201 #[cfg(not(feature = "cloud-storage"))]
203 {
204 let VortexLocation::Local { ref base_path } = loc;
205 std::fs::create_dir_all(base_path)?;
206 }
207 #[cfg(feature = "cloud-storage")]
208 if let VortexLocation::Local { ref base_path } = loc {
209 std::fs::create_dir_all(base_path)?;
210 }
211 Some(loc)
212 }
213 Ok(None) => None,
214 Err(e) => return Err(DfOlapError::StorageConfig(e)),
215 };
216
217 let ctx = if let Some(ref loc) = vortex_location {
218 build_vortex_session_context(loc)?
219 } else {
220 SessionContext::new()
222 };
223
224 Ok(Self {
225 ctx: RwLock::new(ctx),
226 tables: RwLock::new(HashMap::new()),
227 vortex_tables: RwLock::new(HashMap::new()),
228 vortex_location,
229 storage_mode: mode,
230 tmp_counter: AtomicU64::new(0),
231 })
232 }
233
234 pub fn new() -> Self {
236 Self::with_storage(StorageMode::InMemory).expect("in-memory mode cannot fail")
237 }
238
239 pub fn storage_mode(&self) -> &StorageMode {
241 &self.storage_mode
242 }
243
244 fn location(&self) -> Option<&VortexLocation> {
246 self.vortex_location.as_ref()
247 }
248
249 async fn refresh_table_mem(&self, name: &str) -> Result<(), DfOlapError> {
255 let tables = self.tables.read().await;
256 let table_data = tables
257 .get(name)
258 .ok_or_else(|| DfOlapError::TableNotFound(name.to_string()))?;
259
260 let partitions = if table_data.batches.is_empty() {
261 vec![vec![]]
262 } else {
263 vec![table_data.batches.clone()]
264 };
265 let mem_table = MemTable::try_new(table_data.schema.clone(), partitions)?;
266
267 let ctx = self.ctx.write().await;
268 let _ = ctx.deregister_table(name);
269 ctx.register_table(name, Arc::new(mem_table))?;
270 Ok(())
271 }
272
273 async fn refresh_table_vortex(&self, name: &str) -> Result<(), DfOlapError> {
279 let vortex_tables = self.vortex_tables.read().await;
280 let meta = vortex_tables
281 .get(name)
282 .ok_or_else(|| DfOlapError::TableNotFound(name.to_string()))?;
283 let schema = meta.schema.clone();
284 let listing_url = meta.table_url.clone();
285 drop(vortex_tables);
286
287 let ctx = self.ctx.read().await;
288 register_vortex_listing_table(&ctx, name, &schema, &listing_url).await
289 }
290
291 async fn vortex_table_schema(&self, table_name: &str) -> Result<SchemaRef, DfOlapError> {
293 let vortex_tables = self.vortex_tables.read().await;
294 vortex_tables
295 .get(table_name)
296 .map(|m| m.schema.clone())
297 .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))
298 }
299
300 async fn read_all_batches_vortex(
302 &self,
303 table_name: &str,
304 ) -> Result<(SchemaRef, Vec<RecordBatch>), DfOlapError> {
305 let schema = self.vortex_table_schema(table_name).await?;
306 let ctx = self.ctx.read().await;
307 let df = ctx.sql(&format!("SELECT * FROM \"{table_name}\"")).await?;
308 let batches = df.collect().await?;
309 Ok((schema, batches))
310 }
311
312 async fn clear_table_storage(&self, table_name: &str) -> Result<(), DfOlapError> {
320 let loc = self
321 .location()
322 .ok_or_else(|| DfOlapError::Other("expected Vortex location".into()))?;
323
324 match loc {
325 VortexLocation::Local { base_path } => {
326 let dir = base_path.join(table_name);
327 if !dir.exists() {
328 return Ok(());
329 }
330 tokio::task::spawn_blocking(move || {
331 let entries: Vec<_> = std::fs::read_dir(&dir)?
332 .filter_map(|e| e.ok())
333 .map(|e| e.path())
334 .filter(|p| p.extension().is_some_and(|x| x == "vortex"))
335 .collect();
336 for path in entries {
337 std::fs::remove_file(path)?;
338 }
339 Ok::<_, DfOlapError>(())
340 })
341 .await
342 .map_err(DfOlapError::from_join)?
343 }
344 #[cfg(feature = "cloud-storage")]
345 VortexLocation::S3 { url } => self.clear_s3_table_prefix(table_name, url).await,
346 }
347 }
348
349 #[cfg(feature = "cloud-storage")]
352 async fn clear_s3_table_prefix(&self, table_name: &str, url: &str) -> Result<(), DfOlapError> {
353 use futures::StreamExt;
354 #[allow(unused_imports)]
358 use object_store::{ObjectStore, ObjectStoreExt};
359
360 let bucket = parse_s3_bucket(url)?;
361 let table_prefix = {
362 let parsed = Url::parse(url).map_err(DfOlapError::UrlParse)?;
366 let trimmed = parsed.path().trim_start_matches('/').trim_end_matches('/');
367 if trimmed.is_empty() {
368 format!("{table_name}/")
369 } else {
370 format!("{trimmed}/{table_name}/")
371 }
372 };
373
374 let osu_str = format!("s3://{bucket}/");
378 let osu =
379 datafusion::execution::object_store::ObjectStoreUrl::parse(&osu_str).map_err(|e| {
380 DfOlapError::Other(format!("invalid object-store URL '{osu_str}': {e}"))
381 })?;
382 let store = self
383 .ctx
384 .read()
385 .await
386 .runtime_env()
387 .object_store(osu)
388 .map_err(|e| {
389 DfOlapError::Other(format!(
390 "object store for s3://{bucket} not registered: {e}"
391 ))
392 })?;
393
394 let prefix = object_store::path::Path::from(table_prefix.as_str());
395 let mut list = store.list(Some(&prefix));
396
397 let mut to_delete: Vec<object_store::path::Path> = Vec::new();
400 while let Some(meta) = list.next().await {
401 let meta = meta.map_err(DfOlapError::ObjectStore)?;
402 if meta
403 .location
404 .extension()
405 .is_some_and(|ext| ext.eq_ignore_ascii_case("vortex"))
406 {
407 to_delete.push(meta.location);
408 }
409 }
410 for path in to_delete {
411 store
412 .delete(&path)
413 .await
414 .map_err(DfOlapError::ObjectStore)?;
415 }
416 Ok(())
417 }
418
419 async fn insert_arrow_into_vortex(
428 &self,
429 table_name: &str,
430 schema: &SchemaRef,
431 batches: &[RecordBatch],
432 ) -> Result<u64, DfOlapError> {
433 if batches.is_empty() {
434 return Ok(0);
435 }
436 let total_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
437
438 let tmp_name = format!(
439 "__tmp_load_{}",
440 self.tmp_counter.fetch_add(1, Ordering::Relaxed)
441 );
442
443 let mem_table = MemTable::try_new(schema.clone(), vec![batches.to_vec()])?;
444
445 {
446 let ctx = self.ctx.read().await;
447 let _ = ctx.deregister_table(&tmp_name);
448 ctx.register_table(&tmp_name, Arc::new(mem_table))?;
449
450 ctx.sql(&format!(
452 "INSERT INTO \"{table_name}\" SELECT * FROM \"{tmp_name}\""
453 ))
454 .await?
455 .collect()
456 .await?;
457
458 let _ = ctx.deregister_table(&tmp_name);
459 }
460
461 Ok(total_rows)
462 }
463
464 async fn rewrite_vortex_table(
468 &self,
469 table_name: &str,
470 schema: &SchemaRef,
471 batches: &[RecordBatch],
472 ) -> Result<(), DfOlapError> {
473 self.clear_table_storage(table_name).await?;
475
476 self.refresh_table_vortex(table_name).await?;
478
479 if !batches.is_empty() {
480 self.insert_arrow_into_vortex(table_name, schema, batches)
481 .await?;
482 self.refresh_table_vortex(table_name).await?;
484 }
485
486 Ok(())
487 }
488
489 async fn execute_sql(&self, sql: &str) -> Result<Vec<RecordBatch>, DfOlapError> {
494 let ctx = self.ctx.read().await;
495 let df = ctx.sql(sql).await?;
496 let batches = df.collect().await?;
497 Ok(batches)
498 }
499
500 async fn execute_insert_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
505 let (table_name, col_names, batches) = parse_insert_values(sql)?;
506
507 let mut tables = self.tables.write().await;
508 let table_data = tables
509 .get_mut(&table_name)
510 .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
511
512 let table_schema = table_data.schema.clone();
513 let (aligned_batches, total_rows) =
514 align_batches_to_schema(&table_schema, &col_names, &batches)?;
515 table_data.batches.extend(aligned_batches);
516 drop(tables);
517
518 self.refresh_table_mem(&table_name).await?;
519 Ok(total_rows)
520 }
521
522 async fn execute_update_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
523 let (table_name, assignments, where_clause) = parse_update(sql)?;
524
525 let mut tables = self.tables.write().await;
526 let table_data = tables
527 .get_mut(&table_name)
528 .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
529
530 let schema = table_data.schema.clone();
531 let mut updated_count = 0u64;
532
533 let all_rows = flatten_batches(&table_data.batches, &schema)?;
534 if let Some(all_rows) = all_rows {
535 let (updated_batch, count) =
536 apply_update(&all_rows, &schema, &assignments, &where_clause)?;
537 updated_count = count;
538 table_data.batches = vec![updated_batch];
539 }
540
541 drop(tables);
542 self.refresh_table_mem(&table_name).await?;
543 Ok(updated_count)
544 }
545
546 async fn execute_delete_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
547 let (table_name, where_clause) = parse_delete(sql)?;
548
549 let mut tables = self.tables.write().await;
550 let table_data = tables
551 .get_mut(&table_name)
552 .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
553
554 let schema = table_data.schema.clone();
555 let all_rows = flatten_batches(&table_data.batches, &schema)?;
556
557 if let Some(all_rows) = all_rows {
558 let (filtered_batch, deleted_count) = apply_delete(&all_rows, &schema, &where_clause)?;
559 table_data.batches = if filtered_batch.num_rows() > 0 {
560 vec![filtered_batch]
561 } else {
562 vec![]
563 };
564 drop(tables);
565 self.refresh_table_mem(&table_name).await?;
566 Ok(deleted_count)
567 } else {
568 Ok(0)
569 }
570 }
571
572 async fn execute_insert_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
577 let (table_name, col_names, batches) = parse_insert_values(sql)?;
578
579 let schema = self.vortex_table_schema(&table_name).await?;
580 let (aligned_batches, total_rows) = align_batches_to_schema(&schema, &col_names, &batches)?;
581
582 self.insert_arrow_into_vortex(&table_name, &schema, &aligned_batches)
583 .await?;
584 self.refresh_table_vortex(&table_name).await?;
586
587 Ok(total_rows)
588 }
589
590 async fn execute_update_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
591 let (table_name, assignments, where_clause) = parse_update(sql)?;
592
593 let (schema, existing_batches) = self.read_all_batches_vortex(&table_name).await?;
594 let all_rows = flatten_batches(&existing_batches, &schema)?;
595
596 if let Some(all_rows) = all_rows {
597 let (updated_batch, count) =
598 apply_update(&all_rows, &schema, &assignments, &where_clause)?;
599 let new_batches = if updated_batch.num_rows() > 0 {
600 vec![updated_batch]
601 } else {
602 vec![]
603 };
604 self.rewrite_vortex_table(&table_name, &schema, &new_batches)
605 .await?;
606 Ok(count)
607 } else {
608 Ok(0)
609 }
610 }
611
612 async fn execute_delete_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
613 let (table_name, where_clause) = parse_delete(sql)?;
614
615 let (schema, existing_batches) = self.read_all_batches_vortex(&table_name).await?;
616 let all_rows = flatten_batches(&existing_batches, &schema)?;
617
618 if let Some(all_rows) = all_rows {
619 let (filtered_batch, deleted_count) = apply_delete(&all_rows, &schema, &where_clause)?;
620 let new_batches = if filtered_batch.num_rows() > 0 {
621 vec![filtered_batch]
622 } else {
623 vec![]
624 };
625 self.rewrite_vortex_table(&table_name, &schema, &new_batches)
626 .await?;
627 Ok(deleted_count)
628 } else {
629 Ok(0)
630 }
631 }
632
633 async fn execute_insert(&self, sql: &str) -> Result<u64, DfOlapError> {
638 match &self.storage_mode {
639 StorageMode::InMemory => self.execute_insert_mem(sql).await,
640 StorageMode::Vortex { .. } => self.execute_insert_vortex(sql).await,
641 }
642 }
643
644 async fn execute_update(&self, sql: &str) -> Result<u64, DfOlapError> {
645 match &self.storage_mode {
646 StorageMode::InMemory => self.execute_update_mem(sql).await,
647 StorageMode::Vortex { .. } => self.execute_update_vortex(sql).await,
648 }
649 }
650
651 async fn execute_delete(&self, sql: &str) -> Result<u64, DfOlapError> {
652 match &self.storage_mode {
653 StorageMode::InMemory => self.execute_delete_mem(sql).await,
654 StorageMode::Vortex { .. } => self.execute_delete_vortex(sql).await,
655 }
656 }
657}
658
659impl Default for DataFusionEngine {
660 fn default() -> Self {
661 Self::new()
662 }
663}
664
665impl rhei_core::OlapEngine for DataFusionEngine {
666 type Error = DfOlapError;
667
668 async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
669 debug!(sql, "DataFusion query");
670 self.execute_sql(sql).await
671 }
672
673 async fn query_stream(
674 &self,
675 sql: &str,
676 ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
677 debug!(sql, "DataFusion query_stream");
678 let ctx = self.ctx.read().await;
679 let df = ctx.sql(sql).await?;
680 let stream = df.execute_stream().await?;
681 let mapped = Box::pin(StreamAdapter(stream));
682 Ok(mapped)
683 }
684
685 async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
686 debug!(sql, "DataFusion execute");
687 let trimmed = sql.trim();
688 let upper = trimmed.to_ascii_uppercase();
689
690 if upper.starts_with("INSERT") {
691 self.execute_insert(trimmed).await
692 } else if upper.starts_with("UPDATE") {
693 self.execute_update(trimmed).await
694 } else if upper.starts_with("DELETE") {
695 self.execute_delete(trimmed).await
696 } else if upper.starts_with("BEGIN")
697 || upper.starts_with("COMMIT")
698 || upper.starts_with("ROLLBACK")
699 {
700 Ok(0)
702 } else {
703 let ctx = self.ctx.read().await;
705 let df = ctx.sql(trimmed).await?;
706 let _ = df.collect().await?;
707 Ok(0)
708 }
709 }
710
711 async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
712 if batches.is_empty() {
713 return Ok(0);
714 }
715
716 debug!(table, batch_count = batches.len(), "DataFusion load_arrow");
717 rhei_core::validate_identifier(table).map_err(|e| DfOlapError::Other(e.to_string()))?;
718
719 let total_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
720
721 match &self.storage_mode {
722 StorageMode::InMemory => {
723 let mut tables = self.tables.write().await;
724 let table_data = tables
725 .get_mut(table)
726 .ok_or_else(|| DfOlapError::TableNotFound(table.to_string()))?;
727
728 for batch in batches {
729 table_data.batches.push(batch.clone());
730 }
731 drop(tables);
732 self.refresh_table_mem(table).await?;
733 }
734 StorageMode::Vortex { .. } => {
735 let schema = self.vortex_table_schema(table).await?;
736 self.insert_arrow_into_vortex(table, &schema, batches)
737 .await?;
738 self.refresh_table_vortex(table).await?;
739 }
740 }
741
742 Ok(total_rows)
743 }
744
745 async fn create_table(
746 &self,
747 table_name: &str,
748 schema: &SchemaRef,
749 _primary_key: &[String],
750 ) -> Result<(), Self::Error> {
751 rhei_core::validate_identifier(table_name)
752 .map_err(|e| DfOlapError::Other(e.to_string()))?;
753 for field in schema.fields() {
754 rhei_core::validate_identifier(field.name())
755 .map_err(|e| DfOlapError::Other(e.to_string()))?;
756 }
757
758 debug!(
759 table = table_name,
760 storage = ?self.storage_mode,
761 "DataFusion create_table"
762 );
763
764 match &self.storage_mode {
765 StorageMode::InMemory => {
766 let mut tables = self.tables.write().await;
767 if tables.contains_key(table_name) {
768 return Ok(());
769 }
770 tables.insert(
771 table_name.to_string(),
772 TableData {
773 schema: schema.clone(),
774 batches: vec![],
775 },
776 );
777 drop(tables);
778 self.refresh_table_mem(table_name).await?;
779 }
780 StorageMode::Vortex { .. } => {
781 let loc = self
782 .location()
783 .expect("Vortex mode must have a resolved location");
784
785 {
787 let vortex_tables = self.vortex_tables.read().await;
788 if vortex_tables.contains_key(table_name) {
789 return Ok(());
790 }
791 }
792
793 let listing_url = table_listing_url(loc, table_name);
794
795 #[cfg(not(feature = "cloud-storage"))]
797 {
798 let VortexLocation::Local { ref base_path } = *loc;
799 let dir = base_path.join(table_name);
800 tokio::fs::create_dir_all(&dir).await?;
801 }
802 #[cfg(feature = "cloud-storage")]
803 if let VortexLocation::Local { ref base_path } = *loc {
804 let dir = base_path.join(table_name);
805 tokio::fs::create_dir_all(&dir).await?;
806 }
807
808 let mut vortex_tables = self.vortex_tables.write().await;
809 vortex_tables.insert(
810 table_name.to_string(),
811 VortexTableMeta {
812 schema: schema.clone(),
813 table_url: listing_url.clone(),
814 },
815 );
816 drop(vortex_tables);
817
818 let ctx = self.ctx.read().await;
820 register_vortex_listing_table(&ctx, table_name, schema, &listing_url).await?;
821 }
822 }
823
824 Ok(())
825 }
826
827 async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
828 match &self.storage_mode {
829 StorageMode::InMemory => {
830 let tables = self.tables.read().await;
831 Ok(tables.contains_key(table_name))
832 }
833 StorageMode::Vortex { .. } => {
834 let vortex_tables = self.vortex_tables.read().await;
835 Ok(vortex_tables.contains_key(table_name))
836 }
837 }
838 }
839
840 async fn add_column(
841 &self,
842 table_name: &str,
843 column_name: &str,
844 data_type: &DataType,
845 ) -> Result<(), Self::Error> {
846 rhei_core::validate_identifier(table_name)
847 .map_err(|e| DfOlapError::Other(e.to_string()))?;
848 rhei_core::validate_identifier(column_name)
849 .map_err(|e| DfOlapError::Other(e.to_string()))?;
850
851 debug!(
852 table = table_name,
853 column = column_name,
854 "DataFusion add_column"
855 );
856
857 match &self.storage_mode {
858 StorageMode::InMemory => {
859 let mut tables = self.tables.write().await;
860 let table_data = tables
861 .get_mut(table_name)
862 .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))?;
863
864 let new_schema = append_field(&table_data.schema, column_name, data_type);
865 let new_batches =
866 extend_batches_with_null_column(&table_data.batches, &new_schema, data_type)?;
867 table_data.schema = new_schema;
868 table_data.batches = new_batches;
869 drop(tables);
870 self.refresh_table_mem(table_name).await?;
871 }
872 StorageMode::Vortex { .. } => {
873 let (old_schema, existing_batches) =
874 self.read_all_batches_vortex(table_name).await?;
875 let new_schema = append_field(&old_schema, column_name, data_type);
876 let new_batches =
877 extend_batches_with_null_column(&existing_batches, &new_schema, data_type)?;
878
879 {
881 let mut vortex_tables = self.vortex_tables.write().await;
882 if let Some(meta) = vortex_tables.get_mut(table_name) {
883 meta.schema = new_schema.clone();
884 }
885 }
886
887 self.clear_table_storage(table_name).await?;
888 self.refresh_table_vortex(table_name).await?;
890
891 if !new_batches.is_empty() {
892 self.insert_arrow_into_vortex(table_name, &new_schema, &new_batches)
893 .await?;
894 self.refresh_table_vortex(table_name).await?;
895 }
896 }
897 }
898
899 Ok(())
900 }
901
902 async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
903 rhei_core::validate_identifier(table_name)
904 .map_err(|e| DfOlapError::Other(e.to_string()))?;
905 rhei_core::validate_identifier(column_name)
906 .map_err(|e| DfOlapError::Other(e.to_string()))?;
907
908 debug!(
909 table = table_name,
910 column = column_name,
911 "DataFusion drop_column"
912 );
913
914 match &self.storage_mode {
915 StorageMode::InMemory => {
916 let mut tables = self.tables.write().await;
917 let table_data = tables
918 .get_mut(table_name)
919 .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))?;
920
921 let col_idx = find_column_index(&table_data.schema, column_name, table_name)?;
922 let new_schema = remove_field(&table_data.schema, col_idx);
923 let new_batches =
924 remove_column_from_batches(&table_data.batches, &new_schema, col_idx)?;
925 table_data.schema = new_schema;
926 table_data.batches = new_batches;
927 drop(tables);
928 self.refresh_table_mem(table_name).await?;
929 }
930 StorageMode::Vortex { .. } => {
931 let (old_schema, existing_batches) =
932 self.read_all_batches_vortex(table_name).await?;
933 let col_idx = find_column_index(&old_schema, column_name, table_name)?;
934 let new_schema = remove_field(&old_schema, col_idx);
935 let new_batches =
936 remove_column_from_batches(&existing_batches, &new_schema, col_idx)?;
937
938 {
940 let mut vortex_tables = self.vortex_tables.write().await;
941 if let Some(meta) = vortex_tables.get_mut(table_name) {
942 meta.schema = new_schema.clone();
943 }
944 }
945
946 self.clear_table_storage(table_name).await?;
947 self.refresh_table_vortex(table_name).await?;
948
949 if !new_batches.is_empty() {
950 self.insert_arrow_into_vortex(table_name, &new_schema, &new_batches)
951 .await?;
952 self.refresh_table_vortex(table_name).await?;
953 }
954 }
955 }
956
957 Ok(())
958 }
959}
960
961fn append_field(schema: &SchemaRef, column_name: &str, data_type: &DataType) -> SchemaRef {
966 let mut fields: Vec<arrow::datatypes::Field> =
967 schema.fields().iter().map(|f| f.as_ref().clone()).collect();
968 fields.push(arrow::datatypes::Field::new(
969 column_name,
970 data_type.clone(),
971 true,
972 ));
973 Arc::new(arrow::datatypes::Schema::new(fields))
974}
975
976fn remove_field(schema: &SchemaRef, col_idx: usize) -> SchemaRef {
977 let fields: Vec<arrow::datatypes::Field> = schema
978 .fields()
979 .iter()
980 .enumerate()
981 .filter(|(i, _)| *i != col_idx)
982 .map(|(_, f)| f.as_ref().clone())
983 .collect();
984 Arc::new(arrow::datatypes::Schema::new(fields))
985}
986
987fn find_column_index(
988 schema: &SchemaRef,
989 column_name: &str,
990 table_name: &str,
991) -> Result<usize, DfOlapError> {
992 schema
993 .fields()
994 .iter()
995 .position(|f| f.name() == column_name)
996 .ok_or_else(|| {
997 DfOlapError::Other(format!(
998 "column '{}' not found in table '{}'",
999 column_name, table_name
1000 ))
1001 })
1002}
1003
1004fn extend_batches_with_null_column(
1005 batches: &[RecordBatch],
1006 new_schema: &SchemaRef,
1007 data_type: &DataType,
1008) -> Result<Vec<RecordBatch>, DfOlapError> {
1009 let mut new_batches = Vec::with_capacity(batches.len());
1010 for batch in batches {
1011 let null_array = arrow::array::new_null_array(data_type, batch.num_rows());
1012 let mut columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1013 .map(|i| batch.column(i).clone())
1014 .collect();
1015 columns.push(null_array);
1016 new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
1017 }
1018 Ok(new_batches)
1019}
1020
1021fn remove_column_from_batches(
1022 batches: &[RecordBatch],
1023 new_schema: &SchemaRef,
1024 col_idx: usize,
1025) -> Result<Vec<RecordBatch>, DfOlapError> {
1026 let mut new_batches = Vec::with_capacity(batches.len());
1027 for batch in batches {
1028 let columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1029 .filter(|i| *i != col_idx)
1030 .map(|i| batch.column(i).clone())
1031 .collect();
1032 new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
1033 }
1034 Ok(new_batches)
1035}
1036
1037fn align_batches_to_schema(
1039 table_schema: &SchemaRef,
1040 col_names: &[String],
1041 batches: &[RecordBatch],
1042) -> Result<(Vec<RecordBatch>, u64), DfOlapError> {
1043 let mut aligned_batches = Vec::with_capacity(batches.len());
1044 let mut total_rows = 0u64;
1045 for batch in batches {
1046 let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(table_schema.fields().len());
1047 for field in table_schema.fields() {
1048 let idx = col_names
1049 .iter()
1050 .position(|c| c == field.name())
1051 .ok_or_else(|| {
1052 DfOlapError::SchemaMismatch(format!(
1053 "column '{}' not in INSERT column list",
1054 field.name()
1055 ))
1056 })?;
1057 let col = batch.column(idx);
1058 let col = if col.data_type() != field.data_type() {
1059 arrow::compute::cast(col, field.data_type())?
1060 } else {
1061 col.clone()
1062 };
1063 columns.push(col);
1064 }
1065 let aligned = RecordBatch::try_new(table_schema.clone(), columns)?;
1066 total_rows += aligned.num_rows() as u64;
1067 aligned_batches.push(aligned);
1068 }
1069 Ok((aligned_batches, total_rows))
1070}
1071
1072struct StreamAdapter(datafusion::physical_plan::SendableRecordBatchStream);
1074
1075impl futures_core::Stream for StreamAdapter {
1076 type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;
1077
1078 fn poll_next(
1079 mut self: std::pin::Pin<&mut Self>,
1080 cx: &mut std::task::Context<'_>,
1081 ) -> std::task::Poll<Option<Self::Item>> {
1082 std::pin::Pin::new(&mut self.0).poll_next(cx).map(|opt| {
1083 opt.map(|r| r.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>))
1084 })
1085 }
1086}
1087
1088#[derive(Clone)]
1099pub struct SharedDataFusionEngine(pub Arc<DataFusionEngine>);
1100
1101impl SharedDataFusionEngine {
1102 pub fn new(engine: DataFusionEngine) -> Self {
1104 Self(Arc::new(engine))
1105 }
1106}
1107
1108impl std::ops::Deref for SharedDataFusionEngine {
1109 type Target = DataFusionEngine;
1110 fn deref(&self) -> &Self::Target {
1111 &self.0
1112 }
1113}
1114
1115impl rhei_core::OlapEngine for SharedDataFusionEngine {
1116 type Error = DfOlapError;
1117
1118 async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
1119 self.0.query(sql).await
1120 }
1121
1122 async fn query_stream(
1123 &self,
1124 sql: &str,
1125 ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
1126 self.0.query_stream(sql).await
1127 }
1128
1129 async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
1130 self.0.execute(sql).await
1131 }
1132
1133 async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
1134 self.0.load_arrow(table, batches).await
1135 }
1136
1137 async fn create_table(
1138 &self,
1139 table_name: &str,
1140 schema: &SchemaRef,
1141 primary_key: &[String],
1142 ) -> Result<(), Self::Error> {
1143 self.0.create_table(table_name, schema, primary_key).await
1144 }
1145
1146 async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
1147 self.0.table_exists(table_name).await
1148 }
1149
1150 async fn add_column(
1151 &self,
1152 table_name: &str,
1153 column_name: &str,
1154 data_type: &DataType,
1155 ) -> Result<(), Self::Error> {
1156 self.0.add_column(table_name, column_name, data_type).await
1157 }
1158
1159 async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
1160 self.0.drop_column(table_name, column_name).await
1161 }
1162}
1163
1164fn expr_to_sql_literal(expr: &Expr) -> Result<String, DfOlapError> {
1170 match expr {
1171 Expr::Value(v) => match &v.value {
1172 Value::Number(n, _) => Ok(n.clone()),
1173 Value::SingleQuotedString(s) => Ok(format!("'{}'", s.replace('\'', "''"))),
1174 Value::Boolean(b) => Ok(if *b { "TRUE".into() } else { "FALSE".into() }),
1175 Value::Null => Ok("NULL".into()),
1176 other => Err(DfOlapError::Other(format!(
1177 "unsupported value literal: {other:?}"
1178 ))),
1179 },
1180 Expr::UnaryOp {
1181 op: UnaryOperator::Minus,
1182 expr: inner,
1183 } => {
1184 if let Expr::Value(v) = inner.as_ref() {
1185 if let Value::Number(n, _) = &v.value {
1186 return Ok(format!("-{n}"));
1187 }
1188 }
1189 Err(DfOlapError::Other(format!(
1190 "unsupported unary expression: {expr}"
1191 )))
1192 }
1193 other => Err(DfOlapError::Other(format!(
1194 "unsupported expression in VALUES: {other}"
1195 ))),
1196 }
1197}
1198
1199fn ident_from_expr(expr: &Expr) -> Result<String, DfOlapError> {
1201 match expr {
1202 Expr::Identifier(ident) => Ok(ident.value.clone()),
1203 Expr::CompoundIdentifier(parts) => parts
1204 .last()
1205 .map(|i| i.value.clone())
1206 .ok_or_else(|| DfOlapError::Other("empty compound identifier".into())),
1207 other => Err(DfOlapError::Other(format!(
1208 "expected column name, got: {other}"
1209 ))),
1210 }
1211}
1212
1213fn extract_where_conditions(expr: &Expr) -> Result<Vec<(String, String)>, DfOlapError> {
1215 match expr {
1216 Expr::BinaryOp {
1217 left,
1218 op: BinaryOperator::And,
1219 right,
1220 } => {
1221 let mut conditions = extract_where_conditions(left)?;
1222 conditions.extend(extract_where_conditions(right)?);
1223 Ok(conditions)
1224 }
1225 Expr::BinaryOp {
1226 left,
1227 op: BinaryOperator::Eq,
1228 right,
1229 } => {
1230 let col = ident_from_expr(left)?;
1231 let val = expr_to_sql_literal(right)?;
1232 Ok(vec![(col, val)])
1233 }
1234 Expr::IsNull(inner) => {
1235 let col = ident_from_expr(inner)?;
1236 Ok(vec![(col, "NULL".into())])
1237 }
1238 Expr::IsNotNull(inner) => {
1239 let col = ident_from_expr(inner)?;
1240 Ok(vec![(col, "__IS_NOT_NULL__".into())])
1241 }
1242 Expr::Nested(inner) => extract_where_conditions(inner),
1243 other => Err(DfOlapError::Other(format!(
1244 "unsupported WHERE expression: {other}"
1245 ))),
1246 }
1247}
1248
1249fn parse_insert_values(sql: &str) -> Result<(String, Vec<String>, Vec<RecordBatch>), DfOlapError> {
1253 let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1254 .map_err(|e| DfOlapError::Other(format!("failed to parse INSERT: {e}")))?;
1255
1256 let stmt = stmts
1257 .pop()
1258 .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1259
1260 let insert = match stmt {
1261 Statement::Insert(ins) => ins,
1262 other => {
1263 return Err(DfOlapError::Other(format!(
1264 "expected INSERT statement, got: {other:?}"
1265 )));
1266 }
1267 };
1268
1269 let table_name = match &insert.table {
1270 TableObject::TableName(obj_name) => obj_name
1271 .0
1272 .last()
1273 .and_then(|p| p.as_ident())
1274 .map(|id| id.value.clone())
1275 .ok_or_else(|| DfOlapError::Other("empty table name in INSERT".into()))?,
1276 TableObject::TableFunction(_) => {
1277 return Err(DfOlapError::Other(
1278 "INSERT INTO TABLE FUNCTION not supported".into(),
1279 ));
1280 }
1281 };
1282
1283 rhei_core::validate_identifier(&table_name).map_err(|e| DfOlapError::Other(e.to_string()))?;
1284
1285 let col_name_strings: Vec<String> = insert.columns.iter().map(|id| id.value.clone()).collect();
1286
1287 let source = match insert.source {
1288 Some(q) => q,
1289 None => return Ok((table_name, col_name_strings, vec![])),
1290 };
1291
1292 let values = match *source.body {
1293 SetExpr::Values(v) => v,
1294 other => {
1295 return Err(DfOlapError::Other(format!(
1296 "INSERT source is not a VALUES clause: {other:?}"
1297 )));
1298 }
1299 };
1300
1301 if values.rows.is_empty() {
1302 return Ok((table_name, col_name_strings, vec![]));
1303 }
1304
1305 let rows: Vec<Vec<String>> = values
1306 .rows
1307 .iter()
1308 .map(|row| {
1309 row.iter()
1310 .map(expr_to_sql_literal)
1311 .collect::<Result<_, _>>()
1312 })
1313 .collect::<Result<_, _>>()?;
1314
1315 let col_name_refs: Vec<&str> = col_name_strings.iter().map(|s| s.as_str()).collect();
1316 let num_cols = col_name_refs.len();
1317
1318 if num_cols == 0 {
1319 return Err(DfOlapError::Other(format!(
1320 "INSERT INTO {table_name} requires an explicit column list; `VALUES (...)` without columns is not supported"
1321 )));
1322 }
1323
1324 let batch = build_record_batch_from_values(&col_name_refs, &rows, num_cols)?;
1325 Ok((table_name, col_name_strings, vec![batch]))
1326}
1327
1328fn build_record_batch_from_values(
1330 col_names: &[&str],
1331 rows: &[Vec<String>],
1332 num_cols: usize,
1333) -> Result<RecordBatch, DfOlapError> {
1334 use arrow::array::*;
1335 use arrow::datatypes::{Field, Schema};
1336
1337 let mut types = vec![DataType::Utf8; num_cols];
1338 for col_idx in 0..num_cols {
1339 for row in rows {
1340 if col_idx < row.len() {
1341 let val = &row[col_idx];
1342 let upper = val.to_ascii_uppercase();
1343 if upper == "NULL" {
1344 continue;
1345 }
1346 if upper == "TRUE" || upper == "FALSE" {
1347 types[col_idx] = DataType::Boolean;
1348 break;
1349 }
1350 if val.starts_with('\'') {
1351 types[col_idx] = DataType::Utf8;
1352 break;
1353 }
1354 if val.contains('.') {
1355 if val.parse::<f64>().is_ok() {
1356 types[col_idx] = DataType::Float64;
1357 break;
1358 }
1359 } else if val.parse::<i64>().is_ok() {
1360 types[col_idx] = DataType::Int64;
1361 break;
1362 }
1363 break;
1364 }
1365 }
1366 }
1367
1368 let fields: Vec<Field> = col_names
1369 .iter()
1370 .zip(types.iter())
1371 .map(|(name, dt)| Field::new(*name, dt.clone(), true))
1372 .collect();
1373 let schema = Arc::new(Schema::new(fields));
1374
1375 let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(num_cols);
1376 for col_idx in 0..num_cols {
1377 let col_values: Vec<&str> = rows
1378 .iter()
1379 .map(|row| {
1380 if col_idx < row.len() {
1381 row[col_idx].as_str()
1382 } else {
1383 "NULL"
1384 }
1385 })
1386 .collect();
1387
1388 columns.push(build_array(&types[col_idx], &col_values)?);
1389 }
1390
1391 let batch = RecordBatch::try_new(schema, columns)?;
1392 Ok(batch)
1393}
1394
1395fn build_array(dt: &DataType, values: &[&str]) -> Result<Arc<dyn Array>, DfOlapError> {
1397 use arrow::array::*;
1398
1399 match dt {
1400 DataType::Int64 => {
1401 let mut builder = Int64Builder::new();
1402 for v in values {
1403 if v.eq_ignore_ascii_case("NULL") {
1404 builder.append_null();
1405 } else {
1406 builder.append_value(
1407 v.parse::<i64>()
1408 .map_err(|e| DfOlapError::Other(format!("parse i64: {e}")))?,
1409 );
1410 }
1411 }
1412 Ok(Arc::new(builder.finish()))
1413 }
1414 DataType::Float64 => {
1415 let mut builder = Float64Builder::new();
1416 for v in values {
1417 if v.eq_ignore_ascii_case("NULL") {
1418 builder.append_null();
1419 } else {
1420 builder.append_value(
1421 v.parse::<f64>()
1422 .map_err(|e| DfOlapError::Other(format!("parse f64: {e}")))?,
1423 );
1424 }
1425 }
1426 Ok(Arc::new(builder.finish()))
1427 }
1428 DataType::Boolean => {
1429 let mut builder = BooleanBuilder::new();
1430 for v in values {
1431 let upper = v.to_ascii_uppercase();
1432 if upper == "NULL" {
1433 builder.append_null();
1434 } else {
1435 builder.append_value(upper == "TRUE");
1436 }
1437 }
1438 Ok(Arc::new(builder.finish()))
1439 }
1440 _ => {
1441 let mut builder = StringBuilder::new();
1442 for v in values {
1443 if v.eq_ignore_ascii_case("NULL") {
1444 builder.append_null();
1445 } else {
1446 let stripped = if v.starts_with('\'') && v.ends_with('\'') && v.len() >= 2 {
1447 &v[1..v.len() - 1]
1448 } else {
1449 v
1450 };
1451 builder.append_value(stripped.replace("''", "'"));
1452 }
1453 }
1454 Ok(Arc::new(builder.finish()))
1455 }
1456 }
1457}
1458
/// A `(column name, SQL literal text)` pair; `parse_update` uses it for both `SET`
/// assignments and `WHERE` conditions.
type ColVal = (String, String);
1461
1462fn parse_update(sql: &str) -> Result<(String, Vec<ColVal>, Vec<ColVal>), DfOlapError> {
1464 let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1465 .map_err(|e| DfOlapError::Other(format!("failed to parse UPDATE: {e}")))?;
1466
1467 let stmt = stmts
1468 .pop()
1469 .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1470
1471 let update = match stmt {
1472 Statement::Update(upd) => upd,
1473 other => {
1474 return Err(DfOlapError::Other(format!(
1475 "expected UPDATE statement, got: {other:?}"
1476 )));
1477 }
1478 };
1479
1480 let table_name = match &update.table.relation {
1481 TableFactor::Table { name, .. } => name
1482 .0
1483 .last()
1484 .and_then(|p| p.as_ident())
1485 .map(|id| id.value.clone())
1486 .ok_or_else(|| DfOlapError::Other("empty table name in UPDATE".into()))?,
1487 other => {
1488 return Err(DfOlapError::Other(format!(
1489 "unexpected table factor in UPDATE: {other:?}"
1490 )));
1491 }
1492 };
1493
1494 let assignments: Vec<ColVal> = update
1495 .assignments
1496 .iter()
1497 .map(|a| {
1498 let col = match &a.target {
1499 AssignmentTarget::ColumnName(obj) => obj
1500 .0
1501 .last()
1502 .and_then(|p| p.as_ident())
1503 .map(|id| id.value.clone())
1504 .ok_or_else(|| DfOlapError::Other("empty column name in SET".into()))?,
1505 AssignmentTarget::Tuple(_) => {
1506 return Err(DfOlapError::Other(
1507 "tuple assignments in SET not supported".into(),
1508 ));
1509 }
1510 };
1511 let val = expr_to_sql_literal(&a.value)?;
1512 Ok((col, val))
1513 })
1514 .collect::<Result<_, DfOlapError>>()?;
1515
1516 let where_clause = match &update.selection {
1517 Some(expr) => extract_where_conditions(expr)?,
1518 None => vec![],
1519 };
1520
1521 Ok((table_name, assignments, where_clause))
1522}
1523
1524fn parse_delete(sql: &str) -> Result<(String, Vec<(String, String)>), DfOlapError> {
1526 let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1527 .map_err(|e| DfOlapError::Other(format!("failed to parse DELETE: {e}")))?;
1528
1529 let stmt = stmts
1530 .pop()
1531 .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1532
1533 let delete = match stmt {
1534 Statement::Delete(del) => del,
1535 other => {
1536 return Err(DfOlapError::Other(format!(
1537 "expected DELETE statement, got: {other:?}"
1538 )));
1539 }
1540 };
1541
1542 let tables = match &delete.from {
1543 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1544 };
1545
1546 let table_name = tables
1547 .first()
1548 .and_then(|twj| {
1549 if let TableFactor::Table { name, .. } = &twj.relation {
1550 name.0
1551 .last()
1552 .and_then(|p| p.as_ident())
1553 .map(|id| id.value.clone())
1554 } else {
1555 None
1556 }
1557 })
1558 .ok_or_else(|| DfOlapError::Other("missing table name in DELETE".into()))?;
1559
1560 let where_clause = match &delete.selection {
1561 Some(expr) => extract_where_conditions(expr)?,
1562 None => vec![],
1563 };
1564
1565 Ok((table_name, where_clause))
1566}
1567
1568fn flatten_batches(
1570 batches: &[RecordBatch],
1571 schema: &SchemaRef,
1572) -> Result<Option<RecordBatch>, DfOlapError> {
1573 if batches.is_empty() {
1574 return Ok(None);
1575 }
1576
1577 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1578 if total_rows == 0 {
1579 return Ok(None);
1580 }
1581
1582 let batch = arrow::compute::concat_batches(schema, batches)?;
1583 Ok(Some(batch))
1584}
1585
1586fn apply_update(
1588 batch: &RecordBatch,
1589 schema: &SchemaRef,
1590 assignments: &[(String, String)],
1591 where_conditions: &[(String, String)],
1592) -> Result<(RecordBatch, u64), DfOlapError> {
1593 let matching = find_matching_rows(batch, schema, where_conditions)?;
1594 let updated_count = matching.iter().filter(|&&m| m).count() as u64;
1595
1596 let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
1597 for (col_idx, field) in schema.fields().iter().enumerate() {
1598 let assignment = assignments.iter().find(|(col, _)| col == field.name());
1599
1600 if let Some((_, new_val)) = assignment {
1601 let original = batch.column(col_idx);
1602 new_columns.push(apply_value_to_matching(
1603 original,
1604 &matching,
1605 new_val,
1606 field.data_type(),
1607 )?);
1608 } else {
1609 new_columns.push(batch.column(col_idx).clone());
1610 }
1611 }
1612
1613 let new_batch = RecordBatch::try_new(schema.clone(), new_columns)?;
1614 Ok((new_batch, updated_count))
1615}
1616
1617fn apply_delete(
1619 batch: &RecordBatch,
1620 schema: &SchemaRef,
1621 where_conditions: &[(String, String)],
1622) -> Result<(RecordBatch, u64), DfOlapError> {
1623 let matching = find_matching_rows(batch, schema, where_conditions)?;
1624 let deleted_count = matching.iter().filter(|&&m| m).count() as u64;
1625
1626 let mut builder = BooleanBuilder::new();
1627 for &m in &matching {
1628 builder.append_value(!m);
1629 }
1630 let filter_array = builder.finish();
1631
1632 let new_columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1633 .map(|i| arrow::compute::filter(batch.column(i), &filter_array).map_err(DfOlapError::Arrow))
1634 .collect::<Result<_, _>>()?;
1635
1636 let new_batch = RecordBatch::try_new(schema.clone(), new_columns)?;
1637 Ok((new_batch, deleted_count))
1638}
1639
1640fn find_matching_rows(
1642 batch: &RecordBatch,
1643 schema: &SchemaRef,
1644 conditions: &[(String, String)],
1645) -> Result<Vec<bool>, DfOlapError> {
1646 let num_rows = batch.num_rows();
1647 let mut matching = vec![true; num_rows];
1648
1649 for (col_name, expected_val) in conditions {
1650 let col_idx = schema
1651 .fields()
1652 .iter()
1653 .position(|f| f.name() == col_name)
1654 .ok_or_else(|| DfOlapError::Other(format!("column not found: {col_name}")))?;
1655
1656 let col = batch.column(col_idx);
1657 for (row_idx, m) in matching.iter_mut().enumerate() {
1658 if !*m {
1659 continue;
1660 }
1661 *m = value_matches(col, row_idx, expected_val);
1662 }
1663 }
1664
1665 Ok(matching)
1666}
1667
1668fn value_matches(array: &dyn Array, row_idx: usize, expected: &str) -> bool {
1670 if expected == "__IS_NOT_NULL__" {
1671 return !array.is_null(row_idx);
1672 }
1673 if array.is_null(row_idx) {
1674 return expected.eq_ignore_ascii_case("NULL");
1675 }
1676
1677 match array.data_type() {
1678 DataType::Int8 => {
1679 expected.parse::<i8>().ok() == Some(array.as_primitive::<Int8Type>().value(row_idx))
1680 }
1681 DataType::Int16 => {
1682 expected.parse::<i16>().ok() == Some(array.as_primitive::<Int16Type>().value(row_idx))
1683 }
1684 DataType::Int32 => {
1685 expected.parse::<i32>().ok() == Some(array.as_primitive::<Int32Type>().value(row_idx))
1686 }
1687 DataType::Int64 => {
1688 expected.parse::<i64>().ok() == Some(array.as_primitive::<Int64Type>().value(row_idx))
1689 }
1690 DataType::UInt8 => {
1691 expected.parse::<u8>().ok() == Some(array.as_primitive::<UInt8Type>().value(row_idx))
1692 }
1693 DataType::UInt16 => {
1694 expected.parse::<u16>().ok() == Some(array.as_primitive::<UInt16Type>().value(row_idx))
1695 }
1696 DataType::UInt32 => {
1697 expected.parse::<u32>().ok() == Some(array.as_primitive::<UInt32Type>().value(row_idx))
1698 }
1699 DataType::UInt64 => {
1700 expected.parse::<u64>().ok() == Some(array.as_primitive::<UInt64Type>().value(row_idx))
1701 }
1702 DataType::Float32 => {
1703 expected.parse::<f32>().ok() == Some(array.as_primitive::<Float32Type>().value(row_idx))
1704 }
1705 DataType::Float64 => {
1706 expected.parse::<f64>().ok() == Some(array.as_primitive::<Float64Type>().value(row_idx))
1707 }
1708 DataType::Utf8 => {
1709 let arr = array.as_string::<i32>();
1710 let stripped =
1711 if expected.starts_with('\'') && expected.ends_with('\'') && expected.len() >= 2 {
1712 &expected[1..expected.len() - 1]
1713 } else {
1714 expected
1715 };
1716 arr.value(row_idx) == stripped
1717 }
1718 DataType::Boolean => {
1719 let arr = array.as_boolean();
1720 match expected.to_ascii_uppercase().as_str() {
1721 "TRUE" => arr.value(row_idx),
1722 "FALSE" => !arr.value(row_idx),
1723 _ => false,
1724 }
1725 }
1726 _ => false,
1727 }
1728}
1729
1730fn apply_value_to_matching(
1732 original: &dyn Array,
1733 matching: &[bool],
1734 new_val: &str,
1735 dt: &DataType,
1736) -> Result<Arc<dyn Array>, DfOlapError> {
1737 use arrow::array::*;
1738
1739 match dt {
1740 DataType::Int64 => {
1741 let orig = original.as_primitive::<Int64Type>();
1742 let parsed: i64 = new_val
1743 .parse()
1744 .map_err(|e| DfOlapError::Other(format!("parse i64: {e}")))?;
1745 let mut builder = Int64Builder::new();
1746 for (i, &m) in matching.iter().enumerate() {
1747 if m {
1748 builder.append_value(parsed);
1749 } else if orig.is_null(i) {
1750 builder.append_null();
1751 } else {
1752 builder.append_value(orig.value(i));
1753 }
1754 }
1755 Ok(Arc::new(builder.finish()))
1756 }
1757 DataType::Float64 => {
1758 let orig = original.as_primitive::<Float64Type>();
1759 let parsed: f64 = new_val
1760 .parse()
1761 .map_err(|e| DfOlapError::Other(format!("parse f64: {e}")))?;
1762 let mut builder = Float64Builder::new();
1763 for (i, &m) in matching.iter().enumerate() {
1764 if m {
1765 builder.append_value(parsed);
1766 } else if orig.is_null(i) {
1767 builder.append_null();
1768 } else {
1769 builder.append_value(orig.value(i));
1770 }
1771 }
1772 Ok(Arc::new(builder.finish()))
1773 }
1774 DataType::Utf8 => {
1775 let orig = original.as_string::<i32>();
1776 let stripped =
1777 if new_val.starts_with('\'') && new_val.ends_with('\'') && new_val.len() >= 2 {
1778 &new_val[1..new_val.len() - 1]
1779 } else {
1780 new_val
1781 };
1782 let unescaped = stripped.replace("''", "'");
1783 let mut builder = StringBuilder::new();
1784 for (i, &m) in matching.iter().enumerate() {
1785 if m {
1786 builder.append_value(&unescaped);
1787 } else if orig.is_null(i) {
1788 builder.append_null();
1789 } else {
1790 builder.append_value(orig.value(i));
1791 }
1792 }
1793 Ok(Arc::new(builder.finish()))
1794 }
1795 DataType::Boolean => {
1796 let orig = original.as_boolean();
1797 let parsed = new_val.eq_ignore_ascii_case("TRUE");
1798 let mut builder = BooleanBuilder::new();
1799 for (i, &m) in matching.iter().enumerate() {
1800 if m {
1801 builder.append_value(parsed);
1802 } else if orig.is_null(i) {
1803 builder.append_null();
1804 } else {
1805 builder.append_value(orig.value(i));
1806 }
1807 }
1808 Ok(Arc::new(builder.finish()))
1809 }
1810 _ => {
1811 let orig = original.as_string::<i32>();
1812 let mut builder = StringBuilder::new();
1813 for (i, &m) in matching.iter().enumerate() {
1814 if m {
1815 builder.append_value(new_val);
1816 } else if orig.is_null(i) {
1817 builder.append_null();
1818 } else {
1819 builder.append_value(orig.value(i));
1820 }
1821 }
1822 Ok(Arc::new(builder.finish()))
1823 }
1824 }
1825}
1826
1827#[cfg(test)]
1828mod tests {
1829 use super::*;
1830 use arrow::datatypes::{Field, Schema};
1831 use rhei_core::OlapEngine;
1832
    /// Schema shared by most tests: non-nullable `id: Int64`, nullable `name: Utf8`,
    /// nullable `age: Int64`.
    fn users_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]))
    }
1840
    /// Engine factory for the shared test suite: builds an engine with
    /// `DataFusionEngine::new()`. The path argument is ignored; it exists only so the
    /// signature matches `make_vortex`.
    fn make_in_memory(_: &std::path::Path) -> DataFusionEngine {
        DataFusionEngine::new()
    }
1844
    /// Engine factory for the shared test suite: builds an engine in `StorageMode::Vortex`
    /// whose URL is the `vortex_olap` subdirectory of `tmp`. Panics if construction fails.
    fn make_vortex(tmp: &std::path::Path) -> DataFusionEngine {
        DataFusionEngine::with_storage(StorageMode::Vortex {
            url: tmp.join("vortex_olap").to_string_lossy().to_string(),
        })
        .unwrap()
    }
1851
    // Generates a module named `$mod_name` containing the same suite of engine tests.
    // `$make_engine` is called with the path of a fresh temporary directory and must
    // return the engine under test, so one suite can run against several storage setups.
    macro_rules! storage_mode_tests {
        ($mod_name:ident, $make_engine:ident) => {
            mod $mod_name {
                use super::*;

                // A created table is reported as existing; an unknown name is not.
                #[tokio::test]
                async fn create_and_query_empty() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    assert!(engine.table_exists("users").await.unwrap());
                    assert!(!engine.table_exists("nonexistent").await.unwrap());
                }

                // Two single-row INSERTs are both visible to a subsequent SELECT.
                #[tokio::test]
                async fn insert_and_query() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    engine
                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
                        .await
                        .unwrap();
                    engine
                        .execute("INSERT INTO users (id, name, age) VALUES (2, 'Bob', 25)")
                        .await
                        .unwrap();

                    let batches = engine
                        .query("SELECT * FROM users ORDER BY id")
                        .await
                        .unwrap();
                    // Rows may be spread over several batches, so sum across all of them.
                    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
                    assert_eq!(total_rows, 2);
                }

                // UPDATE reports one affected row and the new value is readable afterwards.
                #[tokio::test]
                async fn update() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    engine
                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
                        .await
                        .unwrap();

                    let updated = engine
                        .execute("UPDATE users SET age = 31 WHERE id = 1")
                        .await
                        .unwrap();
                    assert_eq!(updated, 1);

                    let batches = engine
                        .query("SELECT age FROM users WHERE id = 1")
                        .await
                        .unwrap();
                    let age = batches[0].column(0).as_primitive::<Int64Type>().value(0);
                    assert_eq!(age, 31);
                }

                // DELETE reports one affected row and leaves the other row in place.
                #[tokio::test]
                async fn delete() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    engine
                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25)")
                        .await
                        .unwrap();

                    let deleted = engine
                        .execute("DELETE FROM users WHERE id = 1")
                        .await
                        .unwrap();
                    assert_eq!(deleted, 1);

                    let batches = engine
                        .query("SELECT COUNT(*) as cnt FROM users")
                        .await
                        .unwrap();
                    let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
                    assert_eq!(count, 1);
                }

                // `load_arrow` returns the number of rows loaded and the rows are queryable.
                #[tokio::test]
                async fn load_arrow_bulk() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    let batch = RecordBatch::try_new(
                        schema.clone(),
                        vec![
                            Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3])),
                            Arc::new(arrow::array::StringArray::from(vec![
                                "Alice", "Bob", "Charlie",
                            ])),
                            Arc::new(arrow::array::Int64Array::from(vec![30, 25, 35])),
                        ],
                    )
                    .unwrap();

                    let loaded = engine.load_arrow("users", &[batch]).await.unwrap();
                    assert_eq!(loaded, 3);

                    let batches = engine
                        .query("SELECT COUNT(*) as cnt FROM users")
                        .await
                        .unwrap();
                    let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
                    assert_eq!(count, 3);
                }

                // AVG over ages 30, 25 and 35 is 30 (compared with a small tolerance).
                #[tokio::test]
                async fn aggregate() {
                    let _tmp = tempfile::tempdir().unwrap();
                    let engine = $make_engine(_tmp.path());
                    let schema = users_schema();
                    engine.create_table("users", &schema, &[]).await.unwrap();

                    engine
                        .execute(
                            "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)",
                        )
                        .await
                        .unwrap();

                    let batches = engine
                        .query("SELECT AVG(age) as avg_age FROM users")
                        .await
                        .unwrap();
                    let avg = batches[0].column(0).as_primitive::<Float64Type>().value(0);
                    assert!((avg - 30.0).abs() < 0.01);
                }
            }
        };
    }
1999
    // Instantiate the shared suite once per engine factory: the default engine and the
    // Vortex-backed engine rooted in a temporary directory.
    storage_mode_tests!(in_memory, make_in_memory);
    storage_mode_tests!(vortex_local, make_vortex);
2002
    /// Rows written through one Vortex-backed engine must be visible to a second engine
    /// created later on the same URL. The second engine calls `create_table` again for the
    /// same table before querying, so that call must not discard the existing rows.
    #[tokio::test]
    async fn vortex_local_persist_restart() {
        let tmp = tempfile::tempdir().unwrap();
        let base = tmp.path().join("restart_test");

        let schema = users_schema();

        {
            // First engine: create the table and write two rows, then drop the engine.
            let engine = DataFusionEngine::with_storage(StorageMode::Vortex {
                url: base.to_string_lossy().to_string(),
            })
            .unwrap();
            engine.create_table("users", &schema, &[]).await.unwrap();
            engine
                .execute(
                    "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25)",
                )
                .await
                .unwrap();
        }

        {
            // Second engine on the same URL: re-declare the table and read the rows back.
            let engine2 = DataFusionEngine::with_storage(StorageMode::Vortex {
                url: base.to_string_lossy().to_string(),
            })
            .unwrap();
            engine2.create_table("users", &schema, &[]).await.unwrap();

            let batches = engine2
                .query("SELECT COUNT(*) as cnt FROM users")
                .await
                .unwrap();
            let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
            assert_eq!(count, 2, "data should survive engine restart");
        }
    }
2048
    /// A comma inside a quoted string literal must not be treated as a value separator.
    #[tokio::test]
    async fn insert_string_with_comma() {
        let engine = DataFusionEngine::new();
        let schema = users_schema();
        engine.create_table("users", &schema, &[]).await.unwrap();

        engine
            .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice, B', 30)")
            .await
            .unwrap();

        let batches = engine
            .query("SELECT name FROM users WHERE id = 1")
            .await
            .unwrap();
        let name_arr = batches[0].column(0).as_string::<i32>();
        assert_eq!(name_arr.value(0), "Alice, B");
    }
2071
    /// An explicit `NULL` in `VALUES` is stored as a null, not as the text "NULL".
    #[tokio::test]
    async fn insert_null_value() {
        let engine = DataFusionEngine::new();
        let schema = users_schema();
        engine.create_table("users", &schema, &[]).await.unwrap();

        engine
            .execute("INSERT INTO users (id, name, age) VALUES (1, NULL, 30)")
            .await
            .unwrap();

        let batches = engine
            .query("SELECT name FROM users WHERE id = 1")
            .await
            .unwrap();
        assert!(batches[0].column(0).is_null(0));
    }
2089
    /// An UPDATE whose WHERE clause joins two equality conditions with `AND` changes only
    /// the row that satisfies both; the other row keeps its original value.
    #[tokio::test]
    async fn update_where_and() {
        let engine = DataFusionEngine::new();
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("id", DataType::Int64, false),
            arrow::datatypes::Field::new("name", DataType::Utf8, true),
            arrow::datatypes::Field::new("status", DataType::Utf8, true),
        ]));
        engine.create_table("t", &schema, &[]).await.unwrap();

        engine
            .execute("INSERT INTO t (id, name, status) VALUES (1, 'x', 'active')")
            .await
            .unwrap();
        engine
            .execute("INSERT INTO t (id, name, status) VALUES (2, 'y', 'inactive')")
            .await
            .unwrap();

        let updated = engine
            .execute("UPDATE t SET name = 'updated' WHERE id = 1 AND status = 'active'")
            .await
            .unwrap();
        assert_eq!(updated, 1);

        let batches = engine
            .query("SELECT name FROM t WHERE id = 1")
            .await
            .unwrap();
        assert_eq!(batches[0].column(0).as_string::<i32>().value(0), "updated");

        // The row that did not match must be untouched.
        let batches2 = engine
            .query("SELECT name FROM t WHERE id = 2")
            .await
            .unwrap();
        assert_eq!(batches2[0].column(0).as_string::<i32>().value(0), "y");
    }
2127
    /// A double-quoted table name in DELETE (`"users"`) resolves to the table created as
    /// `users`.
    #[tokio::test]
    async fn delete_quoted_identifier() {
        let engine = DataFusionEngine::new();
        let schema = users_schema();
        engine.create_table("users", &schema, &[]).await.unwrap();

        engine
            .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
            .await
            .unwrap();
        engine
            .execute("INSERT INTO users (id, name, age) VALUES (2, 'Bob', 25)")
            .await
            .unwrap();

        let deleted = engine
            .execute(r#"DELETE FROM "users" WHERE id = 1"#)
            .await
            .unwrap();
        assert_eq!(deleted, 1);

        let batches = engine.query("SELECT COUNT(*) FROM users").await.unwrap();
        let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
        assert_eq!(count, 1);
    }
2153
    /// A doubled single quote inside a string literal (`'O''Brien'`) is stored as one
    /// quote character.
    #[tokio::test]
    async fn insert_escaped_single_quote() {
        let engine = DataFusionEngine::new();
        let schema = users_schema();
        engine.create_table("users", &schema, &[]).await.unwrap();

        engine
            .execute("INSERT INTO users (id, name, age) VALUES (1, 'O''Brien', 42)")
            .await
            .unwrap();

        let batches = engine
            .query("SELECT name FROM users WHERE id = 1")
            .await
            .unwrap();
        assert_eq!(batches[0].column(0).as_string::<i32>().value(0), "O'Brien");
    }
2171
    /// `parse_insert_values` turns a two-row `VALUES` list into one batch with two rows and
    /// reports the table and column names.
    #[test]
    fn parse_insert_multi_row() {
        let (table, cols, batches) =
            parse_insert_values("INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')")
                .unwrap();
        assert_eq!(table, "users");
        assert_eq!(cols, vec!["id", "name"]);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
    }
2182
    /// `parse_update` returns the table, the SET assignment and the WHERE condition.
    /// String literals keep their surrounding quotes in the returned text.
    #[test]
    fn parse_update_basic() {
        let (table, assignments, where_clause) =
            parse_update("UPDATE users SET name = 'Alice' WHERE id = 1").unwrap();
        assert_eq!(table, "users");
        assert_eq!(
            assignments,
            vec![("name".to_string(), "'Alice'".to_string())]
        );
        assert_eq!(where_clause, vec![("id".to_string(), "1".to_string())]);
    }
2194
    /// `parse_delete` on a statement without WHERE returns an empty condition list.
    #[test]
    fn parse_delete_no_where() {
        let (table, conditions) = parse_delete("DELETE FROM logs").unwrap();
        assert_eq!(table, "logs");
        assert!(conditions.is_empty());
    }
2201
    /// A plain filesystem path used as the Vortex URL is not classified as cloud storage
    /// and yields a local base path.
    #[test]
    fn vortex_url_local_path_classified() {
        let mode = StorageMode::Vortex {
            url: "/tmp/rhei".to_string(),
        };
        assert!(!mode.is_cloud());
        assert!(mode.local_base_path().is_some());
    }
2214
    /// An `s3://` Vortex URL is classified as cloud storage and is returned unchanged as the
    /// cloud base URL. Compiled only with the `cloud-storage` feature.
    #[cfg(feature = "cloud-storage")]
    #[test]
    fn vortex_url_s3_classified() {
        let mode = StorageMode::Vortex {
            url: "s3://my-bucket/prefix".to_string(),
        };
        assert!(mode.is_cloud());
        assert_eq!(mode.cloud_base_url(), Some("s3://my-bucket/prefix"));
    }
2224
    /// Round trip against S3: create a table, insert two rows, count them, update one row
    /// and read it back.
    ///
    /// Compiled only with the `cloud-storage` feature, and returns immediately unless the
    /// environment variable `RHEI_TEST_S3` is set to `1`. The objects written under the
    /// generated prefix are not deleted by the test.
    #[cfg(feature = "cloud-storage")]
    #[tokio::test]
    async fn vortex_s3_round_trip() {
        if std::env::var("RHEI_TEST_S3").as_deref() != Ok("1") {
            return;
        }

        use std::time::{SystemTime, UNIX_EPOCH};

        // Derive a per-run prefix from the sub-second nanoseconds of the current time.
        // NOTE(review): only the sub-second part is used, so two runs can collide.
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let prefix = format!("test_{:08x}", ts);
        let base_url = format!("s3://pixai-rec-sys/dev/rhei/{prefix}");

        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("id", DataType::Int64, false),
            arrow::datatypes::Field::new("val", DataType::Utf8, true),
        ]));

        let engine = DataFusionEngine::with_storage(StorageMode::Vortex {
            url: base_url.clone(),
        })
        .expect("S3 engine construction should succeed with AWS credentials");

        engine.create_table("s3test", &schema, &[]).await.unwrap();
        engine
            .execute("INSERT INTO s3test (id, val) VALUES (1, 'hello'), (2, 'world')")
            .await
            .unwrap();

        let batches = engine
            .query("SELECT COUNT(*) as cnt FROM s3test")
            .await
            .unwrap();
        let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
        assert_eq!(count, 2, "S3 round-trip INSERT+SELECT should return 2 rows");

        // Update one row and verify the new value is what a later query returns.
        let updated = engine
            .execute("UPDATE s3test SET val = 'updated' WHERE id = 1")
            .await
            .unwrap();
        assert_eq!(updated, 1);

        let batches2 = engine
            .query("SELECT val FROM s3test WHERE id = 1")
            .await
            .unwrap();
        assert_eq!(batches2[0].column(0).as_string::<i32>().value(0), "updated");

        tracing::warn!(prefix, "S3 test data not cleaned up — remove manually");
    }
2287}