Skip to main content

rhei_datafusion/
engine.rs

//! DataFusion-backed OLAP engine.
//!
//! Uses Apache DataFusion as the query engine with pluggable storage:
//! - **InMemory**: a `Vec<RecordBatch>` per table, kept in a `HashMap` (default; lost on shutdown)
//! - **Vortex local**: `.vortex` files per table in a local directory
//! - **Vortex S3**: `.vortex` objects in an S3-compatible bucket (requires the
//!   `cloud-storage` feature)
//!
//! ## DML Strategy
//!
//! DataFusion's `MemTable` does not support INSERT/UPDATE/DELETE DML natively.
//! For `InMemory` mode, we maintain a `HashMap<String, TableData>` and
//! re-register a fresh `MemTable` after each mutation.
//!
//! For `Vortex` mode, writes go through DataFusion's `VortexFormatFactory` sink.
//! INSERT loads the new rows into a temporary `MemTable` and runs
//! `INSERT INTO <table> SELECT * FROM <tmp>`. UPDATE/DELETE use a read-modify-write
//! cycle: read all rows → mutate in memory → delete the table's `.vortex` files
//! (local directory or S3 prefix) → re-insert.
18
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
23use arrow::array::{Array, AsArray, BooleanBuilder, RecordBatch};
24use arrow::datatypes::{
25    DataType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
26    UInt16Type, UInt32Type, UInt64Type, UInt8Type,
27};
28use datafusion::common::GetExt;
29use datafusion::datasource::listing::{
30    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
31};
32use datafusion::datasource::provider::DefaultTableFactory;
33use datafusion::datasource::MemTable;
34use datafusion::execution::SessionStateBuilder;
35use datafusion::prelude::*;
36use sqlparser::ast::{
37    AssignmentTarget, BinaryOperator, Expr, FromTable, SetExpr, Statement, TableFactor,
38    TableObject, UnaryOperator, Value,
39};
40use sqlparser::dialect::SQLiteDialect;
41use sqlparser::parser::Parser;
42use tokio::sync::RwLock;
43use tracing::debug;
44use vortex::session::VortexSession;
45use vortex::VortexSessionDefault;
46use vortex_datafusion::VortexFormat;
47use vortex_datafusion::VortexFormatFactory;
48
49use crate::error::DfOlapError;
50use crate::storage::{StorageMode, VortexLocation};
51
52// cloud-storage: object_store + url imports
53#[cfg(feature = "cloud-storage")]
54use url::Url;
55
56/// Per-table in-memory data store (used in `InMemory` mode).
57struct TableData {
58    schema: SchemaRef,
59    /// Stored as a flat list of RecordBatches. Periodically compacted.
60    batches: Vec<RecordBatch>,
61}
62
63/// Per-table metadata for Vortex-backed tables.
64///
65/// Tracks the local table directory or cloud URL so that DML operations
66/// (read-modify-write for UPDATE/DELETE) can access the data directly.
67struct VortexTableMeta {
68    schema: SchemaRef,
69    /// DataFusion listing URL for this table (e.g. `file:///base/users/` or `s3://...`)
70    table_url: String,
71}
72
73/// Build a DataFusion `SessionContext` with VortexFormatFactory registered.
74///
75/// For S3-backed storage (requires `cloud-storage` feature), also registers
76/// an AmazonS3 object store for the bucket.
77fn build_vortex_session_context(location: &VortexLocation) -> Result<SessionContext, DfOlapError> {
78    let factory = Arc::new(VortexFormatFactory::new());
79
80    let mut state_builder = SessionStateBuilder::new()
81        .with_default_features()
82        .with_table_factory(
83            factory.get_ext().to_uppercase(),
84            Arc::new(DefaultTableFactory::new()),
85        );
86
87    if let Some(file_formats) = state_builder.file_formats() {
88        file_formats.push(factory.clone() as _);
89    }
90
91    let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
92
93    // For S3 locations, register the object store with the session.
94    #[cfg(feature = "cloud-storage")]
95    if let VortexLocation::S3 { url } = location {
96        let bucket = parse_s3_bucket(url)?;
97        let store: Arc<dyn object_store::ObjectStore> = Arc::new(
98            object_store::aws::AmazonS3Builder::from_env()
99                .with_bucket_name(&bucket)
100                .build()
101                .map_err(DfOlapError::ObjectStore)?,
102        );
103        let base_url = Url::parse(&format!("s3://{bucket}")).map_err(DfOlapError::UrlParse)?;
104        ctx.runtime_env().register_object_store(&base_url, store);
105        tracing::info!(bucket, "registered S3 object store for Vortex");
106    }
107
108    let _ = location; // suppress unused warning on non-cloud builds without the #[cfg]
109    Ok(ctx)
110}
111
/// Parse the bucket name from an `s3://bucket/prefix` URL.
///
/// # Errors
///
/// - `DfOlapError::UrlParse` if `url` is not a valid URL.
/// - `DfOlapError::StorageConfig` if the scheme is not `s3`.
/// - `DfOlapError::StorageConfig` if the bucket (host) part is missing or empty.
#[cfg(feature = "cloud-storage")]
fn parse_s3_bucket(url: &str) -> Result<String, DfOlapError> {
    let parsed = Url::parse(url).map_err(DfOlapError::UrlParse)?;
    if parsed.scheme() != "s3" {
        return Err(DfOlapError::StorageConfig(format!(
            "expected s3:// URL, got '{url}'"
        )));
    }
    // For non-special schemes such as `s3`, inputs like `s3:///prefix` parse
    // with an *empty* host. `host_str()` then returns `Some("")`, so the host
    // must also be checked for emptiness. Otherwise an empty bucket name
    // would be accepted and registered as `s3://`.
    parsed
        .host_str()
        .filter(|host| !host.is_empty())
        .map(str::to_string)
        .ok_or_else(|| DfOlapError::StorageConfig(format!("missing bucket name in URL '{url}'")))
}
126
127/// Build the DataFusion listing URL for a specific table.
128///
129/// For local storage: `file:///base/table/`
130/// For S3 storage: `s3://bucket/prefix/table/`
131fn table_listing_url(location: &VortexLocation, table_name: &str) -> String {
132    match location {
133        VortexLocation::Local { base_path } => {
134            let dir = base_path.join(table_name);
135            // DataFusion expects file:// URLs with a trailing slash for directories.
136            format!("file://{}/", dir.to_string_lossy())
137        }
138        #[cfg(feature = "cloud-storage")]
139        VortexLocation::S3 { url } => {
140            let base = url.trim_end_matches('/');
141            format!("{base}/{table_name}/")
142        }
143    }
144}
145
146/// Register a Vortex-backed table as a `ListingTable` with DataFusion.
147async fn register_vortex_listing_table(
148    ctx: &SessionContext,
149    table_name: &str,
150    schema: &SchemaRef,
151    listing_url: &str,
152) -> Result<(), DfOlapError> {
153    let vortex_format = Arc::new(VortexFormat::new(
154        <VortexSession as VortexSessionDefault>::default(),
155    ));
156    let listing_options = ListingOptions::new(vortex_format as _)
157        .with_file_extension("vortex")
158        .with_session_config_options(ctx.state().config());
159
160    let table_url = ListingTableUrl::parse(listing_url)?;
161
162    let config = ListingTableConfig::new(table_url)
163        .with_listing_options(listing_options)
164        .with_schema(schema.clone());
165
166    let listing_table = ListingTable::try_new(config)?;
167
168    let _ = ctx.deregister_table(table_name);
169    ctx.register_table(table_name, Arc::new(listing_table))?;
170    Ok(())
171}
172
173/// DataFusion-backed OLAP engine.
174///
175/// Supports pluggable storage via [`StorageMode`]:
176/// - `InMemory`: stores Arrow data in memory, registers as `MemTable`
177/// - `Vortex`: stores data as `.vortex` files (local) or objects (S3),
178///   registered as `ListingTable` with `VortexFormat`
179pub struct DataFusionEngine {
180    ctx: RwLock<SessionContext>,
181    /// In-memory table store (only used in `InMemory` mode).
182    tables: RwLock<HashMap<String, TableData>>,
183    /// Vortex-backed table metadata (only used in `Vortex` mode).
184    vortex_tables: RwLock<HashMap<String, VortexTableMeta>>,
185    /// Resolved Vortex location (only valid when storage_mode is Vortex).
186    vortex_location: Option<VortexLocation>,
187    /// Storage mode for this engine instance.
188    storage_mode: StorageMode,
189    /// Monotonic counter for generating unique temporary table names.
190    tmp_counter: AtomicU64,
191}
192
193impl DataFusionEngine {
194    /// Create a new DataFusion engine with the given storage mode.
195    ///
196    /// For Vortex local mode, creates the base directory if it doesn't exist.
197    /// For Vortex S3 mode (requires `cloud-storage`), registers the object store.
198    pub fn with_storage(mode: StorageMode) -> Result<Self, DfOlapError> {
199        let vortex_location = match mode.classify() {
200            Ok(Some(loc)) => {
201                // For local storage, ensure the base directory exists.
202                #[cfg(not(feature = "cloud-storage"))]
203                {
204                    let VortexLocation::Local { ref base_path } = loc;
205                    std::fs::create_dir_all(base_path)?;
206                }
207                #[cfg(feature = "cloud-storage")]
208                if let VortexLocation::Local { ref base_path } = loc {
209                    std::fs::create_dir_all(base_path)?;
210                }
211                Some(loc)
212            }
213            Ok(None) => None,
214            Err(e) => return Err(DfOlapError::StorageConfig(e)),
215        };
216
217        let ctx = if let Some(ref loc) = vortex_location {
218            build_vortex_session_context(loc)?
219        } else {
220            // InMemory: plain SessionContext, no Vortex needed.
221            SessionContext::new()
222        };
223
224        Ok(Self {
225            ctx: RwLock::new(ctx),
226            tables: RwLock::new(HashMap::new()),
227            vortex_tables: RwLock::new(HashMap::new()),
228            vortex_location,
229            storage_mode: mode,
230            tmp_counter: AtomicU64::new(0),
231        })
232    }
233
234    /// Create a new in-memory DataFusion engine (default).
235    pub fn new() -> Self {
236        Self::with_storage(StorageMode::InMemory).expect("in-memory mode cannot fail")
237    }
238
239    /// Returns the storage mode of this engine.
240    pub fn storage_mode(&self) -> &StorageMode {
241        &self.storage_mode
242    }
243
244    /// Returns the Vortex location, or None for InMemory mode.
245    fn location(&self) -> Option<&VortexLocation> {
246        self.vortex_location.as_ref()
247    }
248
249    // -----------------------------------------------------------------------
250    // In-memory table registration
251    // -----------------------------------------------------------------------
252
253    /// Re-register a table with DataFusion's SessionContext from in-memory data.
254    async fn refresh_table_mem(&self, name: &str) -> Result<(), DfOlapError> {
255        let tables = self.tables.read().await;
256        let table_data = tables
257            .get(name)
258            .ok_or_else(|| DfOlapError::TableNotFound(name.to_string()))?;
259
260        let partitions = if table_data.batches.is_empty() {
261            vec![vec![]]
262        } else {
263            vec![table_data.batches.clone()]
264        };
265        let mem_table = MemTable::try_new(table_data.schema.clone(), partitions)?;
266
267        let ctx = self.ctx.write().await;
268        let _ = ctx.deregister_table(name);
269        ctx.register_table(name, Arc::new(mem_table))?;
270        Ok(())
271    }
272
273    // -----------------------------------------------------------------------
274    // Vortex-backed table helpers
275    // -----------------------------------------------------------------------
276
277    /// Re-register a Vortex-backed table as a ListingTable with DataFusion.
278    async fn refresh_table_vortex(&self, name: &str) -> Result<(), DfOlapError> {
279        let vortex_tables = self.vortex_tables.read().await;
280        let meta = vortex_tables
281            .get(name)
282            .ok_or_else(|| DfOlapError::TableNotFound(name.to_string()))?;
283        let schema = meta.schema.clone();
284        let listing_url = meta.table_url.clone();
285        drop(vortex_tables);
286
287        let ctx = self.ctx.read().await;
288        register_vortex_listing_table(&ctx, name, &schema, &listing_url).await
289    }
290
291    /// Get the schema for a Vortex-backed table.
292    async fn vortex_table_schema(&self, table_name: &str) -> Result<SchemaRef, DfOlapError> {
293        let vortex_tables = self.vortex_tables.read().await;
294        vortex_tables
295            .get(table_name)
296            .map(|m| m.schema.clone())
297            .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))
298    }
299
300    /// Read all rows from a Vortex-backed table via DataFusion SQL.
301    async fn read_all_batches_vortex(
302        &self,
303        table_name: &str,
304    ) -> Result<(SchemaRef, Vec<RecordBatch>), DfOlapError> {
305        let schema = self.vortex_table_schema(table_name).await?;
306        let ctx = self.ctx.read().await;
307        let df = ctx.sql(&format!("SELECT * FROM \"{table_name}\"")).await?;
308        let batches = df.collect().await?;
309        Ok((schema, batches))
310    }
311
312    /// Clear all `.vortex` files from a table's storage prefix (local dir or
313    /// S3 prefix). Used by UPDATE / DELETE / schema-rewrite codepaths that
314    /// follow a read-modify-write cycle: callers download the table, mutate
315    /// it in memory, write the new version as fresh objects, then call this
316    /// to remove the stale ones. Without this step, listing-based reads
317    /// would surface both old and new objects, producing duplicates or stale
318    /// rows.
319    async fn clear_table_storage(&self, table_name: &str) -> Result<(), DfOlapError> {
320        let loc = self
321            .location()
322            .ok_or_else(|| DfOlapError::Other("expected Vortex location".into()))?;
323
324        match loc {
325            VortexLocation::Local { base_path } => {
326                let dir = base_path.join(table_name);
327                if !dir.exists() {
328                    return Ok(());
329                }
330                tokio::task::spawn_blocking(move || {
331                    let entries: Vec<_> = std::fs::read_dir(&dir)?
332                        .filter_map(|e| e.ok())
333                        .map(|e| e.path())
334                        .filter(|p| p.extension().is_some_and(|x| x == "vortex"))
335                        .collect();
336                    for path in entries {
337                        std::fs::remove_file(path)?;
338                    }
339                    Ok::<_, DfOlapError>(())
340                })
341                .await
342                .map_err(DfOlapError::from_join)?
343            }
344            #[cfg(feature = "cloud-storage")]
345            VortexLocation::S3 { url } => self.clear_s3_table_prefix(table_name, url).await,
346        }
347    }
348
349    /// List and delete every `.vortex` object under the table's S3 prefix.
350    /// Counterpart of `clear_table_storage` for cloud builds.
351    #[cfg(feature = "cloud-storage")]
352    async fn clear_s3_table_prefix(&self, table_name: &str, url: &str) -> Result<(), DfOlapError> {
353        use futures::StreamExt;
354        // `ObjectStore` only provides the type; `ObjectStoreExt` is the
355        // higher-level helper trait that exposes `delete`/`list` directly on
356        // `Arc<dyn ObjectStore>` (without manual stream handling).
357        #[allow(unused_imports)]
358        use object_store::{ObjectStore, ObjectStoreExt};
359
360        let bucket = parse_s3_bucket(url)?;
361        let table_prefix = {
362            // S3 prefix path *within* the bucket (no scheme/host). e.g. for
363            // `s3://my-bucket/rhei-data` and table `orders`, this is
364            // `rhei-data/orders/`.
365            let parsed = Url::parse(url).map_err(DfOlapError::UrlParse)?;
366            let trimmed = parsed.path().trim_start_matches('/').trim_end_matches('/');
367            if trimmed.is_empty() {
368                format!("{table_name}/")
369            } else {
370                format!("{trimmed}/{table_name}/")
371            }
372        };
373
374        // `runtime_env().object_store(...)` takes `impl AsRef<Url>`; pass an
375        // ObjectStoreUrl which implements that trait. This is the same handle
376        // we registered in `build_vortex_session_context`.
377        let osu_str = format!("s3://{bucket}/");
378        let osu =
379            datafusion::execution::object_store::ObjectStoreUrl::parse(&osu_str).map_err(|e| {
380                DfOlapError::Other(format!("invalid object-store URL '{osu_str}': {e}"))
381            })?;
382        let store = self
383            .ctx
384            .read()
385            .await
386            .runtime_env()
387            .object_store(osu)
388            .map_err(|e| {
389                DfOlapError::Other(format!(
390                    "object store for s3://{bucket} not registered: {e}"
391                ))
392            })?;
393
394        let prefix = object_store::path::Path::from(table_prefix.as_str());
395        let mut list = store.list(Some(&prefix));
396
397        // Collect first so we don't hold the listing stream open while issuing
398        // deletes (some object-store impls don't allow interleaved ops).
399        let mut to_delete: Vec<object_store::path::Path> = Vec::new();
400        while let Some(meta) = list.next().await {
401            let meta = meta.map_err(DfOlapError::ObjectStore)?;
402            if meta
403                .location
404                .extension()
405                .is_some_and(|ext| ext.eq_ignore_ascii_case("vortex"))
406            {
407                to_delete.push(meta.location);
408            }
409        }
410        for path in to_delete {
411            store
412                .delete(&path)
413                .await
414                .map_err(DfOlapError::ObjectStore)?;
415        }
416        Ok(())
417    }
418
419    /// Insert rows from Arrow RecordBatches into a Vortex-backed table via a
420    /// temporary MemTable registered in the same SessionContext.
421    ///
422    /// Strategy: register batches as `__tmp_load_<counter>` MemTable, then
423    /// `INSERT INTO <table> SELECT * FROM __tmp_...`, then deregister the temp
424    /// table. DataFusion's VortexSink handles the actual file writes.
425    ///
426    /// This is the Arrow-native bulk-load path: no SQL literal serialization.
427    async fn insert_arrow_into_vortex(
428        &self,
429        table_name: &str,
430        schema: &SchemaRef,
431        batches: &[RecordBatch],
432    ) -> Result<u64, DfOlapError> {
433        if batches.is_empty() {
434            return Ok(0);
435        }
436        let total_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
437
438        let tmp_name = format!(
439            "__tmp_load_{}",
440            self.tmp_counter.fetch_add(1, Ordering::Relaxed)
441        );
442
443        let mem_table = MemTable::try_new(schema.clone(), vec![batches.to_vec()])?;
444
445        {
446            let ctx = self.ctx.read().await;
447            let _ = ctx.deregister_table(&tmp_name);
448            ctx.register_table(&tmp_name, Arc::new(mem_table))?;
449
450            // INSERT INTO <target> SELECT * FROM <tmp>
451            ctx.sql(&format!(
452                "INSERT INTO \"{table_name}\" SELECT * FROM \"{tmp_name}\""
453            ))
454            .await?
455            .collect()
456            .await?;
457
458            let _ = ctx.deregister_table(&tmp_name);
459        }
460
461        Ok(total_rows)
462    }
463
464    /// Re-insert Arrow batches into a Vortex table after clearing existing files.
465    ///
466    /// Used by UPDATE/DELETE read-modify-write cycle.
467    async fn rewrite_vortex_table(
468        &self,
469        table_name: &str,
470        schema: &SchemaRef,
471        batches: &[RecordBatch],
472    ) -> Result<(), DfOlapError> {
473        // Clear existing files.
474        self.clear_table_storage(table_name).await?;
475
476        // Re-register the (now empty) listing table so DataFusion doesn't see stale data.
477        self.refresh_table_vortex(table_name).await?;
478
479        if !batches.is_empty() {
480            self.insert_arrow_into_vortex(table_name, schema, batches)
481                .await?;
482            // Refresh again so DataFusion picks up the newly written files.
483            self.refresh_table_vortex(table_name).await?;
484        }
485
486        Ok(())
487    }
488
489    // -----------------------------------------------------------------------
490    // SQL execution helper
491    // -----------------------------------------------------------------------
492
493    async fn execute_sql(&self, sql: &str) -> Result<Vec<RecordBatch>, DfOlapError> {
494        let ctx = self.ctx.read().await;
495        let df = ctx.sql(sql).await?;
496        let batches = df.collect().await?;
497        Ok(batches)
498    }
499
500    // -----------------------------------------------------------------------
501    // In-memory DML
502    // -----------------------------------------------------------------------
503
504    async fn execute_insert_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
505        let (table_name, col_names, batches) = parse_insert_values(sql)?;
506
507        let mut tables = self.tables.write().await;
508        let table_data = tables
509            .get_mut(&table_name)
510            .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
511
512        let table_schema = table_data.schema.clone();
513        let (aligned_batches, total_rows) =
514            align_batches_to_schema(&table_schema, &col_names, &batches)?;
515        table_data.batches.extend(aligned_batches);
516        drop(tables);
517
518        self.refresh_table_mem(&table_name).await?;
519        Ok(total_rows)
520    }
521
522    async fn execute_update_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
523        let (table_name, assignments, where_clause) = parse_update(sql)?;
524
525        let mut tables = self.tables.write().await;
526        let table_data = tables
527            .get_mut(&table_name)
528            .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
529
530        let schema = table_data.schema.clone();
531        let mut updated_count = 0u64;
532
533        let all_rows = flatten_batches(&table_data.batches, &schema)?;
534        if let Some(all_rows) = all_rows {
535            let (updated_batch, count) =
536                apply_update(&all_rows, &schema, &assignments, &where_clause)?;
537            updated_count = count;
538            table_data.batches = vec![updated_batch];
539        }
540
541        drop(tables);
542        self.refresh_table_mem(&table_name).await?;
543        Ok(updated_count)
544    }
545
546    async fn execute_delete_mem(&self, sql: &str) -> Result<u64, DfOlapError> {
547        let (table_name, where_clause) = parse_delete(sql)?;
548
549        let mut tables = self.tables.write().await;
550        let table_data = tables
551            .get_mut(&table_name)
552            .ok_or_else(|| DfOlapError::TableNotFound(table_name.clone()))?;
553
554        let schema = table_data.schema.clone();
555        let all_rows = flatten_batches(&table_data.batches, &schema)?;
556
557        if let Some(all_rows) = all_rows {
558            let (filtered_batch, deleted_count) = apply_delete(&all_rows, &schema, &where_clause)?;
559            table_data.batches = if filtered_batch.num_rows() > 0 {
560                vec![filtered_batch]
561            } else {
562                vec![]
563            };
564            drop(tables);
565            self.refresh_table_mem(&table_name).await?;
566            Ok(deleted_count)
567        } else {
568            Ok(0)
569        }
570    }
571
572    // -----------------------------------------------------------------------
573    // Vortex DML
574    // -----------------------------------------------------------------------
575
576    async fn execute_insert_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
577        let (table_name, col_names, batches) = parse_insert_values(sql)?;
578
579        let schema = self.vortex_table_schema(&table_name).await?;
580        let (aligned_batches, total_rows) = align_batches_to_schema(&schema, &col_names, &batches)?;
581
582        self.insert_arrow_into_vortex(&table_name, &schema, &aligned_batches)
583            .await?;
584        // Refresh listing table so DataFusion sees the new file.
585        self.refresh_table_vortex(&table_name).await?;
586
587        Ok(total_rows)
588    }
589
590    async fn execute_update_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
591        let (table_name, assignments, where_clause) = parse_update(sql)?;
592
593        let (schema, existing_batches) = self.read_all_batches_vortex(&table_name).await?;
594        let all_rows = flatten_batches(&existing_batches, &schema)?;
595
596        if let Some(all_rows) = all_rows {
597            let (updated_batch, count) =
598                apply_update(&all_rows, &schema, &assignments, &where_clause)?;
599            let new_batches = if updated_batch.num_rows() > 0 {
600                vec![updated_batch]
601            } else {
602                vec![]
603            };
604            self.rewrite_vortex_table(&table_name, &schema, &new_batches)
605                .await?;
606            Ok(count)
607        } else {
608            Ok(0)
609        }
610    }
611
612    async fn execute_delete_vortex(&self, sql: &str) -> Result<u64, DfOlapError> {
613        let (table_name, where_clause) = parse_delete(sql)?;
614
615        let (schema, existing_batches) = self.read_all_batches_vortex(&table_name).await?;
616        let all_rows = flatten_batches(&existing_batches, &schema)?;
617
618        if let Some(all_rows) = all_rows {
619            let (filtered_batch, deleted_count) = apply_delete(&all_rows, &schema, &where_clause)?;
620            let new_batches = if filtered_batch.num_rows() > 0 {
621                vec![filtered_batch]
622            } else {
623                vec![]
624            };
625            self.rewrite_vortex_table(&table_name, &schema, &new_batches)
626                .await?;
627            Ok(deleted_count)
628        } else {
629            Ok(0)
630        }
631    }
632
633    // -----------------------------------------------------------------------
634    // Unified DML dispatch
635    // -----------------------------------------------------------------------
636
637    async fn execute_insert(&self, sql: &str) -> Result<u64, DfOlapError> {
638        match &self.storage_mode {
639            StorageMode::InMemory => self.execute_insert_mem(sql).await,
640            StorageMode::Vortex { .. } => self.execute_insert_vortex(sql).await,
641        }
642    }
643
644    async fn execute_update(&self, sql: &str) -> Result<u64, DfOlapError> {
645        match &self.storage_mode {
646            StorageMode::InMemory => self.execute_update_mem(sql).await,
647            StorageMode::Vortex { .. } => self.execute_update_vortex(sql).await,
648        }
649    }
650
651    async fn execute_delete(&self, sql: &str) -> Result<u64, DfOlapError> {
652        match &self.storage_mode {
653            StorageMode::InMemory => self.execute_delete_mem(sql).await,
654            StorageMode::Vortex { .. } => self.execute_delete_vortex(sql).await,
655        }
656    }
657}
658
659impl Default for DataFusionEngine {
660    fn default() -> Self {
661        Self::new()
662    }
663}
664
665impl rhei_core::OlapEngine for DataFusionEngine {
666    type Error = DfOlapError;
667
668    async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
669        debug!(sql, "DataFusion query");
670        self.execute_sql(sql).await
671    }
672
673    async fn query_stream(
674        &self,
675        sql: &str,
676    ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
677        debug!(sql, "DataFusion query_stream");
678        let ctx = self.ctx.read().await;
679        let df = ctx.sql(sql).await?;
680        let stream = df.execute_stream().await?;
681        let mapped = Box::pin(StreamAdapter(stream));
682        Ok(mapped)
683    }
684
685    async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
686        debug!(sql, "DataFusion execute");
687        let trimmed = sql.trim();
688        let upper = trimmed.to_ascii_uppercase();
689
690        if upper.starts_with("INSERT") {
691            self.execute_insert(trimmed).await
692        } else if upper.starts_with("UPDATE") {
693            self.execute_update(trimmed).await
694        } else if upper.starts_with("DELETE") {
695            self.execute_delete(trimmed).await
696        } else if upper.starts_with("BEGIN")
697            || upper.starts_with("COMMIT")
698            || upper.starts_with("ROLLBACK")
699        {
700            // Transaction markers — no-op for DataFusion
701            Ok(0)
702        } else {
703            // DDL or other — execute via DataFusion SQL
704            let ctx = self.ctx.read().await;
705            let df = ctx.sql(trimmed).await?;
706            let _ = df.collect().await?;
707            Ok(0)
708        }
709    }
710
711    async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
712        if batches.is_empty() {
713            return Ok(0);
714        }
715
716        debug!(table, batch_count = batches.len(), "DataFusion load_arrow");
717        rhei_core::validate_identifier(table).map_err(|e| DfOlapError::Other(e.to_string()))?;
718
719        let total_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
720
721        match &self.storage_mode {
722            StorageMode::InMemory => {
723                let mut tables = self.tables.write().await;
724                let table_data = tables
725                    .get_mut(table)
726                    .ok_or_else(|| DfOlapError::TableNotFound(table.to_string()))?;
727
728                for batch in batches {
729                    table_data.batches.push(batch.clone());
730                }
731                drop(tables);
732                self.refresh_table_mem(table).await?;
733            }
734            StorageMode::Vortex { .. } => {
735                let schema = self.vortex_table_schema(table).await?;
736                self.insert_arrow_into_vortex(table, &schema, batches)
737                    .await?;
738                self.refresh_table_vortex(table).await?;
739            }
740        }
741
742        Ok(total_rows)
743    }
744
745    async fn create_table(
746        &self,
747        table_name: &str,
748        schema: &SchemaRef,
749        _primary_key: &[String],
750    ) -> Result<(), Self::Error> {
751        rhei_core::validate_identifier(table_name)
752            .map_err(|e| DfOlapError::Other(e.to_string()))?;
753        for field in schema.fields() {
754            rhei_core::validate_identifier(field.name())
755                .map_err(|e| DfOlapError::Other(e.to_string()))?;
756        }
757
758        debug!(
759            table = table_name,
760            storage = ?self.storage_mode,
761            "DataFusion create_table"
762        );
763
764        match &self.storage_mode {
765            StorageMode::InMemory => {
766                let mut tables = self.tables.write().await;
767                if tables.contains_key(table_name) {
768                    return Ok(());
769                }
770                tables.insert(
771                    table_name.to_string(),
772                    TableData {
773                        schema: schema.clone(),
774                        batches: vec![],
775                    },
776                );
777                drop(tables);
778                self.refresh_table_mem(table_name).await?;
779            }
780            StorageMode::Vortex { .. } => {
781                let loc = self
782                    .location()
783                    .expect("Vortex mode must have a resolved location");
784
785                // Idempotent: skip if already registered.
786                {
787                    let vortex_tables = self.vortex_tables.read().await;
788                    if vortex_tables.contains_key(table_name) {
789                        return Ok(());
790                    }
791                }
792
793                let listing_url = table_listing_url(loc, table_name);
794
795                // Ensure the local table directory exists.
796                #[cfg(not(feature = "cloud-storage"))]
797                {
798                    let VortexLocation::Local { ref base_path } = *loc;
799                    let dir = base_path.join(table_name);
800                    tokio::fs::create_dir_all(&dir).await?;
801                }
802                #[cfg(feature = "cloud-storage")]
803                if let VortexLocation::Local { ref base_path } = *loc {
804                    let dir = base_path.join(table_name);
805                    tokio::fs::create_dir_all(&dir).await?;
806                }
807
808                let mut vortex_tables = self.vortex_tables.write().await;
809                vortex_tables.insert(
810                    table_name.to_string(),
811                    VortexTableMeta {
812                        schema: schema.clone(),
813                        table_url: listing_url.clone(),
814                    },
815                );
816                drop(vortex_tables);
817
818                // Register `CREATE EXTERNAL TABLE` equivalent via ListingTable.
819                let ctx = self.ctx.read().await;
820                register_vortex_listing_table(&ctx, table_name, schema, &listing_url).await?;
821            }
822        }
823
824        Ok(())
825    }
826
827    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
828        match &self.storage_mode {
829            StorageMode::InMemory => {
830                let tables = self.tables.read().await;
831                Ok(tables.contains_key(table_name))
832            }
833            StorageMode::Vortex { .. } => {
834                let vortex_tables = self.vortex_tables.read().await;
835                Ok(vortex_tables.contains_key(table_name))
836            }
837        }
838    }
839
840    async fn add_column(
841        &self,
842        table_name: &str,
843        column_name: &str,
844        data_type: &DataType,
845    ) -> Result<(), Self::Error> {
846        rhei_core::validate_identifier(table_name)
847            .map_err(|e| DfOlapError::Other(e.to_string()))?;
848        rhei_core::validate_identifier(column_name)
849            .map_err(|e| DfOlapError::Other(e.to_string()))?;
850
851        debug!(
852            table = table_name,
853            column = column_name,
854            "DataFusion add_column"
855        );
856
857        match &self.storage_mode {
858            StorageMode::InMemory => {
859                let mut tables = self.tables.write().await;
860                let table_data = tables
861                    .get_mut(table_name)
862                    .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))?;
863
864                let new_schema = append_field(&table_data.schema, column_name, data_type);
865                let new_batches =
866                    extend_batches_with_null_column(&table_data.batches, &new_schema, data_type)?;
867                table_data.schema = new_schema;
868                table_data.batches = new_batches;
869                drop(tables);
870                self.refresh_table_mem(table_name).await?;
871            }
872            StorageMode::Vortex { .. } => {
873                let (old_schema, existing_batches) =
874                    self.read_all_batches_vortex(table_name).await?;
875                let new_schema = append_field(&old_schema, column_name, data_type);
876                let new_batches =
877                    extend_batches_with_null_column(&existing_batches, &new_schema, data_type)?;
878
879                // Update schema in metadata first, then rewrite.
880                {
881                    let mut vortex_tables = self.vortex_tables.write().await;
882                    if let Some(meta) = vortex_tables.get_mut(table_name) {
883                        meta.schema = new_schema.clone();
884                    }
885                }
886
887                self.clear_table_storage(table_name).await?;
888                // Re-register with the new schema.
889                self.refresh_table_vortex(table_name).await?;
890
891                if !new_batches.is_empty() {
892                    self.insert_arrow_into_vortex(table_name, &new_schema, &new_batches)
893                        .await?;
894                    self.refresh_table_vortex(table_name).await?;
895                }
896            }
897        }
898
899        Ok(())
900    }
901
902    async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
903        rhei_core::validate_identifier(table_name)
904            .map_err(|e| DfOlapError::Other(e.to_string()))?;
905        rhei_core::validate_identifier(column_name)
906            .map_err(|e| DfOlapError::Other(e.to_string()))?;
907
908        debug!(
909            table = table_name,
910            column = column_name,
911            "DataFusion drop_column"
912        );
913
914        match &self.storage_mode {
915            StorageMode::InMemory => {
916                let mut tables = self.tables.write().await;
917                let table_data = tables
918                    .get_mut(table_name)
919                    .ok_or_else(|| DfOlapError::TableNotFound(table_name.to_string()))?;
920
921                let col_idx = find_column_index(&table_data.schema, column_name, table_name)?;
922                let new_schema = remove_field(&table_data.schema, col_idx);
923                let new_batches =
924                    remove_column_from_batches(&table_data.batches, &new_schema, col_idx)?;
925                table_data.schema = new_schema;
926                table_data.batches = new_batches;
927                drop(tables);
928                self.refresh_table_mem(table_name).await?;
929            }
930            StorageMode::Vortex { .. } => {
931                let (old_schema, existing_batches) =
932                    self.read_all_batches_vortex(table_name).await?;
933                let col_idx = find_column_index(&old_schema, column_name, table_name)?;
934                let new_schema = remove_field(&old_schema, col_idx);
935                let new_batches =
936                    remove_column_from_batches(&existing_batches, &new_schema, col_idx)?;
937
938                // Update schema in metadata first, then rewrite.
939                {
940                    let mut vortex_tables = self.vortex_tables.write().await;
941                    if let Some(meta) = vortex_tables.get_mut(table_name) {
942                        meta.schema = new_schema.clone();
943                    }
944                }
945
946                self.clear_table_storage(table_name).await?;
947                self.refresh_table_vortex(table_name).await?;
948
949                if !new_batches.is_empty() {
950                    self.insert_arrow_into_vortex(table_name, &new_schema, &new_batches)
951                        .await?;
952                    self.refresh_table_vortex(table_name).await?;
953                }
954            }
955        }
956
957        Ok(())
958    }
959}
960
961// ---------------------------------------------------------------------------
962// Schema / batch helpers (shared between in-memory and Vortex modes)
963// ---------------------------------------------------------------------------
964
965fn append_field(schema: &SchemaRef, column_name: &str, data_type: &DataType) -> SchemaRef {
966    let mut fields: Vec<arrow::datatypes::Field> =
967        schema.fields().iter().map(|f| f.as_ref().clone()).collect();
968    fields.push(arrow::datatypes::Field::new(
969        column_name,
970        data_type.clone(),
971        true,
972    ));
973    Arc::new(arrow::datatypes::Schema::new(fields))
974}
975
976fn remove_field(schema: &SchemaRef, col_idx: usize) -> SchemaRef {
977    let fields: Vec<arrow::datatypes::Field> = schema
978        .fields()
979        .iter()
980        .enumerate()
981        .filter(|(i, _)| *i != col_idx)
982        .map(|(_, f)| f.as_ref().clone())
983        .collect();
984    Arc::new(arrow::datatypes::Schema::new(fields))
985}
986
987fn find_column_index(
988    schema: &SchemaRef,
989    column_name: &str,
990    table_name: &str,
991) -> Result<usize, DfOlapError> {
992    schema
993        .fields()
994        .iter()
995        .position(|f| f.name() == column_name)
996        .ok_or_else(|| {
997            DfOlapError::Other(format!(
998                "column '{}' not found in table '{}'",
999                column_name, table_name
1000            ))
1001        })
1002}
1003
1004fn extend_batches_with_null_column(
1005    batches: &[RecordBatch],
1006    new_schema: &SchemaRef,
1007    data_type: &DataType,
1008) -> Result<Vec<RecordBatch>, DfOlapError> {
1009    let mut new_batches = Vec::with_capacity(batches.len());
1010    for batch in batches {
1011        let null_array = arrow::array::new_null_array(data_type, batch.num_rows());
1012        let mut columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1013            .map(|i| batch.column(i).clone())
1014            .collect();
1015        columns.push(null_array);
1016        new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
1017    }
1018    Ok(new_batches)
1019}
1020
1021fn remove_column_from_batches(
1022    batches: &[RecordBatch],
1023    new_schema: &SchemaRef,
1024    col_idx: usize,
1025) -> Result<Vec<RecordBatch>, DfOlapError> {
1026    let mut new_batches = Vec::with_capacity(batches.len());
1027    for batch in batches {
1028        let columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1029            .filter(|i| *i != col_idx)
1030            .map(|i| batch.column(i).clone())
1031            .collect();
1032        new_batches.push(RecordBatch::try_new(new_schema.clone(), columns)?);
1033    }
1034    Ok(new_batches)
1035}
1036
1037/// Align parsed INSERT batches to the table schema (reorder + cast columns).
1038fn align_batches_to_schema(
1039    table_schema: &SchemaRef,
1040    col_names: &[String],
1041    batches: &[RecordBatch],
1042) -> Result<(Vec<RecordBatch>, u64), DfOlapError> {
1043    let mut aligned_batches = Vec::with_capacity(batches.len());
1044    let mut total_rows = 0u64;
1045    for batch in batches {
1046        let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(table_schema.fields().len());
1047        for field in table_schema.fields() {
1048            let idx = col_names
1049                .iter()
1050                .position(|c| c == field.name())
1051                .ok_or_else(|| {
1052                    DfOlapError::SchemaMismatch(format!(
1053                        "column '{}' not in INSERT column list",
1054                        field.name()
1055                    ))
1056                })?;
1057            let col = batch.column(idx);
1058            let col = if col.data_type() != field.data_type() {
1059                arrow::compute::cast(col, field.data_type())?
1060            } else {
1061                col.clone()
1062            };
1063            columns.push(col);
1064        }
1065        let aligned = RecordBatch::try_new(table_schema.clone(), columns)?;
1066        total_rows += aligned.num_rows() as u64;
1067        aligned_batches.push(aligned);
1068    }
1069    Ok((aligned_batches, total_rows))
1070}
1071
1072/// Adapter: maps DataFusion's `SendableRecordBatchStream` to `RecordBatchBoxStream`.
1073struct StreamAdapter(datafusion::physical_plan::SendableRecordBatchStream);
1074
1075impl futures_core::Stream for StreamAdapter {
1076    type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;
1077
1078    fn poll_next(
1079        mut self: std::pin::Pin<&mut Self>,
1080        cx: &mut std::task::Context<'_>,
1081    ) -> std::task::Poll<Option<Self::Item>> {
1082        std::pin::Pin::new(&mut self.0).poll_next(cx).map(|opt| {
1083            opt.map(|r| r.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>))
1084        })
1085    }
1086}
1087
1088/// A cheaply-cloneable, `Arc`-wrapped [`DataFusionEngine`] that implements
1089/// [`rhei_core::OlapEngine`].
1090///
1091/// All method calls delegate to the inner engine.  Cloning a
1092/// `SharedDataFusionEngine` only increments the reference count — the
1093/// underlying engine (and its table store) is shared.
1094///
1095/// Use [`SharedDataFusionEngine::new`] to construct from a
1096/// [`DataFusionEngine`], or access the inner engine through the public `Deref`
1097/// impl or the `0` field.
1098#[derive(Clone)]
1099pub struct SharedDataFusionEngine(pub Arc<DataFusionEngine>);
1100
1101impl SharedDataFusionEngine {
1102    /// Wrap a [`DataFusionEngine`] in an `Arc` so it can be shared across tasks.
1103    pub fn new(engine: DataFusionEngine) -> Self {
1104        Self(Arc::new(engine))
1105    }
1106}
1107
1108impl std::ops::Deref for SharedDataFusionEngine {
1109    type Target = DataFusionEngine;
1110    fn deref(&self) -> &Self::Target {
1111        &self.0
1112    }
1113}
1114
1115impl rhei_core::OlapEngine for SharedDataFusionEngine {
1116    type Error = DfOlapError;
1117
1118    async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
1119        self.0.query(sql).await
1120    }
1121
1122    async fn query_stream(
1123        &self,
1124        sql: &str,
1125    ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
1126        self.0.query_stream(sql).await
1127    }
1128
1129    async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
1130        self.0.execute(sql).await
1131    }
1132
1133    async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
1134        self.0.load_arrow(table, batches).await
1135    }
1136
1137    async fn create_table(
1138        &self,
1139        table_name: &str,
1140        schema: &SchemaRef,
1141        primary_key: &[String],
1142    ) -> Result<(), Self::Error> {
1143        self.0.create_table(table_name, schema, primary_key).await
1144    }
1145
1146    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
1147        self.0.table_exists(table_name).await
1148    }
1149
1150    async fn add_column(
1151        &self,
1152        table_name: &str,
1153        column_name: &str,
1154        data_type: &DataType,
1155    ) -> Result<(), Self::Error> {
1156        self.0.add_column(table_name, column_name, data_type).await
1157    }
1158
1159    async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
1160        self.0.drop_column(table_name, column_name).await
1161    }
1162}
1163
1164// ---------------------------------------------------------------------------
1165// SQL parsing helpers — sqlparser-rs AST based
1166// ---------------------------------------------------------------------------
1167
1168/// Convert a sqlparser `Expr` from a VALUES list to a SQL literal string.
1169fn expr_to_sql_literal(expr: &Expr) -> Result<String, DfOlapError> {
1170    match expr {
1171        Expr::Value(v) => match &v.value {
1172            Value::Number(n, _) => Ok(n.clone()),
1173            Value::SingleQuotedString(s) => Ok(format!("'{}'", s.replace('\'', "''"))),
1174            Value::Boolean(b) => Ok(if *b { "TRUE".into() } else { "FALSE".into() }),
1175            Value::Null => Ok("NULL".into()),
1176            other => Err(DfOlapError::Other(format!(
1177                "unsupported value literal: {other:?}"
1178            ))),
1179        },
1180        Expr::UnaryOp {
1181            op: UnaryOperator::Minus,
1182            expr: inner,
1183        } => {
1184            if let Expr::Value(v) = inner.as_ref() {
1185                if let Value::Number(n, _) = &v.value {
1186                    return Ok(format!("-{n}"));
1187                }
1188            }
1189            Err(DfOlapError::Other(format!(
1190                "unsupported unary expression: {expr}"
1191            )))
1192        }
1193        other => Err(DfOlapError::Other(format!(
1194            "unsupported expression in VALUES: {other}"
1195        ))),
1196    }
1197}
1198
1199/// Extract the unquoted column name from an identifier expression.
1200fn ident_from_expr(expr: &Expr) -> Result<String, DfOlapError> {
1201    match expr {
1202        Expr::Identifier(ident) => Ok(ident.value.clone()),
1203        Expr::CompoundIdentifier(parts) => parts
1204            .last()
1205            .map(|i| i.value.clone())
1206            .ok_or_else(|| DfOlapError::Other("empty compound identifier".into())),
1207        other => Err(DfOlapError::Other(format!(
1208            "expected column name, got: {other}"
1209        ))),
1210    }
1211}
1212
1213/// Extract `col = val` and `col IS NULL` pairs from a WHERE expression tree.
1214fn extract_where_conditions(expr: &Expr) -> Result<Vec<(String, String)>, DfOlapError> {
1215    match expr {
1216        Expr::BinaryOp {
1217            left,
1218            op: BinaryOperator::And,
1219            right,
1220        } => {
1221            let mut conditions = extract_where_conditions(left)?;
1222            conditions.extend(extract_where_conditions(right)?);
1223            Ok(conditions)
1224        }
1225        Expr::BinaryOp {
1226            left,
1227            op: BinaryOperator::Eq,
1228            right,
1229        } => {
1230            let col = ident_from_expr(left)?;
1231            let val = expr_to_sql_literal(right)?;
1232            Ok(vec![(col, val)])
1233        }
1234        Expr::IsNull(inner) => {
1235            let col = ident_from_expr(inner)?;
1236            Ok(vec![(col, "NULL".into())])
1237        }
1238        Expr::IsNotNull(inner) => {
1239            let col = ident_from_expr(inner)?;
1240            Ok(vec![(col, "__IS_NOT_NULL__".into())])
1241        }
1242        Expr::Nested(inner) => extract_where_conditions(inner),
1243        other => Err(DfOlapError::Other(format!(
1244            "unsupported WHERE expression: {other}"
1245        ))),
1246    }
1247}
1248
1249/// Parse `INSERT INTO <table> (<cols>) VALUES (<vals>), ...`
1250///
1251/// Returns `(table_name, column_names, Vec<RecordBatch>)`.
1252fn parse_insert_values(sql: &str) -> Result<(String, Vec<String>, Vec<RecordBatch>), DfOlapError> {
1253    let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1254        .map_err(|e| DfOlapError::Other(format!("failed to parse INSERT: {e}")))?;
1255
1256    let stmt = stmts
1257        .pop()
1258        .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1259
1260    let insert = match stmt {
1261        Statement::Insert(ins) => ins,
1262        other => {
1263            return Err(DfOlapError::Other(format!(
1264                "expected INSERT statement, got: {other:?}"
1265            )));
1266        }
1267    };
1268
1269    let table_name = match &insert.table {
1270        TableObject::TableName(obj_name) => obj_name
1271            .0
1272            .last()
1273            .and_then(|p| p.as_ident())
1274            .map(|id| id.value.clone())
1275            .ok_or_else(|| DfOlapError::Other("empty table name in INSERT".into()))?,
1276        TableObject::TableFunction(_) => {
1277            return Err(DfOlapError::Other(
1278                "INSERT INTO TABLE FUNCTION not supported".into(),
1279            ));
1280        }
1281    };
1282
1283    rhei_core::validate_identifier(&table_name).map_err(|e| DfOlapError::Other(e.to_string()))?;
1284
1285    let col_name_strings: Vec<String> = insert.columns.iter().map(|id| id.value.clone()).collect();
1286
1287    let source = match insert.source {
1288        Some(q) => q,
1289        None => return Ok((table_name, col_name_strings, vec![])),
1290    };
1291
1292    let values = match *source.body {
1293        SetExpr::Values(v) => v,
1294        other => {
1295            return Err(DfOlapError::Other(format!(
1296                "INSERT source is not a VALUES clause: {other:?}"
1297            )));
1298        }
1299    };
1300
1301    if values.rows.is_empty() {
1302        return Ok((table_name, col_name_strings, vec![]));
1303    }
1304
1305    let rows: Vec<Vec<String>> = values
1306        .rows
1307        .iter()
1308        .map(|row| {
1309            row.iter()
1310                .map(expr_to_sql_literal)
1311                .collect::<Result<_, _>>()
1312        })
1313        .collect::<Result<_, _>>()?;
1314
1315    let col_name_refs: Vec<&str> = col_name_strings.iter().map(|s| s.as_str()).collect();
1316    let num_cols = col_name_refs.len();
1317
1318    if num_cols == 0 {
1319        return Err(DfOlapError::Other(format!(
1320            "INSERT INTO {table_name} requires an explicit column list; `VALUES (...)` without columns is not supported"
1321        )));
1322    }
1323
1324    let batch = build_record_batch_from_values(&col_name_refs, &rows, num_cols)?;
1325    Ok((table_name, col_name_strings, vec![batch]))
1326}
1327
1328/// Build an Arrow RecordBatch from parsed SQL values.
1329fn build_record_batch_from_values(
1330    col_names: &[&str],
1331    rows: &[Vec<String>],
1332    num_cols: usize,
1333) -> Result<RecordBatch, DfOlapError> {
1334    use arrow::array::*;
1335    use arrow::datatypes::{Field, Schema};
1336
1337    let mut types = vec![DataType::Utf8; num_cols];
1338    for col_idx in 0..num_cols {
1339        for row in rows {
1340            if col_idx < row.len() {
1341                let val = &row[col_idx];
1342                let upper = val.to_ascii_uppercase();
1343                if upper == "NULL" {
1344                    continue;
1345                }
1346                if upper == "TRUE" || upper == "FALSE" {
1347                    types[col_idx] = DataType::Boolean;
1348                    break;
1349                }
1350                if val.starts_with('\'') {
1351                    types[col_idx] = DataType::Utf8;
1352                    break;
1353                }
1354                if val.contains('.') {
1355                    if val.parse::<f64>().is_ok() {
1356                        types[col_idx] = DataType::Float64;
1357                        break;
1358                    }
1359                } else if val.parse::<i64>().is_ok() {
1360                    types[col_idx] = DataType::Int64;
1361                    break;
1362                }
1363                break;
1364            }
1365        }
1366    }
1367
1368    let fields: Vec<Field> = col_names
1369        .iter()
1370        .zip(types.iter())
1371        .map(|(name, dt)| Field::new(*name, dt.clone(), true))
1372        .collect();
1373    let schema = Arc::new(Schema::new(fields));
1374
1375    let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(num_cols);
1376    for col_idx in 0..num_cols {
1377        let col_values: Vec<&str> = rows
1378            .iter()
1379            .map(|row| {
1380                if col_idx < row.len() {
1381                    row[col_idx].as_str()
1382                } else {
1383                    "NULL"
1384                }
1385            })
1386            .collect();
1387
1388        columns.push(build_array(&types[col_idx], &col_values)?);
1389    }
1390
1391    let batch = RecordBatch::try_new(schema, columns)?;
1392    Ok(batch)
1393}
1394
1395/// Build an Arrow array from SQL literal strings.
1396fn build_array(dt: &DataType, values: &[&str]) -> Result<Arc<dyn Array>, DfOlapError> {
1397    use arrow::array::*;
1398
1399    match dt {
1400        DataType::Int64 => {
1401            let mut builder = Int64Builder::new();
1402            for v in values {
1403                if v.eq_ignore_ascii_case("NULL") {
1404                    builder.append_null();
1405                } else {
1406                    builder.append_value(
1407                        v.parse::<i64>()
1408                            .map_err(|e| DfOlapError::Other(format!("parse i64: {e}")))?,
1409                    );
1410                }
1411            }
1412            Ok(Arc::new(builder.finish()))
1413        }
1414        DataType::Float64 => {
1415            let mut builder = Float64Builder::new();
1416            for v in values {
1417                if v.eq_ignore_ascii_case("NULL") {
1418                    builder.append_null();
1419                } else {
1420                    builder.append_value(
1421                        v.parse::<f64>()
1422                            .map_err(|e| DfOlapError::Other(format!("parse f64: {e}")))?,
1423                    );
1424                }
1425            }
1426            Ok(Arc::new(builder.finish()))
1427        }
1428        DataType::Boolean => {
1429            let mut builder = BooleanBuilder::new();
1430            for v in values {
1431                let upper = v.to_ascii_uppercase();
1432                if upper == "NULL" {
1433                    builder.append_null();
1434                } else {
1435                    builder.append_value(upper == "TRUE");
1436                }
1437            }
1438            Ok(Arc::new(builder.finish()))
1439        }
1440        _ => {
1441            let mut builder = StringBuilder::new();
1442            for v in values {
1443                if v.eq_ignore_ascii_case("NULL") {
1444                    builder.append_null();
1445                } else {
1446                    let stripped = if v.starts_with('\'') && v.ends_with('\'') && v.len() >= 2 {
1447                        &v[1..v.len() - 1]
1448                    } else {
1449                        v
1450                    };
1451                    builder.append_value(stripped.replace("''", "'"));
1452                }
1453            }
1454            Ok(Arc::new(builder.finish()))
1455        }
1456    }
1457}
1458
/// Column assignment or condition: (column_name, value_literal).
///
/// Used for both `SET col = val` assignments and `WHERE` equality conditions.
/// The value is a SQL literal string as produced by `expr_to_sql_literal`
/// (e.g. `42`, `'text'`, `NULL`), or, for a `WHERE col IS NOT NULL`
/// condition, the sentinel `__IS_NOT_NULL__`.
type ColVal = (String, String);
1461
1462/// Parse `UPDATE <table> SET col=val, ... WHERE col=val AND ...`
1463fn parse_update(sql: &str) -> Result<(String, Vec<ColVal>, Vec<ColVal>), DfOlapError> {
1464    let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1465        .map_err(|e| DfOlapError::Other(format!("failed to parse UPDATE: {e}")))?;
1466
1467    let stmt = stmts
1468        .pop()
1469        .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1470
1471    let update = match stmt {
1472        Statement::Update(upd) => upd,
1473        other => {
1474            return Err(DfOlapError::Other(format!(
1475                "expected UPDATE statement, got: {other:?}"
1476            )));
1477        }
1478    };
1479
1480    let table_name = match &update.table.relation {
1481        TableFactor::Table { name, .. } => name
1482            .0
1483            .last()
1484            .and_then(|p| p.as_ident())
1485            .map(|id| id.value.clone())
1486            .ok_or_else(|| DfOlapError::Other("empty table name in UPDATE".into()))?,
1487        other => {
1488            return Err(DfOlapError::Other(format!(
1489                "unexpected table factor in UPDATE: {other:?}"
1490            )));
1491        }
1492    };
1493
1494    let assignments: Vec<ColVal> = update
1495        .assignments
1496        .iter()
1497        .map(|a| {
1498            let col = match &a.target {
1499                AssignmentTarget::ColumnName(obj) => obj
1500                    .0
1501                    .last()
1502                    .and_then(|p| p.as_ident())
1503                    .map(|id| id.value.clone())
1504                    .ok_or_else(|| DfOlapError::Other("empty column name in SET".into()))?,
1505                AssignmentTarget::Tuple(_) => {
1506                    return Err(DfOlapError::Other(
1507                        "tuple assignments in SET not supported".into(),
1508                    ));
1509                }
1510            };
1511            let val = expr_to_sql_literal(&a.value)?;
1512            Ok((col, val))
1513        })
1514        .collect::<Result<_, DfOlapError>>()?;
1515
1516    let where_clause = match &update.selection {
1517        Some(expr) => extract_where_conditions(expr)?,
1518        None => vec![],
1519    };
1520
1521    Ok((table_name, assignments, where_clause))
1522}
1523
1524/// Parse `DELETE FROM <table> WHERE col=val AND ...`
1525fn parse_delete(sql: &str) -> Result<(String, Vec<(String, String)>), DfOlapError> {
1526    let mut stmts = Parser::parse_sql(&SQLiteDialect {}, sql)
1527        .map_err(|e| DfOlapError::Other(format!("failed to parse DELETE: {e}")))?;
1528
1529    let stmt = stmts
1530        .pop()
1531        .ok_or_else(|| DfOlapError::Other("empty SQL statement".into()))?;
1532
1533    let delete = match stmt {
1534        Statement::Delete(del) => del,
1535        other => {
1536            return Err(DfOlapError::Other(format!(
1537                "expected DELETE statement, got: {other:?}"
1538            )));
1539        }
1540    };
1541
1542    let tables = match &delete.from {
1543        FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1544    };
1545
1546    let table_name = tables
1547        .first()
1548        .and_then(|twj| {
1549            if let TableFactor::Table { name, .. } = &twj.relation {
1550                name.0
1551                    .last()
1552                    .and_then(|p| p.as_ident())
1553                    .map(|id| id.value.clone())
1554            } else {
1555                None
1556            }
1557        })
1558        .ok_or_else(|| DfOlapError::Other("missing table name in DELETE".into()))?;
1559
1560    let where_clause = match &delete.selection {
1561        Some(expr) => extract_where_conditions(expr)?,
1562        None => vec![],
1563    };
1564
1565    Ok((table_name, where_clause))
1566}
1567
1568/// Flatten multiple RecordBatches into a single one.
1569fn flatten_batches(
1570    batches: &[RecordBatch],
1571    schema: &SchemaRef,
1572) -> Result<Option<RecordBatch>, DfOlapError> {
1573    if batches.is_empty() {
1574        return Ok(None);
1575    }
1576
1577    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1578    if total_rows == 0 {
1579        return Ok(None);
1580    }
1581
1582    let batch = arrow::compute::concat_batches(schema, batches)?;
1583    Ok(Some(batch))
1584}
1585
1586/// Apply UPDATE assignments to matching rows, return (new_batch, updated_count).
1587fn apply_update(
1588    batch: &RecordBatch,
1589    schema: &SchemaRef,
1590    assignments: &[(String, String)],
1591    where_conditions: &[(String, String)],
1592) -> Result<(RecordBatch, u64), DfOlapError> {
1593    let matching = find_matching_rows(batch, schema, where_conditions)?;
1594    let updated_count = matching.iter().filter(|&&m| m).count() as u64;
1595
1596    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
1597    for (col_idx, field) in schema.fields().iter().enumerate() {
1598        let assignment = assignments.iter().find(|(col, _)| col == field.name());
1599
1600        if let Some((_, new_val)) = assignment {
1601            let original = batch.column(col_idx);
1602            new_columns.push(apply_value_to_matching(
1603                original,
1604                &matching,
1605                new_val,
1606                field.data_type(),
1607            )?);
1608        } else {
1609            new_columns.push(batch.column(col_idx).clone());
1610        }
1611    }
1612
1613    let new_batch = RecordBatch::try_new(schema.clone(), new_columns)?;
1614    Ok((new_batch, updated_count))
1615}
1616
1617/// Apply DELETE, returning (filtered_batch, deleted_count).
1618fn apply_delete(
1619    batch: &RecordBatch,
1620    schema: &SchemaRef,
1621    where_conditions: &[(String, String)],
1622) -> Result<(RecordBatch, u64), DfOlapError> {
1623    let matching = find_matching_rows(batch, schema, where_conditions)?;
1624    let deleted_count = matching.iter().filter(|&&m| m).count() as u64;
1625
1626    let mut builder = BooleanBuilder::new();
1627    for &m in &matching {
1628        builder.append_value(!m);
1629    }
1630    let filter_array = builder.finish();
1631
1632    let new_columns: Vec<Arc<dyn Array>> = (0..batch.num_columns())
1633        .map(|i| arrow::compute::filter(batch.column(i), &filter_array).map_err(DfOlapError::Arrow))
1634        .collect::<Result<_, _>>()?;
1635
1636    let new_batch = RecordBatch::try_new(schema.clone(), new_columns)?;
1637    Ok((new_batch, deleted_count))
1638}
1639
1640/// Find which rows match the WHERE conditions.
1641fn find_matching_rows(
1642    batch: &RecordBatch,
1643    schema: &SchemaRef,
1644    conditions: &[(String, String)],
1645) -> Result<Vec<bool>, DfOlapError> {
1646    let num_rows = batch.num_rows();
1647    let mut matching = vec![true; num_rows];
1648
1649    for (col_name, expected_val) in conditions {
1650        let col_idx = schema
1651            .fields()
1652            .iter()
1653            .position(|f| f.name() == col_name)
1654            .ok_or_else(|| DfOlapError::Other(format!("column not found: {col_name}")))?;
1655
1656        let col = batch.column(col_idx);
1657        for (row_idx, m) in matching.iter_mut().enumerate() {
1658            if !*m {
1659                continue;
1660            }
1661            *m = value_matches(col, row_idx, expected_val);
1662        }
1663    }
1664
1665    Ok(matching)
1666}
1667
1668/// Check if an Arrow value at a given row matches a SQL literal.
1669fn value_matches(array: &dyn Array, row_idx: usize, expected: &str) -> bool {
1670    if expected == "__IS_NOT_NULL__" {
1671        return !array.is_null(row_idx);
1672    }
1673    if array.is_null(row_idx) {
1674        return expected.eq_ignore_ascii_case("NULL");
1675    }
1676
1677    match array.data_type() {
1678        DataType::Int8 => {
1679            expected.parse::<i8>().ok() == Some(array.as_primitive::<Int8Type>().value(row_idx))
1680        }
1681        DataType::Int16 => {
1682            expected.parse::<i16>().ok() == Some(array.as_primitive::<Int16Type>().value(row_idx))
1683        }
1684        DataType::Int32 => {
1685            expected.parse::<i32>().ok() == Some(array.as_primitive::<Int32Type>().value(row_idx))
1686        }
1687        DataType::Int64 => {
1688            expected.parse::<i64>().ok() == Some(array.as_primitive::<Int64Type>().value(row_idx))
1689        }
1690        DataType::UInt8 => {
1691            expected.parse::<u8>().ok() == Some(array.as_primitive::<UInt8Type>().value(row_idx))
1692        }
1693        DataType::UInt16 => {
1694            expected.parse::<u16>().ok() == Some(array.as_primitive::<UInt16Type>().value(row_idx))
1695        }
1696        DataType::UInt32 => {
1697            expected.parse::<u32>().ok() == Some(array.as_primitive::<UInt32Type>().value(row_idx))
1698        }
1699        DataType::UInt64 => {
1700            expected.parse::<u64>().ok() == Some(array.as_primitive::<UInt64Type>().value(row_idx))
1701        }
1702        DataType::Float32 => {
1703            expected.parse::<f32>().ok() == Some(array.as_primitive::<Float32Type>().value(row_idx))
1704        }
1705        DataType::Float64 => {
1706            expected.parse::<f64>().ok() == Some(array.as_primitive::<Float64Type>().value(row_idx))
1707        }
1708        DataType::Utf8 => {
1709            let arr = array.as_string::<i32>();
1710            let stripped =
1711                if expected.starts_with('\'') && expected.ends_with('\'') && expected.len() >= 2 {
1712                    &expected[1..expected.len() - 1]
1713                } else {
1714                    expected
1715                };
1716            arr.value(row_idx) == stripped
1717        }
1718        DataType::Boolean => {
1719            let arr = array.as_boolean();
1720            match expected.to_ascii_uppercase().as_str() {
1721                "TRUE" => arr.value(row_idx),
1722                "FALSE" => !arr.value(row_idx),
1723                _ => false,
1724            }
1725        }
1726        _ => false,
1727    }
1728}
1729
1730/// Replace values in an array at matching positions with a new SQL literal value.
1731fn apply_value_to_matching(
1732    original: &dyn Array,
1733    matching: &[bool],
1734    new_val: &str,
1735    dt: &DataType,
1736) -> Result<Arc<dyn Array>, DfOlapError> {
1737    use arrow::array::*;
1738
1739    match dt {
1740        DataType::Int64 => {
1741            let orig = original.as_primitive::<Int64Type>();
1742            let parsed: i64 = new_val
1743                .parse()
1744                .map_err(|e| DfOlapError::Other(format!("parse i64: {e}")))?;
1745            let mut builder = Int64Builder::new();
1746            for (i, &m) in matching.iter().enumerate() {
1747                if m {
1748                    builder.append_value(parsed);
1749                } else if orig.is_null(i) {
1750                    builder.append_null();
1751                } else {
1752                    builder.append_value(orig.value(i));
1753                }
1754            }
1755            Ok(Arc::new(builder.finish()))
1756        }
1757        DataType::Float64 => {
1758            let orig = original.as_primitive::<Float64Type>();
1759            let parsed: f64 = new_val
1760                .parse()
1761                .map_err(|e| DfOlapError::Other(format!("parse f64: {e}")))?;
1762            let mut builder = Float64Builder::new();
1763            for (i, &m) in matching.iter().enumerate() {
1764                if m {
1765                    builder.append_value(parsed);
1766                } else if orig.is_null(i) {
1767                    builder.append_null();
1768                } else {
1769                    builder.append_value(orig.value(i));
1770                }
1771            }
1772            Ok(Arc::new(builder.finish()))
1773        }
1774        DataType::Utf8 => {
1775            let orig = original.as_string::<i32>();
1776            let stripped =
1777                if new_val.starts_with('\'') && new_val.ends_with('\'') && new_val.len() >= 2 {
1778                    &new_val[1..new_val.len() - 1]
1779                } else {
1780                    new_val
1781                };
1782            let unescaped = stripped.replace("''", "'");
1783            let mut builder = StringBuilder::new();
1784            for (i, &m) in matching.iter().enumerate() {
1785                if m {
1786                    builder.append_value(&unescaped);
1787                } else if orig.is_null(i) {
1788                    builder.append_null();
1789                } else {
1790                    builder.append_value(orig.value(i));
1791                }
1792            }
1793            Ok(Arc::new(builder.finish()))
1794        }
1795        DataType::Boolean => {
1796            let orig = original.as_boolean();
1797            let parsed = new_val.eq_ignore_ascii_case("TRUE");
1798            let mut builder = BooleanBuilder::new();
1799            for (i, &m) in matching.iter().enumerate() {
1800                if m {
1801                    builder.append_value(parsed);
1802                } else if orig.is_null(i) {
1803                    builder.append_null();
1804                } else {
1805                    builder.append_value(orig.value(i));
1806                }
1807            }
1808            Ok(Arc::new(builder.finish()))
1809        }
1810        _ => {
1811            let orig = original.as_string::<i32>();
1812            let mut builder = StringBuilder::new();
1813            for (i, &m) in matching.iter().enumerate() {
1814                if m {
1815                    builder.append_value(new_val);
1816                } else if orig.is_null(i) {
1817                    builder.append_null();
1818                } else {
1819                    builder.append_value(orig.value(i));
1820                }
1821            }
1822            Ok(Arc::new(builder.finish()))
1823        }
1824    }
1825}
1826
1827#[cfg(test)]
1828mod tests {
1829    use super::*;
1830    use arrow::datatypes::{Field, Schema};
1831    use rhei_core::OlapEngine;
1832
1833    fn users_schema() -> SchemaRef {
1834        Arc::new(Schema::new(vec![
1835            Field::new("id", DataType::Int64, false),
1836            Field::new("name", DataType::Utf8, true),
1837            Field::new("age", DataType::Int64, true),
1838        ]))
1839    }
1840
1841    fn make_in_memory(_: &std::path::Path) -> DataFusionEngine {
1842        DataFusionEngine::new()
1843    }
1844
1845    fn make_vortex(tmp: &std::path::Path) -> DataFusionEngine {
1846        DataFusionEngine::with_storage(StorageMode::Vortex {
1847            url: tmp.join("vortex_olap").to_string_lossy().to_string(),
1848        })
1849        .unwrap()
1850    }
1851
1852    /// Generate a full test suite for a given storage mode.
1853    macro_rules! storage_mode_tests {
1854        ($mod_name:ident, $make_engine:ident) => {
1855            mod $mod_name {
1856                use super::*;
1857
1858                #[tokio::test]
1859                async fn create_and_query_empty() {
1860                    let _tmp = tempfile::tempdir().unwrap();
1861                    let engine = $make_engine(_tmp.path());
1862                    let schema = users_schema();
1863                    engine.create_table("users", &schema, &[]).await.unwrap();
1864
1865                    assert!(engine.table_exists("users").await.unwrap());
1866                    assert!(!engine.table_exists("nonexistent").await.unwrap());
1867                }
1868
1869                #[tokio::test]
1870                async fn insert_and_query() {
1871                    let _tmp = tempfile::tempdir().unwrap();
1872                    let engine = $make_engine(_tmp.path());
1873                    let schema = users_schema();
1874                    engine.create_table("users", &schema, &[]).await.unwrap();
1875
1876                    engine
1877                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
1878                        .await
1879                        .unwrap();
1880                    engine
1881                        .execute("INSERT INTO users (id, name, age) VALUES (2, 'Bob', 25)")
1882                        .await
1883                        .unwrap();
1884
1885                    let batches = engine
1886                        .query("SELECT * FROM users ORDER BY id")
1887                        .await
1888                        .unwrap();
1889                    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1890                    assert_eq!(total_rows, 2);
1891                }
1892
1893                #[tokio::test]
1894                async fn update() {
1895                    let _tmp = tempfile::tempdir().unwrap();
1896                    let engine = $make_engine(_tmp.path());
1897                    let schema = users_schema();
1898                    engine.create_table("users", &schema, &[]).await.unwrap();
1899
1900                    engine
1901                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
1902                        .await
1903                        .unwrap();
1904
1905                    let updated = engine
1906                        .execute("UPDATE users SET age = 31 WHERE id = 1")
1907                        .await
1908                        .unwrap();
1909                    assert_eq!(updated, 1);
1910
1911                    let batches = engine
1912                        .query("SELECT age FROM users WHERE id = 1")
1913                        .await
1914                        .unwrap();
1915                    let age = batches[0].column(0).as_primitive::<Int64Type>().value(0);
1916                    assert_eq!(age, 31);
1917                }
1918
1919                #[tokio::test]
1920                async fn delete() {
1921                    let _tmp = tempfile::tempdir().unwrap();
1922                    let engine = $make_engine(_tmp.path());
1923                    let schema = users_schema();
1924                    engine.create_table("users", &schema, &[]).await.unwrap();
1925
1926                    engine
1927                        .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25)")
1928                        .await
1929                        .unwrap();
1930
1931                    let deleted = engine
1932                        .execute("DELETE FROM users WHERE id = 1")
1933                        .await
1934                        .unwrap();
1935                    assert_eq!(deleted, 1);
1936
1937                    let batches = engine
1938                        .query("SELECT COUNT(*) as cnt FROM users")
1939                        .await
1940                        .unwrap();
1941                    let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
1942                    assert_eq!(count, 1);
1943                }
1944
1945                #[tokio::test]
1946                async fn load_arrow_bulk() {
1947                    let _tmp = tempfile::tempdir().unwrap();
1948                    let engine = $make_engine(_tmp.path());
1949                    let schema = users_schema();
1950                    engine.create_table("users", &schema, &[]).await.unwrap();
1951
1952                    let batch = RecordBatch::try_new(
1953                        schema.clone(),
1954                        vec![
1955                            Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3])),
1956                            Arc::new(arrow::array::StringArray::from(vec![
1957                                "Alice", "Bob", "Charlie",
1958                            ])),
1959                            Arc::new(arrow::array::Int64Array::from(vec![30, 25, 35])),
1960                        ],
1961                    )
1962                    .unwrap();
1963
1964                    let loaded = engine.load_arrow("users", &[batch]).await.unwrap();
1965                    assert_eq!(loaded, 3);
1966
1967                    let batches = engine
1968                        .query("SELECT COUNT(*) as cnt FROM users")
1969                        .await
1970                        .unwrap();
1971                    let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
1972                    assert_eq!(count, 3);
1973                }
1974
1975                #[tokio::test]
1976                async fn aggregate() {
1977                    let _tmp = tempfile::tempdir().unwrap();
1978                    let engine = $make_engine(_tmp.path());
1979                    let schema = users_schema();
1980                    engine.create_table("users", &schema, &[]).await.unwrap();
1981
1982                    engine
1983                        .execute(
1984                            "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)",
1985                        )
1986                        .await
1987                        .unwrap();
1988
1989                    let batches = engine
1990                        .query("SELECT AVG(age) as avg_age FROM users")
1991                        .await
1992                        .unwrap();
1993                    let avg = batches[0].column(0).as_primitive::<Float64Type>().value(0);
1994                    assert!((avg - 30.0).abs() < 0.01);
1995                }
1996            }
1997        };
1998    }
1999
2000    storage_mode_tests!(in_memory, make_in_memory);
2001    storage_mode_tests!(vortex_local, make_vortex);
2002
2003    // -----------------------------------------------------------------------
2004    // Vortex persist/restart round-trip test
2005    // -----------------------------------------------------------------------
2006
2007    /// Insert data, drop engine, re-create pointing at same directory,
2008    /// verify data survives the restart.
2009    #[tokio::test]
2010    async fn vortex_local_persist_restart() {
2011        let tmp = tempfile::tempdir().unwrap();
2012        let base = tmp.path().join("restart_test");
2013
2014        let schema = users_schema();
2015
2016        // First engine: write data.
2017        {
2018            let engine = DataFusionEngine::with_storage(StorageMode::Vortex {
2019                url: base.to_string_lossy().to_string(),
2020            })
2021            .unwrap();
2022            engine.create_table("users", &schema, &[]).await.unwrap();
2023            engine
2024                .execute(
2025                    "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25)",
2026                )
2027                .await
2028                .unwrap();
2029        }
2030
2031        // Second engine: verify data persisted.
2032        {
2033            let engine2 = DataFusionEngine::with_storage(StorageMode::Vortex {
2034                url: base.to_string_lossy().to_string(),
2035            })
2036            .unwrap();
2037            // Re-register the table (the schema must be known to re-open).
2038            engine2.create_table("users", &schema, &[]).await.unwrap();
2039
2040            let batches = engine2
2041                .query("SELECT COUNT(*) as cnt FROM users")
2042                .await
2043                .unwrap();
2044            let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
2045            assert_eq!(count, 2, "data should survive engine restart");
2046        }
2047    }
2048
2049    // -----------------------------------------------------------------------
2050    // Parser edge-case tests
2051    // -----------------------------------------------------------------------
2052
2053    #[tokio::test]
2054    async fn insert_string_with_comma() {
2055        let engine = DataFusionEngine::new();
2056        let schema = users_schema();
2057        engine.create_table("users", &schema, &[]).await.unwrap();
2058
2059        engine
2060            .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice, B', 30)")
2061            .await
2062            .unwrap();
2063
2064        let batches = engine
2065            .query("SELECT name FROM users WHERE id = 1")
2066            .await
2067            .unwrap();
2068        let name_arr = batches[0].column(0).as_string::<i32>();
2069        assert_eq!(name_arr.value(0), "Alice, B");
2070    }
2071
2072    #[tokio::test]
2073    async fn insert_null_value() {
2074        let engine = DataFusionEngine::new();
2075        let schema = users_schema();
2076        engine.create_table("users", &schema, &[]).await.unwrap();
2077
2078        engine
2079            .execute("INSERT INTO users (id, name, age) VALUES (1, NULL, 30)")
2080            .await
2081            .unwrap();
2082
2083        let batches = engine
2084            .query("SELECT name FROM users WHERE id = 1")
2085            .await
2086            .unwrap();
2087        assert!(batches[0].column(0).is_null(0));
2088    }
2089
2090    #[tokio::test]
2091    async fn update_where_and() {
2092        let engine = DataFusionEngine::new();
2093        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
2094            arrow::datatypes::Field::new("id", DataType::Int64, false),
2095            arrow::datatypes::Field::new("name", DataType::Utf8, true),
2096            arrow::datatypes::Field::new("status", DataType::Utf8, true),
2097        ]));
2098        engine.create_table("t", &schema, &[]).await.unwrap();
2099
2100        engine
2101            .execute("INSERT INTO t (id, name, status) VALUES (1, 'x', 'active')")
2102            .await
2103            .unwrap();
2104        engine
2105            .execute("INSERT INTO t (id, name, status) VALUES (2, 'y', 'inactive')")
2106            .await
2107            .unwrap();
2108
2109        let updated = engine
2110            .execute("UPDATE t SET name = 'updated' WHERE id = 1 AND status = 'active'")
2111            .await
2112            .unwrap();
2113        assert_eq!(updated, 1);
2114
2115        let batches = engine
2116            .query("SELECT name FROM t WHERE id = 1")
2117            .await
2118            .unwrap();
2119        assert_eq!(batches[0].column(0).as_string::<i32>().value(0), "updated");
2120
2121        let batches2 = engine
2122            .query("SELECT name FROM t WHERE id = 2")
2123            .await
2124            .unwrap();
2125        assert_eq!(batches2[0].column(0).as_string::<i32>().value(0), "y");
2126    }
2127
2128    #[tokio::test]
2129    async fn delete_quoted_identifier() {
2130        let engine = DataFusionEngine::new();
2131        let schema = users_schema();
2132        engine.create_table("users", &schema, &[]).await.unwrap();
2133
2134        engine
2135            .execute("INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)")
2136            .await
2137            .unwrap();
2138        engine
2139            .execute("INSERT INTO users (id, name, age) VALUES (2, 'Bob', 25)")
2140            .await
2141            .unwrap();
2142
2143        let deleted = engine
2144            .execute(r#"DELETE FROM "users" WHERE id = 1"#)
2145            .await
2146            .unwrap();
2147        assert_eq!(deleted, 1);
2148
2149        let batches = engine.query("SELECT COUNT(*) FROM users").await.unwrap();
2150        let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
2151        assert_eq!(count, 1);
2152    }
2153
2154    #[tokio::test]
2155    async fn insert_escaped_single_quote() {
2156        let engine = DataFusionEngine::new();
2157        let schema = users_schema();
2158        engine.create_table("users", &schema, &[]).await.unwrap();
2159
2160        engine
2161            .execute("INSERT INTO users (id, name, age) VALUES (1, 'O''Brien', 42)")
2162            .await
2163            .unwrap();
2164
2165        let batches = engine
2166            .query("SELECT name FROM users WHERE id = 1")
2167            .await
2168            .unwrap();
2169        assert_eq!(batches[0].column(0).as_string::<i32>().value(0), "O'Brien");
2170    }
2171
2172    #[test]
2173    fn parse_insert_multi_row() {
2174        let (table, cols, batches) =
2175            parse_insert_values("INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')")
2176                .unwrap();
2177        assert_eq!(table, "users");
2178        assert_eq!(cols, vec!["id", "name"]);
2179        assert_eq!(batches.len(), 1);
2180        assert_eq!(batches[0].num_rows(), 2);
2181    }
2182
2183    #[test]
2184    fn parse_update_basic() {
2185        let (table, assignments, where_clause) =
2186            parse_update("UPDATE users SET name = 'Alice' WHERE id = 1").unwrap();
2187        assert_eq!(table, "users");
2188        assert_eq!(
2189            assignments,
2190            vec![("name".to_string(), "'Alice'".to_string())]
2191        );
2192        assert_eq!(where_clause, vec![("id".to_string(), "1".to_string())]);
2193    }
2194
2195    #[test]
2196    fn parse_delete_no_where() {
2197        let (table, conditions) = parse_delete("DELETE FROM logs").unwrap();
2198        assert_eq!(table, "logs");
2199        assert!(conditions.is_empty());
2200    }
2201
2202    // -----------------------------------------------------------------------
2203    // StorageMode URL classification tests
2204    // -----------------------------------------------------------------------
2205
2206    #[test]
2207    fn vortex_url_local_path_classified() {
2208        let mode = StorageMode::Vortex {
2209            url: "/tmp/rhei".to_string(),
2210        };
2211        assert!(!mode.is_cloud());
2212        assert!(mode.local_base_path().is_some());
2213    }
2214
2215    #[cfg(feature = "cloud-storage")]
2216    #[test]
2217    fn vortex_url_s3_classified() {
2218        let mode = StorageMode::Vortex {
2219            url: "s3://my-bucket/prefix".to_string(),
2220        };
2221        assert!(mode.is_cloud());
2222        assert_eq!(mode.cloud_base_url(), Some("s3://my-bucket/prefix"));
2223    }
2224
2225    // -----------------------------------------------------------------------
2226    // S3 integration test (gated on RHEI_TEST_S3=1)
2227    // -----------------------------------------------------------------------
2228
2229    #[cfg(feature = "cloud-storage")]
2230    #[tokio::test]
2231    async fn vortex_s3_round_trip() {
2232        if std::env::var("RHEI_TEST_S3").as_deref() != Ok("1") {
2233            return; // Skip unless RHEI_TEST_S3=1 is set
2234        }
2235
2236        use std::time::{SystemTime, UNIX_EPOCH};
2237
2238        // Use a randomized prefix so concurrent test runs don't collide.
2239        let ts = SystemTime::now()
2240            .duration_since(UNIX_EPOCH)
2241            .unwrap()
2242            .subsec_nanos();
2243        let prefix = format!("test_{:08x}", ts);
2244        let base_url = format!("s3://pixai-rec-sys/dev/rhei/{prefix}");
2245
2246        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
2247            arrow::datatypes::Field::new("id", DataType::Int64, false),
2248            arrow::datatypes::Field::new("val", DataType::Utf8, true),
2249        ]));
2250
2251        let engine = DataFusionEngine::with_storage(StorageMode::Vortex {
2252            url: base_url.clone(),
2253        })
2254        .expect("S3 engine construction should succeed with AWS credentials");
2255
2256        engine.create_table("s3test", &schema, &[]).await.unwrap();
2257        engine
2258            .execute("INSERT INTO s3test (id, val) VALUES (1, 'hello'), (2, 'world')")
2259            .await
2260            .unwrap();
2261
2262        let batches = engine
2263            .query("SELECT COUNT(*) as cnt FROM s3test")
2264            .await
2265            .unwrap();
2266        let count = batches[0].column(0).as_primitive::<Int64Type>().value(0);
2267        assert_eq!(count, 2, "S3 round-trip INSERT+SELECT should return 2 rows");
2268
2269        // Update a row and verify.
2270        let updated = engine
2271            .execute("UPDATE s3test SET val = 'updated' WHERE id = 1")
2272            .await
2273            .unwrap();
2274        assert_eq!(updated, 1);
2275
2276        let batches2 = engine
2277            .query("SELECT val FROM s3test WHERE id = 1")
2278            .await
2279            .unwrap();
2280        assert_eq!(batches2[0].column(0).as_string::<i32>().value(0), "updated");
2281
2282        // TODO: clean up the S3 prefix after the test (requires object_store list+delete)
2283        // This is a known gap: S3 test data under `dev/rhei/{prefix}` persists.
2284        // Run `aws s3 rm --recursive s3://pixai-rec-sys/dev/rhei/{prefix}` to clean up.
2285        tracing::warn!(prefix, "S3 test data not cleaned up — remove manually");
2286    }
2287}