Skip to main content

datapress_datafusion/
store.rs

1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4
5use arc_swap::ArcSwap;
6use arrow::array::{
7    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
8    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
9    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
10};
11use arrow::compute;
12use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
13use arrow::datatypes::{DataType, Field, Schema};
14use async_trait::async_trait;
15use parquet::arrow::ProjectionMask;
16use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
17use serde_json::Value as JsonValue;
18
19use datafusion::datasource::file_format::parquet::ParquetFormat;
20use datafusion::datasource::listing::{
21    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
22};
23use datafusion::datasource::{MemTable, TableProvider};
24use datafusion::prelude::SessionContext;
25use datafusion::scalar::ScalarValue;
26
27use object_store::aws::AmazonS3Builder;
28use url::Url;
29
30use datapress_core::backend::{
31    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
32};
33use datapress_core::config::{
34    AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
35    S3Config, SourceKind,
36};
37use datapress_core::errors::AppError;
38use datapress_core::models::{CountRequest, Predicate, QueryRequest};
39use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
40
41// ---------------------------------------------------------------------------
42// Public types
43// ---------------------------------------------------------------------------
44
45/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
46type EqIndex = HashMap<String, HashMap<String, Vec<u32>>>;
47
48/// Per-dataset state: schema metadata, the resident chunks, and the
49/// equality index built per the dataset's `[dataset.index]` policy.
50///
51/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
52/// produced by the underlying reader, after temporal columns are cast to
53/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
54/// into one batch: on wide schemas (hundreds of columns) that transiently
55/// allocates a second full copy of the decoded Arrow data, pushing peak
56/// RSS to ~2× the resident size and OOM-killing the process at startup.
57///
58/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
59/// `index` is empty, and every query is dispatched to DataFusion SQL
60/// against a registered `ListingTable`. `arrow_schema` still carries the
61/// inferred schema so discovery endpoints work.
62pub struct DatasetState {
63    pub schema: DatasetSchema,
64    pub data: Vec<RecordBatch>,
65    pub arrow_schema: Arc<Schema>,
66    pub index: EqIndex,
67    pub lazy: bool,
68}
69
70impl DatasetState {
71    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
72    pub fn num_rows(&self) -> usize {
73        self.data.iter().map(|b| b.num_rows()).sum()
74    }
75}
76
77/// Multi-dataset registry. Each dataset is registered in the shared
78/// `SessionContext` under its configured name. The per-dataset state is
79/// held behind `ArcSwap` so a reload can atomically replace it without
80/// blocking concurrent queries.
81pub struct Store {
82    ctx: SessionContext,
83    max_page_size: u64,
84    /// Original dataset configs, indexed by name. Reload reads the source
85    /// path from here — clients can't redirect a reload at an arbitrary file.
86    configs: HashMap<String, DatasetConfig>,
87    /// Hot-swappable snapshot of all currently loaded datasets.
88    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
89    /// Per-name reload mutex. Serialises concurrent reloads of the same
90    /// dataset; reloads of different datasets proceed in parallel.
91    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
92}
93
94impl Store {
95    /// Load every dataset declared in `cfg`.
96    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
97        // One-shot init for the deltalake S3 backend. Safe to call more
98        // than once — the handlers are idempotent.
99        if cfg
100            .datasets
101            .iter()
102            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
103        {
104            deltalake::aws::register_handlers(None);
105        }
106
107        let ctx = SessionContext::new();
108        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
109        let mut configs = HashMap::with_capacity(cfg.datasets.len());
110
111        for d in &cfg.datasets {
112            let (state, provider) = build_dataset(d, &ctx).await?;
113            ctx.register_table(d.name.as_str(), provider)?;
114            datasets.insert(d.name.clone(), Arc::new(state));
115            configs.insert(d.name.clone(), d.clone());
116        }
117        Ok(Self {
118            ctx,
119            max_page_size: cfg.server.max_page_size.max(1),
120            configs,
121            datasets: ArcSwap::from_pointee(datasets),
122            reload_locks: Mutex::new(HashMap::new()),
123        })
124    }
125
126    /// Sorted list of dataset names.
127    pub fn names(&self) -> Vec<String> {
128        let snap = self.datasets.load();
129        let mut v: Vec<String> = snap.keys().cloned().collect();
130        v.sort();
131        v
132    }
133
134    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
135        self.datasets
136            .load()
137            .get(name)
138            .cloned()
139            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
140    }
141
142    /// JSON for the first row of the dataset, or `null` if empty. Used by
143    /// `GET /api/datasets/{name}/schema` for discoverability.
144    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
145        let st = self.dataset(name)?;
146
147        // Lazy datasets have no resident batch — pull one row via SQL.
148        if st.lazy {
149            let table = DatasetSchema::quote_ident(&st.schema.name);
150            let sql = format!("SELECT * FROM {table} LIMIT 1");
151            let df = self.ctx.sql(&sql).await?;
152            let batches = df.collect().await?;
153            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
154                return Ok("null".into());
155            }
156            let arr = serialize(&batches[0].slice(0, 1))?;
157            let trimmed = arr.trim();
158            let inner = trimmed
159                .strip_prefix('[')
160                .and_then(|s| s.strip_suffix(']'))
161                .unwrap_or(trimmed);
162            return Ok(inner.to_string());
163        }
164
165        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
166            Some(b) => b,
167            None => return Ok("null".into()),
168        };
169        let arr = serialize(&first.slice(0, 1))?;
170        // strip the outer [] to return a single object
171        let trimmed = arr.trim();
172        let inner = trimmed
173            .strip_prefix('[')
174            .and_then(|s| s.strip_suffix(']'))
175            .unwrap_or(trimmed);
176        Ok(inner.to_string())
177    }
178
179    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
180    /// against the same name continue to see the *old* `Arc<DatasetState>`
181    /// until they finish; the old data is dropped once the last reference
182    /// goes away.
183    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
184        // 1. Look up the dataset config. Not finding it = 404.
185        let cfg = self
186            .configs
187            .get(name)
188            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
189            .clone();
190
191        // 2. Per-name lock: only one reload of this dataset at a time.
192        let lock = {
193            let mut locks = self.reload_locks.lock().unwrap();
194            locks
195                .entry(name.to_string())
196                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
197                .clone()
198        };
199        let _guard = lock.lock().await;
200
201        let started = std::time::Instant::now();
202
203        // 3. Heavy lifting (source read + index build). Parquet/delta
204        // readers are themselves async, so we don't wrap in `web::block`.
205        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
206        let rows = state.num_rows();
207
208        // 4. Atomic swap.
209        //   a) Replace the MemTable inside the SessionContext.
210        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
211        // In-flight queries already hold the old provider + old Arc; they
212        // run to completion. New queries see the new data.
213        let _ = self.ctx.deregister_table(name)?;
214        self.ctx.register_table(name, provider)?;
215
216        let mut new_map = (**self.datasets.load()).clone();
217        new_map.insert(name.to_string(), Arc::new(state));
218        self.datasets.store(Arc::new(new_map));
219
220        let elapsed_ms = started.elapsed().as_millis();
221        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
222        Ok(ReloadStats { rows, elapsed_ms })
223    }
224
225    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
226    /// slice. Otherwise → DataFusion SQL on the single registered table.
227    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
228    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
229        let batch = self.query_batch(name, req).await?;
230        if batch.num_rows() == 0 {
231            return Ok("[]".to_string());
232        }
233        serialize(&batch)
234    }
235
236    /// Same plan as [`Self::query`], but encode the result page as an
237    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
238    /// results still produce a valid, self-describing zero-batch stream.
239    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
240        let batch = self.query_batch(name, req).await?;
241        let schema = batch.schema();
242        let mut buf = Vec::with_capacity(8 * 1024);
243        {
244            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
245            if batch.num_rows() > 0 {
246                w.write(&batch)?;
247            }
248            w.finish()?;
249        }
250        Ok(buf)
251    }
252
253    pub async fn query_arrow_stream(
254        &self,
255        name: &str,
256        req: &QueryRequest,
257    ) -> Result<ArrowIpcStream, AppError> {
258        let batches = self.query_batches(name, req).await?;
259        Ok(stream_arrow_batches(batches))
260    }    pub async fn query_arrow_stream_all(
261        &self,
262        name: &str,
263        req: &QueryRequest,
264    ) -> Result<ArrowIpcStream, AppError> {
265        let batches = self.query_batches_all(name, req).await?;
266        Ok(stream_arrow_batches(batches))
267    }
268
269    /// Encode the entire dataset as a single self-contained Parquet file.
270    ///
271    /// Collects every row (all columns, no predicates, no paging) and runs
272    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
273    /// carries the row-group + footer metadata a Parquet reader needs to
274    /// answer `count(*)` straight from the footer. Powers the cached
275    /// `GET /datasets/{name}/parquet` HTTP endpoint.
276    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
277        // All rows, all columns, no predicates / ordering / limit.
278        let req = QueryRequest {
279            columns: Vec::new(),
280            predicates: Vec::new(),
281            group_by: Vec::new(),
282            aggregations: Vec::new(),
283            distinct: false,
284            order_by: Vec::new(),
285            limit: None,
286            page: 1,
287            page_size: 1,
288        };
289        let st = self.dataset(name)?;
290        let batches = self.query_batches_all(name, &req).await?;
291        // Use the actual batch schema when we have rows so the writer schema
292        // matches exactly (projection/nullability); fall back to the
293        // dataset schema for an empty dataset.
294        let schema = batches
295            .first()
296            .map(|b| b.schema())
297            .unwrap_or_else(|| st.arrow_schema.clone());
298
299        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
300        {
301            let props = parquet::file::properties::WriterProperties::builder()
302                .set_compression(parquet::basic::Compression::SNAPPY)
303                .build();
304            let mut writer =
305                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
306                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
307            for batch in &batches {
308                if batch.num_rows() > 0 {
309                    writer
310                        .write(batch)
311                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
312                }
313            }
314            writer
315                .close()
316                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
317        }
318        Ok(bytes::Bytes::from(buf))
319    }
320
321    /// Compute the result page as a single `RecordBatch`. Shared between
322    /// the JSON and Arrow IPC encoders.
323    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
324        let batches = self.query_batches(name, req).await?;
325        if batches.is_empty() {
326            return Ok(RecordBatch::new_empty(Arc::new(
327                arrow::datatypes::Schema::empty(),
328            )));
329        }
330        if batches.len() == 1 {
331            return Ok(batches.into_iter().next().expect("checked len"));
332        }
333        if batches.iter().all(|b| b.num_rows() == 0) {
334            return Ok(RecordBatch::new_empty(batches[0].schema()));
335        }
336        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
337        Ok(batch)
338    }
339
340    /// Compute the result page as Arrow batches. Arrow IPC responses can
341    /// write these directly, while JSON callers concatenate via
342    /// [`Self::query_batch`] for the existing row conversion path.
343    async fn query_batches(
344        &self,
345        name: &str,
346        req: &QueryRequest,
347    ) -> Result<Vec<RecordBatch>, AppError> {
348        let st = self.dataset(name)?;
349
350        let page = req.page.max(1);
351        let page_size = req.page_size.clamp(1, self.max_page_size);
352        let offset = ((page - 1) * page_size) as usize;
353        let limit = page_size as usize;
354
355        self.query_batches_inner(st, req, Some((offset, limit)))
356            .await
357    }
358
359    /// Compute all matching rows as Arrow batches for the one-request
360    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
361    /// optional `limit` still caps the total result size.
362    async fn query_batches_all(
363        &self,
364        name: &str,
365        req: &QueryRequest,
366    ) -> Result<Vec<RecordBatch>, AppError> {
367        let st = self.dataset(name)?;
368        self.query_batches_inner(st, req, None).await
369    }
370
371    async fn query_batches_inner(
372        &self,
373        st: Arc<DatasetState>,
374        req: &QueryRequest,
375        page_window: Option<(usize, usize)>,
376    ) -> Result<Vec<RecordBatch>, AppError> {
377        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
378
379        // In-memory hot paths only fire when:
380        //   - the dataset is materialised,
381        //   - the caller did not ask for ordering,
382        //   - and did not ask for a hard `limit` cap on a paged request.
383        // Both of the latter two require sorting / capping that the SQL
384        // engine handles uniformly across all data types.
385        let can_fast_path = !st.lazy
386            && req.order_by.is_empty()
387            && (page_window.is_none() || req.limit.is_none())
388            && req.group_by.is_empty()
389            && !req.distinct;
390
391        if can_fast_path {
392            let total = st.num_rows();
393
394            // No predicates -> O(1) raw Arrow slices over resident batches,
395            // no engine overhead.
396            if req.predicates.is_empty() {
397                if page_window.is_none() && req.limit.is_none() {
398                    return st
399                        .data
400                        .iter()
401                        .cloned()
402                        .map(|batch| project(&st.schema, batch, &req.columns))
403                        .collect();
404                }
405                let start = offset.min(total);
406                let len = limit.min(total - start);
407                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
408                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
409            }
410
411            // Index fast path: if every predicate is eq/in on an indexed column,
412            // resolve via the pre-built equality index.
413            if let Some(rows) = try_index(&st.index, &req.predicates) {
414                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
415                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
416            }
417        }
418
419        // Fallback (and only path for lazy datasets): DataFusion SQL.
420        let (sql, params) = match page_window {
421            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
422            None => build_query_stream_sql(&st.schema, req)?,
423        };
424        let mut df = self.ctx.sql(&sql).await?;
425        if !params.is_empty() {
426            df = df.with_param_values(params)?;
427        }
428        let batches = df.collect().await?;
429        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
430            let schema = batches
431                .first()
432                .map(|b| b.schema())
433                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
434            return Ok(vec![RecordBatch::new_empty(schema)]);
435        }
436        Ok(batches)
437    }
438}
439
440fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
441    let schema = batches
442        .first()
443        .map(|batch| batch.schema())
444        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
445    let (mut writer, stream) = arrow_ipc_stream_channel(8);
446
447    tokio::task::spawn_blocking(move || {
448        let result = (|| -> Result<(), AppError> {
449            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
450            for batch in batches {
451                if batch.num_rows() > 0 {
452                    w.write(&batch)?;
453                }
454            }
455            w.finish()?;
456            Ok(())
457        })();
458        if let Err(err) = result {
459            log::error!("datafusion arrow stream failed: {err}");
460            writer.send_error(err);
461        }
462    });
463
464    stream
465}
466
467impl Store {
468    /// Return the number of rows matching `req.predicates`. With no
469    /// predicates this is a cheap metadata lookup on materialised datasets
470    /// and a `SELECT COUNT(*)` on lazy ones.
471    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
472        let st = self.dataset(name)?;
473
474        if !st.lazy {
475            // No predicates → resident row count, no scan.
476            if req.predicates.is_empty() {
477                return Ok(st.num_rows() as i64);
478            }
479            // Index fast path: same eligibility rules as `query`.
480            if let Some(rows) = try_index(&st.index, &req.predicates) {
481                return Ok(rows.len() as i64);
482            }
483        }
484
485        // Fallback: DataFusion SQL — same predicate translation as `query`,
486        // with predicate values bound as typed parameters.
487        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
488        let mut df = self.ctx.sql(&sql).await?;
489        if !params.is_empty() {
490            df = df.with_param_values(params)?;
491        }
492        let batches = df.collect().await?;
493        let n = batches
494            .first()
495            .and_then(|b| {
496                b.column(0)
497                    .as_any()
498                    .downcast_ref::<arrow::array::Int64Array>()
499            })
500            .filter(|a| !a.is_empty())
501            .map(|a| a.value(0))
502            .unwrap_or(0);
503        Ok(n)
504    }
505}
506
507// ---------------------------------------------------------------------------
508// Dataset loading
509// ---------------------------------------------------------------------------
510
511async fn build_dataset(
512    d: &DatasetConfig,
513    ctx: &SessionContext,
514) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
515    // Lazy datasets: register a ListingTable straight against the source
516    // and skip the materialise / index / partition pipeline below. Delta
517    // is rejected — deltalake reads the transaction log eagerly to know
518    // which parquet files are current, so "lazy delta" doesn't map onto
519    // ListingTable cleanly.
520    if d.lazy {
521        match (d.source.kind, d.source.is_s3()) {
522            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
523            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
524            (SourceKind::Delta, _) => {
525                return Err(AppError::Internal(format!(
526                    "dataset '{}': lazy mode is not supported for delta sources",
527                    d.name
528                )));
529            }
530        }
531    }
532
533    // Fetch raw RecordBatches from whichever backing store the dataset
534    // is configured to use. All four (parquet, delta) x (local, s3)
535    // combinations converge into one Vec<RecordBatch>; the materialisation
536    // / indexing / partitioning logic below is shared.
537    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
538        (SourceKind::Parquet, false) => read_local_parquet(d)?,
539        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
540        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
541        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
542    };
543    if raw_batches.is_empty() {
544        return Err(AppError::Internal(format!(
545            "dataset '{}': source produced no batches",
546            d.name
547        )));
548    }
549
550    let chunks = raw_batches;
551    let arrow_sch = chunks[0].schema();
552
553    // Build DatasetSchema from the Arrow schema.
554    let columns: Vec<ColumnInfo> = arrow_sch
555        .fields()
556        .iter()
557        .map(|f| {
558            let dt = f.data_type();
559            ColumnInfo {
560                name: f.name().clone(),
561                logical: arrow_to_logical(dt),
562                sql_type: format!("{dt:?}"),
563                nullable: f.is_nullable(),
564            }
565        })
566        .collect();
567    let schema = DatasetSchema::new(&d.name, columns);
568
569    // Build the equality index per the per-dataset policy. Operates on the
570    // chunked representation directly so we never have to materialise a
571    // single concatenated batch (which would double peak RSS on wide
572    // schemas — see `DatasetState` docs).
573    let index = build_eq_index_with_policy(&chunks, &d.index);
574
575    // Partition for parallel scans by the SQL fallback path. We distribute
576    // the existing batches round-robin across `n_parts` partitions instead
577    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
578    // an Arc-clone of the column buffers, not a copy.
579    let n_parts = std::thread::available_parallelism()
580        .map(|n| n.get())
581        .unwrap_or(4);
582    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
583    for (i, b) in chunks.iter().enumerate() {
584        if b.num_rows() == 0 {
585            continue;
586        }
587        parts[i % n_parts].push(b.clone());
588    }
589    parts.retain(|p| !p.is_empty());
590    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
591
592    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
593    let mem_mb: usize = chunks
594        .iter()
595        .flat_map(|b| b.columns().iter())
596        .map(|c| c.get_buffer_memory_size())
597        .sum::<usize>()
598        / 1_048_576;
599    log::info!(
600        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
601        d.name,
602        d.source.kind.as_str(),
603        total_rows,
604        schema.columns.len(),
605        mem_mb,
606        chunks.len(),
607        index.len()
608    );
609
610    Ok((
611        DatasetState {
612            schema,
613            data: chunks,
614            arrow_schema: arrow_sch,
615            index,
616            lazy: false,
617        },
618        provider,
619    ))
620}
621
622/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
623/// The dataset is never read into RAM; DataFusion streams row groups on
624/// each query. The returned `DatasetState.data` is an empty `Vec` —
625/// `arrow_schema` still carries the inferred Arrow schema for discovery.
626async fn build_lazy_local_parquet(
627    d: &DatasetConfig,
628    ctx: &SessionContext,
629) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
630    let (url, part_keys) = lazy_local_listing(d)?;
631
632    let mut opts =
633        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
634    if !part_keys.is_empty() {
635        opts = opts.with_table_partition_cols(
636            part_keys
637                .iter()
638                .map(|k| (k.clone(), DataType::Utf8))
639                .collect(),
640        );
641    }
642
643    let session_state = ctx.state();
644    // `infer_schema` returns the *file* schema (without partition columns);
645    // `ListingTable` appends the declared partition columns on top.
646    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
647        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
648    })?;
649
650    let cfg = ListingTableConfig::new(url)
651        .with_listing_options(opts)
652        .with_schema(file_schema.clone());
653    let table = ListingTable::try_new(cfg).map_err(|e| {
654        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
655    })?;
656    let provider: Arc<dyn TableProvider> = Arc::new(table);
657
658    // Discovery schema = file columns + partition columns (Utf8).
659    let mut fields: Vec<Field> = file_schema
660        .fields()
661        .iter()
662        .map(|f| f.as_ref().clone())
663        .collect();
664    for k in &part_keys {
665        if !fields.iter().any(|f| f.name() == k) {
666            fields.push(Field::new(k, DataType::Utf8, false));
667        }
668    }
669    let arrow_sch = Arc::new(Schema::new(fields));
670
671    let columns: Vec<ColumnInfo> = arrow_sch
672        .fields()
673        .iter()
674        .map(|f| {
675            let dt = f.data_type();
676            ColumnInfo {
677                name: f.name().clone(),
678                logical: arrow_to_logical(dt),
679                sql_type: format!("{dt:?}"),
680                nullable: f.is_nullable(),
681            }
682        })
683        .collect();
684    let schema = DatasetSchema::new(&d.name, columns);
685
686    log::info!(
687        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
688        d.name,
689        d.source.kind.as_str(),
690        schema.columns.len(),
691        part_keys.len()
692    );
693
694    Ok((
695        DatasetState {
696            schema,
697            data: Vec::new(),
698            arrow_schema: arrow_sch,
699            index: EqIndex::new(),
700            lazy: true,
701        },
702        provider,
703    ))
704}
705
706/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
707/// rooted at the dataset base plus the ordered hive partition keys (if any).
708/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
709/// (hive root or flat folder of parquets), and a single `*.parquet` file.
710fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
711    let loc = &d.source.location;
712
713    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
714        let parts: Vec<&str> = loc.split('/').collect();
715        let first_wild = parts
716            .iter()
717            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
718            .unwrap_or(parts.len());
719        let base = parts[..first_wild].join("/");
720        let base = if base.is_empty() {
721            "/".to_string()
722        } else {
723            base
724        };
725        // Partition keys: `key=…` components between the base and the file
726        // pattern (the final component).
727        let upper = parts.len().saturating_sub(1);
728        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
729            .iter()
730            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
731            .filter(|k| !k.is_empty())
732            .collect();
733        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
734    }
735
736    let path = std::path::Path::new(loc);
737    if path.is_dir() {
738        let keys = discover_hive_keys(path);
739        return Ok((dir_url(path, d)?, keys));
740    }
741
742    let url = ListingTableUrl::parse(loc)
743        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
744    Ok((url, Vec::new()))
745}
746
747/// Parse a directory path into a `ListingTableUrl` (trailing slash so
748/// DataFusion treats it as a directory root, not a single object).
749fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
750    let s = path.to_str().ok_or_else(|| {
751        AppError::Internal(format!(
752            "dataset '{}': non-utf8 path {}",
753            d.name,
754            path.display()
755        ))
756    })?;
757    let s = if s.ends_with('/') {
758        s.to_string()
759    } else {
760        format!("{s}/")
761    };
762    ListingTableUrl::parse(&s)
763        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
764}
765
766/// Walk down a directory following the first `key=value` subdirectory at
767/// each level to discover the ordered hive partition keys. Returns an empty
768/// vec for a flat (non-partitioned) folder.
769fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
770    let mut keys = Vec::new();
771    let mut cur = base.to_path_buf();
772    loop {
773        let Ok(rd) = std::fs::read_dir(&cur) else {
774            break;
775        };
776        let mut next: Option<(String, std::path::PathBuf)> = None;
777        for entry in rd.flatten() {
778            let p = entry.path();
779            if !p.is_dir() {
780                continue;
781            }
782            let Some(name) = p.file_name().and_then(|n| n.to_str()) else {
783                continue;
784            };
785            if let Some((k, v)) = name.split_once('=')
786                && !k.is_empty()
787                && !v.is_empty()
788            {
789                next = Some((k.to_string(), p));
790                break;
791            }
792        }
793        match next {
794            Some((k, p)) => {
795                keys.push(k);
796                cur = p;
797            }
798            None => break,
799        }
800    }
801    keys
802}
803
804/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
805/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
806/// any discovered hive partition columns. DataFusion does the directory
807/// listing through the registered store and streams row groups on each
808/// query — no local enumeration needed.
809async fn build_lazy_s3_parquet(
810    d: &DatasetConfig,
811    ctx: &SessionContext,
812) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
813    register_s3_object_store(d, ctx)?;
814
815    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
816
817    // Discovery schema = file columns + partition columns (Utf8).
818    let mut fields: Vec<Field> = file_schema
819        .fields()
820        .iter()
821        .map(|f| f.as_ref().clone())
822        .collect();
823    for k in &part_keys {
824        if !fields.iter().any(|f| f.name() == k) {
825            fields.push(Field::new(k, DataType::Utf8, false));
826        }
827    }
828    let arrow_sch = Arc::new(Schema::new(fields));
829
830    let columns: Vec<ColumnInfo> = arrow_sch
831        .fields()
832        .iter()
833        .map(|f| {
834            let dt = f.data_type();
835            ColumnInfo {
836                name: f.name().clone(),
837                logical: arrow_to_logical(dt),
838                sql_type: format!("{dt:?}"),
839                nullable: f.is_nullable(),
840            }
841        })
842        .collect();
843    let schema = DatasetSchema::new(&d.name, columns);
844
845    log::info!(
846        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
847        d.name,
848        d.source.kind.as_str(),
849        schema.columns.len(),
850        part_keys.len()
851    );
852
853    Ok((
854        DatasetState {
855            schema,
856            data: Vec::new(),
857            arrow_schema: arrow_sch,
858            index: EqIndex::new(),
859            lazy: true,
860        },
861        provider,
862    ))
863}
864
865/// Build a `ListingTable` provider for an S3 parquet source, resolving the
866/// base prefix and hive partition keys via [`s3_listing`]. The registered
867/// S3 object store must already be present on `ctx`. Returns the provider,
868/// the inferred *file* schema (without partition columns), and the ordered
869/// partition keys.
870async fn build_s3_listing_table(
871    d: &DatasetConfig,
872    ctx: &SessionContext,
873) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
874    let (url, part_keys) = s3_listing(d, ctx).await?;
875
876    let mut opts =
877        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
878    if !part_keys.is_empty() {
879        opts = opts.with_table_partition_cols(
880            part_keys
881                .iter()
882                .map(|k| (k.clone(), DataType::Utf8))
883                .collect(),
884        );
885    }
886
887    let session_state = ctx.state();
888    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
889        AppError::Internal(format!(
890            "dataset '{}': infer parquet schema on s3: {e}",
891            d.name
892        ))
893    })?;
894
895    let cfg = ListingTableConfig::new(url)
896        .with_listing_options(opts)
897        .with_schema(file_schema.clone());
898    let table = ListingTable::try_new(cfg).map_err(|e| {
899        AppError::Internal(format!(
900            "dataset '{}': ListingTable::try_new (s3): {e}",
901            d.name
902        ))
903    })?;
904    Ok((Arc::new(table), file_schema, part_keys))
905}
906
907/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
908/// keys)` pair. Hive keys come from the location glob when present
909/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
910/// are discovered by listing the registered object store. Honours the
911/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
912async fn s3_listing(
913    d: &DatasetConfig,
914    ctx: &SessionContext,
915) -> Result<(ListingTableUrl, Vec<String>), AppError> {
916    let s3 = d.s3.clone().unwrap_or_default();
917    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
918    let loc = &d.source.location;
919
920    if d.source.has_glob() {
921        let (base, keys) = split_glob_base_keys(loc);
922        let base = format!("{}/", base.trim_end_matches('/'));
923        let url = ListingTableUrl::parse(&base).map_err(|e| {
924            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
925        })?;
926        let keys = if want_partitions { keys } else { Vec::new() };
927        return Ok((url, keys));
928    }
929
930    let base = if loc.ends_with('/') {
931        loc.clone()
932    } else {
933        format!("{loc}/")
934    };
935    let url = ListingTableUrl::parse(&base).map_err(|e| {
936        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
937    })?;
938    let keys = if want_partitions {
939        discover_s3_hive_keys(ctx, &url).await
940    } else {
941        Vec::new()
942    };
943    Ok((url, keys))
944}
945
/// Split a glob location into its non-wildcard base and its hive keys.
///
/// The base is every `/`-separated component before the first one that
/// contains a wildcard (`*`, `?` or `[`). An empty base becomes `/`. If
/// there is no wildcard, the whole location is the base.
///
/// The keys are the non-empty `key` halves of `key=…` components, taken
/// from the first wildcard component up to (but excluding) the final
/// component, which is treated as the file pattern. This works for both
/// local and `s3://` paths.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    const WILDCARDS: [char; 3] = ['*', '?', '['];

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|seg| seg.contains(WILDCARDS))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = if joined.is_empty() {
        String::from("/")
    } else {
        joined
    };

    let last = segments.len().saturating_sub(1);
    let start = wild_at.min(last);
    let mut keys = Vec::new();
    for seg in &segments[start..last] {
        if let Some((key, _)) = seg.split_once('=') {
            if !key.is_empty() {
                keys.push(key.to_owned());
            }
        }
    }
    (base, keys)
}
969
970/// Discover ordered hive partition keys for an S3 prefix by walking the
971/// object store one `key=value/` level at a time via delimiter listings.
972/// Best-effort: any listing error stops discovery and returns what was found
973/// so far (empty = treat as a flat folder).
974async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
975    let store = match ctx.runtime_env().object_store(url.object_store()) {
976        Ok(s) => s,
977        Err(_) => return Vec::new(),
978    };
979    let mut keys = Vec::new();
980    let mut prefix = url.prefix().clone();
981    loop {
982        let listing = match store.list_with_delimiter(Some(&prefix)).await {
983            Ok(l) => l,
984            Err(_) => break,
985        };
986        let mut next: Option<object_store::path::Path> = None;
987        for cp in &listing.common_prefixes {
988            if let Some(seg) = cp.parts().next_back() {
989                let seg = seg.as_ref().to_string();
990                if let Some((k, v)) = seg.split_once('=')
991                    && !k.is_empty()
992                    && !v.is_empty()
993                {
994                    keys.push(k.to_string());
995                    next = Some(cp.clone());
996                    break;
997                }
998            }
999        }
1000        match next {
1001            Some(p) => prefix = p,
1002            None => break,
1003        }
1004    }
1005    keys
1006}
1007
1008/// Original local-parquet code path — sync file I/O. We set a large reader
1009/// batch size so wide schemas (hundreds of columns) don't pay per-array
1010/// metadata overhead on thousands of small (default 1024-row) batches.
1011///
1012/// Two memory-saving knobs are applied here:
1013///
1014/// * **Column projection** — if `d.columns` is non-empty, only those
1015///   columns are decoded; everything else is skipped at the parquet reader
1016///   level (no Arrow array is ever allocated for the dropped columns).
1017/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1018///   carry a dictionary page are materialised as Arrow
1019///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1020///   string columns (state, country, severity, …) stay represented as
1021///   `n_unique` string slots plus an Int32 index per row instead of
1022///   `n_rows` independent strings — typically 10×–50× smaller for
1023///   real-world data.
1024fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1025    let files = d.resolve_local_parquet_files()?;
1026    let mut all = Vec::new();
1027    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1028        None
1029    } else {
1030        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1031    };
1032
1033    for f in &files {
1034        let file = std::fs::File::open(f)
1035            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1036
1037        // First pass: peek the parquet metadata + default Arrow schema so we
1038        // can (a) decide a column projection and (b) override Utf8 columns
1039        // that are dictionary-encoded in the file so the reader materialises
1040        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1041        let probe = ParquetRecordBatchReaderBuilder::try_new(
1042            file.try_clone()
1043                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1044        )?;
1045        let parquet_schema = probe.parquet_schema().clone();
1046        let arrow_schema = probe.schema().clone();
1047        let metadata = probe.metadata().clone();
1048        drop(probe);
1049
1050        // Column projection (top-level / leaf indices for flat schemas).
1051        let projection = if let Some(w) = &wanted {
1052            let indices: Vec<usize> = arrow_schema
1053                .fields()
1054                .iter()
1055                .enumerate()
1056                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1057                .map(|(i, _)| i)
1058                .collect();
1059            if indices.is_empty() {
1060                return Err(AppError::Internal(format!(
1061                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1062                    d.name,
1063                    d.columns,
1064                    f.display()
1065                )));
1066            }
1067            ProjectionMask::roots(&parquet_schema, indices)
1068        } else {
1069            ProjectionMask::all()
1070        };
1071
1072        // Dictionary override: any Utf8 column whose first row group carries
1073        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1074        // override schema must still describe every column in the parquet
1075        // file (projection is applied separately). Skipped entirely when
1076        // the dataset has `dict_encode = false` — escape hatch for cases
1077        // where the override interacts badly with null propagation in the
1078        // downstream engine.
1079        let mut new_fields: Vec<Field> = arrow_schema
1080            .fields()
1081            .iter()
1082            .map(|f| f.as_ref().clone())
1083            .collect();
1084        if d.dict_encode
1085            && let Some(rg0) = metadata.row_groups().first()
1086        {
1087            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1088                if !matches!(
1089                    fld.data_type(),
1090                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1091                ) {
1092                    continue;
1093                }
1094                if let Some(col) = rg0.columns().get(i)
1095                    && col.dictionary_page_offset().is_some()
1096                {
1097                    new_fields[i] = Field::new(
1098                        fld.name(),
1099                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1100                        fld.is_nullable(),
1101                    );
1102                }
1103            }
1104        }
1105        let forced_schema = Arc::new(Schema::new(new_fields));
1106
1107        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1108        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1109            .with_batch_size(65_536)
1110            .with_projection(projection)
1111            .build()?;
1112        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1113        // the file. Fold them in as constant Utf8 columns so they show up in
1114        // the schema and are queryable — matching the DuckDB backend.
1115        let pairs = hive_pairs(f);
1116        for batch in reader {
1117            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1118            all.push(if pairs.is_empty() {
1119                batch
1120            } else {
1121                append_partition_cols(&batch, &pairs)?
1122            });
1123        }
1124    }
1125    if all.is_empty() {
1126        return Err(AppError::Internal(format!(
1127            "dataset '{}': parquet source is empty",
1128            d.name
1129        )));
1130    }
1131    Ok(all)
1132}
1133
/// Ordered hive-style partition `(key, value)` pairs encoded in the
/// *directory* part of a file path. These are components shaped like
/// `key=value`: `year=2024/city=NYC/part-0.parquet` yields
/// `[("year", "2024"), ("city", "NYC")]`.
///
/// The final component (the file name itself) is never considered, so a
/// file named like `date=2024.parquet` does not produce a spurious `date`
/// column. This matches the glob and directory-walk discovery paths, which
/// also only look at directories.
///
/// The following components are ignored:
/// * non-UTF-8 components;
/// * components with an empty key or an empty value;
/// * components whose value contains a second `=`.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1148
1149/// Append constant Utf8 columns for each hive partition pair. A partition
1150/// key that collides with a real file column is skipped (the file wins).
1151fn append_partition_cols(
1152    batch: &RecordBatch,
1153    pairs: &[(String, String)],
1154) -> Result<RecordBatch, AppError> {
1155    let n = batch.num_rows();
1156    let mut fields: Vec<Field> = batch
1157        .schema()
1158        .fields()
1159        .iter()
1160        .map(|f| f.as_ref().clone())
1161        .collect();
1162    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1163    for (k, v) in pairs {
1164        if fields.iter().any(|f| f.name() == k) {
1165            continue;
1166        }
1167        fields.push(Field::new(k, DataType::Utf8, false));
1168        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1169    }
1170    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1171        .map_err(|e| AppError::Internal(e.to_string()))
1172}
1173
1174/// Register an `AmazonS3` object store on the SessionContext, build a
1175/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1176/// and stream the whole dataset back through `DataFrame::collect`. Using a
1177/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1178/// get the same hive-partition handling as local parquet.
1179async fn read_s3_parquet(
1180    d: &DatasetConfig,
1181    ctx: &SessionContext,
1182) -> Result<Vec<RecordBatch>, AppError> {
1183    register_s3_object_store(d, ctx)?;
1184    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1185    let df = ctx
1186        .read_table(provider)
1187        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1188    Ok(df.collect().await?)
1189}
1190
1191/// Open a Delta table (local or S3) and stream every row back as a Vec of
1192/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1193/// treat all datasets uniformly (single in-memory batch + eq-index).
1194async fn read_delta(
1195    d: &DatasetConfig,
1196    opts: HashMap<String, String>,
1197) -> Result<Vec<RecordBatch>, AppError> {
1198    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1199        AppError::Internal(format!(
1200            "dataset '{}': bad delta location '{}': {e}",
1201            d.name, d.source.location
1202        ))
1203    })?;
1204    let table = deltalake::open_table_with_storage_options(url, opts)
1205        .await
1206        .map_err(|e| {
1207            AppError::Internal(format!(
1208                "dataset '{}': delta open '{}': {e}",
1209                d.name, d.source.location
1210            ))
1211        })?;
1212    let provider = table.table_provider().await.map_err(|e| {
1213        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1214    })?;
1215    // Drive a full scan via a throwaway SessionContext so we end up with
1216    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1217    let scan_ctx = SessionContext::new();
1218    let df = scan_ctx
1219        .read_table(provider)
1220        .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1221    Ok(df.collect().await?)
1222}
1223
1224/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1225/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1226/// passes them through to object_store internally.
1227fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1228    let creds = d.resolved_creds();
1229    let region = d.resolved_region();
1230    let s3 = d.s3.clone().unwrap_or_default();
1231    let (bucket, _) = d.source.s3_bucket()?;
1232
1233    let mut opts = HashMap::new();
1234    opts.insert("AWS_REGION".into(), region);
1235    if let Some(ep) = s3.effective_endpoint(bucket) {
1236        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1237    }
1238    if s3.allow_http {
1239        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1240    }
1241    opts.insert(
1242        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1243        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1244    );
1245    if let Some(k) = creds.access_key_id {
1246        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1247    }
1248    if let Some(s) = creds.secret_access_key {
1249        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1250    }
1251    if let Some(t) = creds.session_token {
1252        opts.insert("AWS_SESSION_TOKEN".into(), t);
1253    }
1254    // Read-only paths don't need the S3 lock-provider plumbing.
1255    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1256    Ok(opts)
1257}
1258
1259/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1260/// block + resolved credentials and register it on `ctx` under
1261/// `s3://bucket/`.
1262fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1263    let (bucket, _key) = d.source.s3_bucket()?;
1264    let creds = d.resolved_creds();
1265    let region = d.resolved_region();
1266    let s3 = d.s3.clone().unwrap_or_default();
1267
1268    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1269        AppError::Internal(format!(
1270            "dataset '{}': build S3 store for '{bucket}': {e}",
1271            d.name
1272        ))
1273    })?;
1274
1275    let url = Url::parse(&format!("s3://{bucket}"))
1276        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1277    ctx.register_object_store(&url, Arc::new(store));
1278    Ok(())
1279}
1280
1281fn build_s3(
1282    bucket: &str,
1283    region: &str,
1284    s3: &S3Config,
1285    creds: &ResolvedCreds,
1286) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1287    let mut b = AmazonS3Builder::new()
1288        .with_bucket_name(bucket)
1289        .with_region(region)
1290        .with_allow_http(s3.allow_http)
1291        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1292    if let Some(ep) = s3.effective_endpoint(bucket) {
1293        b = b.with_endpoint(ep);
1294    }
1295    if let Some(k) = creds.access_key_id.as_deref() {
1296        b = b.with_access_key_id(k);
1297    }
1298    if let Some(s) = creds.secret_access_key.as_deref() {
1299        b = b.with_secret_access_key(s);
1300    }
1301    if let Some(t) = creds.session_token.as_deref() {
1302        b = b.with_token(t);
1303    }
1304    b.build()
1305}
1306
1307fn arrow_to_logical(dt: &DataType) -> LogicalType {
1308    match dt {
1309        DataType::Boolean => LogicalType::Bool,
1310        DataType::Int8
1311        | DataType::Int16
1312        | DataType::Int32
1313        | DataType::Int64
1314        | DataType::UInt8
1315        | DataType::UInt16
1316        | DataType::UInt32
1317        | DataType::UInt64 => LogicalType::Int,
1318        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1319        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1320        // Dictionary-encoded strings are reported as plain strings — clients
1321        // (and the rest of the backend) shouldn't have to care that we keep
1322        // a compressed representation in memory.
1323        DataType::Dictionary(_, v)
1324            if matches!(
1325                v.as_ref(),
1326                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1327            ) =>
1328        {
1329            LogicalType::Utf8
1330        }
1331        DataType::Date32
1332        | DataType::Date64
1333        | DataType::Time32(_)
1334        | DataType::Time64(_)
1335        | DataType::Timestamp(_, _)
1336        | DataType::Duration(_)
1337        | DataType::Interval(_) => LogicalType::Temporal,
1338        _ => LogicalType::Other,
1339    }
1340}
1341
1342// ---------------------------------------------------------------------------
1343// Per-batch projection
1344// ---------------------------------------------------------------------------
1345
1346fn project(
1347    schema: &DatasetSchema,
1348    batch: RecordBatch,
1349    columns: &[String],
1350) -> Result<RecordBatch, AppError> {
1351    if columns.is_empty() {
1352        return Ok(batch);
1353    }
1354    let indices: Vec<usize> = columns
1355        .iter()
1356        .map(|c| {
1357            schema
1358                .find(c)
1359                .map(|info| schema.by_name[&info.name.to_lowercase()])
1360        })
1361        .collect::<Result<_, _>>()?;
1362    let fields: Vec<Field> = indices
1363        .iter()
1364        .map(|&i| batch.schema().field(i).clone())
1365        .collect();
1366    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1367    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1368}
1369
1370// ---------------------------------------------------------------------------
1371// SQL builder
1372// ---------------------------------------------------------------------------
1373
1374/// Accumulates the typed literal values for a parameterised query.
1375///
1376/// Predicate values are never interpolated into the SQL text. Instead each
1377/// value is pushed here and the builder emits a positional placeholder
1378/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1379/// to the logical plan via [`DataFrame::with_param_values`], so user input
1380/// reaches the engine as typed scalars and can never alter the query
1381/// structure (no SQL injection surface, no escaping to get wrong).
1382#[derive(Default)]
1383struct Params {
1384    values: Vec<ScalarValue>,
1385}
1386
1387impl Params {
1388    fn new() -> Self {
1389        Self::default()
1390    }
1391
1392    /// Bind `v` and return its `$N` placeholder token.
1393    fn bind(&mut self, v: ScalarValue) -> String {
1394        self.values.push(v);
1395        format!("${}", self.values.len())
1396    }
1397
1398    fn into_values(self) -> Vec<ScalarValue> {
1399        self.values
1400    }
1401}
1402
1403fn build_query_sql(
1404    schema: &DatasetSchema,
1405    req: &QueryRequest,
1406    max_page_size: u64,
1407) -> Result<(String, Vec<ScalarValue>), AppError> {
1408    let (limit, offset) = req.effective_limit_offset(max_page_size);
1409    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1410}
1411
1412fn build_query_stream_sql(
1413    schema: &DatasetSchema,
1414    req: &QueryRequest,
1415) -> Result<(String, Vec<ScalarValue>), AppError> {
1416    let suffix = req
1417        .limit
1418        .map(|limit| format!(" LIMIT {limit}"))
1419        .unwrap_or_default();
1420    build_query_sql_with_suffix(schema, req, &suffix)
1421}
1422
1423fn build_query_sql_with_suffix(
1424    schema: &DatasetSchema,
1425    req: &QueryRequest,
1426    suffix: &str,
1427) -> Result<(String, Vec<ScalarValue>), AppError> {
1428    let agg_plan = req.agg_plan(schema)?;
1429
1430    let cols = if let Some(plan) = &agg_plan {
1431        // Group cols, then aggregations, each aliased to the JSON output key.
1432        let mut parts: Vec<String> = plan
1433            .group_cols
1434            .iter()
1435            .map(|c| DatasetSchema::quote_ident(c))
1436            .collect();
1437        for a in &plan.aggs {
1438            let expr = a.sql_expr()?;
1439            parts.push(format!(
1440                "{expr} AS {}",
1441                DatasetSchema::quote_ident(&a.alias)
1442            ));
1443        }
1444        parts.join(", ")
1445    } else if req.columns.is_empty() {
1446        if req.distinct {
1447            "DISTINCT *".to_string()
1448        } else {
1449            "*".to_string()
1450        }
1451    } else {
1452        let list = req
1453            .columns
1454            .iter()
1455            .map(|c| {
1456                schema
1457                    .find(c)
1458                    .map(|info| DatasetSchema::quote_ident(&info.name))
1459            })
1460            .collect::<Result<Vec<_>, _>>()?
1461            .join(", ");
1462        if req.distinct {
1463            format!("DISTINCT {list}")
1464        } else {
1465            list
1466        }
1467    };
1468
1469    let mut params = Params::new();
1470    let clauses: Vec<String> = req
1471        .predicates
1472        .iter()
1473        .map(|p| pred_to_sql(schema, p, &mut params))
1474        .collect::<Result<_, _>>()?;
1475
1476    let table = DatasetSchema::quote_ident(&schema.name);
1477    let where_clause = if clauses.is_empty() {
1478        String::new()
1479    } else {
1480        format!(" WHERE {}", clauses.join(" AND "))
1481    };
1482    let group_clause = match &agg_plan {
1483        Some(p) => format!(
1484            " GROUP BY {}",
1485            p.group_cols
1486                .iter()
1487                .map(|c| DatasetSchema::quote_ident(c))
1488                .collect::<Vec<_>>()
1489                .join(", "),
1490        ),
1491        None => String::new(),
1492    };
1493    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1494        Some(s) => format!(" ORDER BY {s}"),
1495        None => String::new(),
1496    };
1497    let sql =
1498        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1499    Ok((sql, params.into_values()))
1500}
1501
1502fn build_count_sql(
1503    schema: &DatasetSchema,
1504    predicates: &[Predicate],
1505) -> Result<(String, Vec<ScalarValue>), AppError> {
1506    let mut params = Params::new();
1507    let clauses: Vec<String> = predicates
1508        .iter()
1509        .map(|p| pred_to_sql(schema, p, &mut params))
1510        .collect::<Result<_, _>>()?;
1511    let table = DatasetSchema::quote_ident(&schema.name);
1512    let where_clause = if clauses.is_empty() {
1513        String::new()
1514    } else {
1515        format!(" WHERE {}", clauses.join(" AND "))
1516    };
1517    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1518    Ok((sql, params.into_values()))
1519}
1520
1521fn pred_to_sql(
1522    schema: &DatasetSchema,
1523    pred: &Predicate,
1524    params: &mut Params,
1525) -> Result<String, AppError> {
1526    let info = schema.find(&pred.col)?;
1527    let col = DatasetSchema::quote_ident(&info.name);
1528
1529    match pred.op.as_str() {
1530        "is_null" => return Ok(format!("{col} IS NULL")),
1531        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1532        _ => {}
1533    }
1534
1535    let val = pred
1536        .val
1537        .as_ref()
1538        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1539
1540    if pred.op == "in" {
1541        let items = val
1542            .as_array()
1543            .filter(|a| !a.is_empty())
1544            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1545        let placeholders: Vec<String> = items
1546            .iter()
1547            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1548            .collect::<Result<_, AppError>>()?;
1549        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1550    }
1551
1552    let sql_op = match pred.op.as_str() {
1553        "eq" => "=",
1554        "neq" => "!=",
1555        "gt" => ">",
1556        "gte" => ">=",
1557        "lt" => "<",
1558        "lte" => "<=",
1559        "like" => "LIKE",
1560        "ilike" => "ILIKE",
1561        other => return Err(AppError::UnknownOperator(other.into())),
1562    };
1563    let placeholder = params.bind(json_to_scalar(val)?);
1564    Ok(format!("{col} {sql_op} {placeholder}"))
1565}
1566
1567/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
1568/// binding as a query parameter. The engine applies the usual numeric
1569/// widening / comparison coercion against the target column type.
1570fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1571    match val {
1572        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1573        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1574        JsonValue::Null => Ok(ScalarValue::Null),
1575        JsonValue::Number(n) => {
1576            if let Some(i) = n.as_i64() {
1577                Ok(ScalarValue::Int64(Some(i)))
1578            } else if let Some(u) = n.as_u64() {
1579                Ok(ScalarValue::UInt64(Some(u)))
1580            } else if let Some(f) = n.as_f64() {
1581                Ok(ScalarValue::Float64(Some(f)))
1582            } else {
1583                Err(AppError::InvalidValue(
1584                    "unsupported numeric literal in predicate".into(),
1585                ))
1586            }
1587        }
1588        _ => Err(AppError::InvalidValue(
1589            "unsupported literal type in predicate".into(),
1590        )),
1591    }
1592}
1593
1594// ---------------------------------------------------------------------------
1595// Equality index — built once at startup, queried on every predicate request
1596// ---------------------------------------------------------------------------
1597
1598fn json_index_key(val: &JsonValue) -> Option<String> {
1599    match val {
1600        JsonValue::String(s) => Some(s.clone()),
1601        JsonValue::Number(n) => Some(n.to_string()),
1602        JsonValue::Bool(b) => Some(b.to_string()),
1603        _ => None,
1604    }
1605}
1606
/// Intersect two ascending row-id lists with a linear merge walk.
///
/// Each pair of equal heads contributes one id to the output, so inputs
/// without duplicates produce an output without duplicates.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut rest_a, mut rest_b) = (a, b);
    while let (Some((&x, tail_a)), Some((&y, tail_b))) =
        (rest_a.split_first(), rest_b.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => rest_a = tail_a,
            Ordering::Greater => rest_b = tail_b,
            Ordering::Equal => {
                common.push(x);
                rest_a = tail_a;
                rest_b = tail_b;
            }
        }
    }
    common
}
1623
/// Merge two ascending row-id lists into one ascending list.
///
/// When both heads are equal the id is emitted once. Once either side is
/// exhausted, the remainder of the other is appended verbatim.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut rest_a, mut rest_b) = (a, b);
    loop {
        let (Some((&x, tail_a)), Some((&y, tail_b))) =
            (rest_a.split_first(), rest_b.split_first())
        else {
            break;
        };
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                rest_a = tail_a;
            }
            Ordering::Greater => {
                merged.push(y);
                rest_b = tail_b;
            }
            Ordering::Equal => {
                merged.push(x);
                rest_a = tail_a;
                rest_b = tail_b;
            }
        }
    }
    merged.extend_from_slice(rest_a);
    merged.extend_from_slice(rest_b);
    merged
}
1648
1649fn try_index(index: &EqIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1650    if predicates.is_empty() || index.is_empty() {
1651        return None;
1652    }
1653
1654    let mut result: Option<Vec<u32>> = None;
1655    for pred in predicates {
1656        let col_lower = pred.col.to_lowercase();
1657        let col_map = index.get(&col_lower)?;
1658
1659        let rows: Vec<u32> = match pred.op.as_str() {
1660            "eq" => {
1661                let key = json_index_key(pred.val.as_ref()?)?;
1662                col_map.get(&key).cloned().unwrap_or_default()
1663            }
1664            "in" => {
1665                let items = pred.val.as_ref()?.as_array()?;
1666                let mut merged: Vec<u32> = Vec::new();
1667                for item in items {
1668                    if let Some(r) = col_map.get(&json_index_key(item)?) {
1669                        merged = union_sorted(&merged, r);
1670                    }
1671                }
1672                merged
1673            }
1674            _ => return None,
1675        };
1676
1677        result = Some(match result {
1678            None => rows,
1679            Some(r) => intersect_sorted(&r, &rows),
1680        });
1681    }
1682    result
1683}
1684
1685/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
1686/// the underlying batches (zero-copy) and concatenating the (small) page.
1687fn slice_global(
1688    chunks: &[RecordBatch],
1689    schema: &Arc<Schema>,
1690    offset: usize,
1691    limit: usize,
1692) -> Result<RecordBatch, AppError> {
1693    if limit == 0 || chunks.is_empty() {
1694        return Ok(RecordBatch::new_empty(schema.clone()));
1695    }
1696    let mut out = Vec::new();
1697    let mut to_skip = offset;
1698    let mut remaining = limit;
1699    for b in chunks {
1700        if remaining == 0 {
1701            break;
1702        }
1703        let n = b.num_rows();
1704        if to_skip >= n {
1705            to_skip -= n;
1706            continue;
1707        }
1708        let take = remaining.min(n - to_skip);
1709        out.push(b.slice(to_skip, take));
1710        to_skip = 0;
1711        remaining -= take;
1712    }
1713    if out.is_empty() {
1714        return Ok(RecordBatch::new_empty(schema.clone()));
1715    }
1716    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1717}
1718
1719/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
1720/// Row ids are global (across the concatenation of all chunks). We map each
1721/// requested row to its (chunk, local-index), `take` per chunk, then stitch
1722/// the per-chunk results back together preserving the original row order.
1723fn take_page(
1724    chunks: &[RecordBatch],
1725    schema: &Arc<Schema>,
1726    rows: &[u32],
1727    offset: usize,
1728    limit: usize,
1729) -> Result<RecordBatch, AppError> {
1730    let start = offset.min(rows.len());
1731    let len = limit.min(rows.len() - start);
1732    if len == 0 || chunks.is_empty() {
1733        return Ok(RecordBatch::new_empty(schema.clone()));
1734    }
1735
1736    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
1737    // and `offsets.last()` is the total row count.
1738    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1739    let mut acc: u32 = 0;
1740    offsets.push(0);
1741    for b in chunks {
1742        acc = acc
1743            .checked_add(b.num_rows() as u32)
1744            .expect("row count exceeds u32::MAX");
1745        offsets.push(acc);
1746    }
1747
1748    // Bucket each global row id into the chunk that contains it, remembering
1749    // the original output position so we can restore page order at the end.
1750    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1751    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1752        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1753        let local = gid - offsets[bi];
1754        buckets[bi].push((out_pos as u32, local));
1755    }
1756
1757    // Per-chunk take, recording the destination index for each emitted row.
1758    let mut takens: Vec<RecordBatch> = Vec::new();
1759    let mut dest: Vec<u32> = Vec::with_capacity(len);
1760    for (bi, bucket) in buckets.iter().enumerate() {
1761        if bucket.is_empty() {
1762            continue;
1763        }
1764        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1765        let cols: Vec<ArrayRef> = chunks[bi]
1766            .columns()
1767            .iter()
1768            .map(|c| {
1769                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1770                    .map_err(AppError::from)
1771            })
1772            .collect::<Result<_, _>>()?;
1773        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1774        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1775    }
1776
1777    // Stitch per-chunk results then permute to restore the requested order.
1778    let stitched = compute::concat_batches(schema, takens.iter())?;
1779    let mut inv = vec![0u32; len];
1780    for (i, &d) in dest.iter().enumerate() {
1781        inv[d as usize] = i as u32;
1782    }
1783    let perm = UInt32Array::from(inv);
1784    let cols: Vec<ArrayRef> = stitched
1785        .columns()
1786        .iter()
1787        .map(|c| {
1788            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1789                .map_err(AppError::from)
1790        })
1791        .collect::<Result<_, _>>()?;
1792    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1793}
1794
1795/// Build the equality index per the dataset's policy, against the chunked
1796/// representation. Row ids are global across the concatenation of all
1797/// chunks (so they remain compatible with `take_page` / `slice_global`).
1798fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1799    use rayon::prelude::*;
1800
1801    if cfg.mode == IndexMode::None || chunks.is_empty() {
1802        return EqIndex::new();
1803    }
1804
1805    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1806        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1807    } else {
1808        None
1809    };
1810
1811    let max_card = if cfg.mode == IndexMode::Auto {
1812        Some(cfg.max_cardinality)
1813    } else {
1814        None
1815    };
1816
1817    // Per-chunk starting global row id.
1818    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1819    let mut acc: u32 = 0;
1820    for b in chunks {
1821        batch_offsets.push(acc);
1822        acc = acc
1823            .checked_add(b.num_rows() as u32)
1824            .expect("row count exceeds u32::MAX");
1825    }
1826
1827    let schema = chunks[0].schema();
1828
1829    schema
1830        .fields()
1831        .par_iter()
1832        .enumerate()
1833        .filter_map(|(ci, field)| {
1834            let col_lower = field.name().to_lowercase();
1835            if let Some(a) = &allow
1836                && !a.contains_key(&col_lower)
1837            {
1838                return None;
1839            }
1840
1841            // Only build for index-friendly types; skip everything else
1842            // up-front so we don't pay the per-chunk dispatch cost.
1843            let dtype = field.data_type();
1844            let dict_utf8 = matches!(dtype,
1845                DataType::Dictionary(k, v)
1846                    if matches!(k.as_ref(), DataType::Int32)
1847                    && matches!(v.as_ref(), DataType::Utf8));
1848            match dtype {
1849                DataType::Utf8
1850                | DataType::Utf8View
1851                | DataType::Boolean
1852                | DataType::Int8
1853                | DataType::Int16
1854                | DataType::Int32
1855                | DataType::Int64 => {}
1856                _ if dict_utf8 => {}
1857                _ => return None,
1858            }
1859
1860            let mut map: HashMap<String, Vec<u32>> = HashMap::new();
1861
1862            for (bi, batch) in chunks.iter().enumerate() {
1863                let base = batch_offsets[bi];
1864                let col = batch.column(ci);
1865
1866                macro_rules! index_col {
1867                    ($arr_ty:ty) => {{
1868                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
1869                        for row in 0..arr.len() {
1870                            if arr.is_null(row) {
1871                                continue;
1872                            }
1873                            let key = arr.value(row).to_string();
1874                            let gid = base + row as u32;
1875                            if let Some(v) = map.get_mut(&key) {
1876                                v.push(gid);
1877                            } else {
1878                                if let Some(mc) = max_card {
1879                                    if map.len() >= mc {
1880                                        return None;
1881                                    }
1882                                }
1883                                map.insert(key, vec![gid]);
1884                            }
1885                        }
1886                    }};
1887                }
1888
1889                if dict_utf8 {
1890                    // Dictionary(Int32, Utf8): iterate keys + look up the
1891                    // string value from the (small) dictionary. We allocate
1892                    // the key string only when the value is new — repeated
1893                    // values reuse the existing HashMap entry by hash, but
1894                    // `HashMap::get_mut` still needs the key, so we use a
1895                    // borrowed lookup via `get` first to avoid the alloc.
1896                    let arr = col
1897                        .as_any()
1898                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
1899                        )?;
1900                    let keys = arr.keys();
1901                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
1902                    for row in 0..arr.len() {
1903                        if arr.is_null(row) {
1904                            continue;
1905                        }
1906                        let k = keys.value(row) as usize;
1907                        let s = values.value(k);
1908                        let gid = base + row as u32;
1909                        if let Some(v) = map.get_mut(s) {
1910                            v.push(gid);
1911                        } else {
1912                            if let Some(mc) = max_card
1913                                && map.len() >= mc
1914                            {
1915                                return None;
1916                            }
1917                            map.insert(s.to_string(), vec![gid]);
1918                        }
1919                    }
1920                } else {
1921                    match dtype {
1922                        DataType::Utf8 => index_col!(StringArray),
1923                        DataType::Utf8View => index_col!(StringViewArray),
1924                        DataType::Boolean => index_col!(BooleanArray),
1925                        DataType::Int8 => index_col!(Int8Array),
1926                        DataType::Int16 => index_col!(Int16Array),
1927                        DataType::Int32 => index_col!(Int32Array),
1928                        DataType::Int64 => index_col!(Int64Array),
1929                        _ => unreachable!(),
1930                    }
1931                }
1932            }
1933
1934            Some((col_lower, map))
1935        })
1936        .collect()
1937}
1938
1939// ---------------------------------------------------------------------------
1940// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
1941// on the page batch right before JSON encoding. We deliberately do **not**
1942// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
1943// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
1944// columns would balloon resident RAM. The cast is applied per returned page
1945// after pagination, so the cost is paid only for rows the caller requested.
1946// ---------------------------------------------------------------------------
1947
1948/// Returns true for Arrow types that `write_value` can render directly. Any
1949/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
1950/// JSON output is faithful rather than silently `null`.
1951fn writable_inline(dt: &DataType) -> bool {
1952    match dt {
1953        DataType::Utf8
1954        | DataType::LargeUtf8
1955        | DataType::Utf8View
1956        | DataType::Boolean
1957        | DataType::Int8
1958        | DataType::Int16
1959        | DataType::Int32
1960        | DataType::Int64
1961        | DataType::UInt8
1962        | DataType::UInt16
1963        | DataType::UInt32
1964        | DataType::UInt64
1965        | DataType::Float32
1966        | DataType::Float64
1967        | DataType::Decimal128(_, _)
1968        | DataType::Decimal256(_, _) => true,
1969        DataType::Dictionary(k, v)
1970            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
1971        {
1972            true
1973        }
1974        _ => false,
1975    }
1976}
1977
1978/// Cast any column whose dtype isn't directly writable by `write_value` to
1979/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
1980/// — kept native in resident memory to save RAM — and also any exotic dtype
1981/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
1982/// so the JSON serializer never falls back to writing literal `null`.
1983fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
1984    let schema = batch.schema();
1985    let to_cast: Vec<usize> = schema
1986        .fields()
1987        .iter()
1988        .enumerate()
1989        .filter_map(|(i, f)| {
1990            if writable_inline(f.data_type()) {
1991                None
1992            } else {
1993                Some(i)
1994            }
1995        })
1996        .collect();
1997    if to_cast.is_empty() {
1998        return Ok(batch.clone());
1999    }
2000    let new_fields: Vec<Field> = schema
2001        .fields()
2002        .iter()
2003        .enumerate()
2004        .map(|(i, f)| {
2005            if to_cast.contains(&i) {
2006                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2007            } else {
2008                f.as_ref().clone()
2009            }
2010        })
2011        .collect();
2012    let new_schema = Arc::new(Schema::new(new_fields));
2013    let cols: Vec<ArrayRef> = batch
2014        .columns()
2015        .iter()
2016        .enumerate()
2017        .map(|(i, c)| {
2018            if to_cast.contains(&i) {
2019                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2020            } else {
2021                Ok(c.clone())
2022            }
2023        })
2024        .collect::<Result<_, _>>()?;
2025    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2026}
2027
2028// ---------------------------------------------------------------------------
2029// Compute helpers — retained for symmetry; reserved for future inline scan
2030// path. Currently the engine fallback handles all non-index queries.
2031// ---------------------------------------------------------------------------
2032
/// Comparison operator for the inline scan helper `cmp_scalar`.
///
/// `Like` / `ILike` are only accepted for `Utf8` columns; `cmp_scalar`
/// returns an error for them on numeric columns.
// `dead_code` is allowed because the inline scan path is currently unused
// (the engine handles all non-index queries); kept for future use.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// `=`
    Eq,
    /// `!=`
    Neq,
    /// `>`
    Gt,
    /// `>=`
    Gte,
    /// `<`
    Lt,
    /// `<=`
    Lte,
    /// SQL `LIKE` (case-sensitive pattern match).
    Like,
    /// SQL `ILIKE` (case-insensitive pattern match).
    ILike,
}
2045
2046#[allow(dead_code)]
2047fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2048    let arr = col
2049        .as_any()
2050        .downcast_ref::<StringArray>()
2051        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2052    let s = Scalar::new(StringArray::from(vec![val]));
2053    Ok(eq(arr, &s)?)
2054}
2055
2056#[allow(dead_code)]
2057fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2058    macro_rules! num_cmp {
2059        ($arr_type:ty, $cast:ty) => {{
2060            let n = val
2061                .as_f64()
2062                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2063                as $cast;
2064            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2065            let s = Scalar::new(<$arr_type>::from(vec![n]));
2066            Ok(match op {
2067                CmpOp::Eq => eq(arr, &s)?,
2068                CmpOp::Neq => neq(arr, &s)?,
2069                CmpOp::Gt => gt(arr, &s)?,
2070                CmpOp::Gte => gt_eq(arr, &s)?,
2071                CmpOp::Lt => lt(arr, &s)?,
2072                CmpOp::Lte => lt_eq(arr, &s)?,
2073                CmpOp::Like | CmpOp::ILike => {
2074                    return Err(AppError::InvalidValue(
2075                        "LIKE requires a string column".into(),
2076                    ));
2077                }
2078            })
2079        }};
2080    }
2081    match col.data_type() {
2082        DataType::Utf8 => {
2083            let s = val
2084                .as_str()
2085                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2086            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2087            let sc = Scalar::new(StringArray::from(vec![s]));
2088            Ok(match op {
2089                CmpOp::Eq => eq(arr, &sc)?,
2090                CmpOp::Neq => neq(arr, &sc)?,
2091                CmpOp::Gt => gt(arr, &sc)?,
2092                CmpOp::Gte => gt_eq(arr, &sc)?,
2093                CmpOp::Lt => lt(arr, &sc)?,
2094                CmpOp::Lte => lt_eq(arr, &sc)?,
2095                CmpOp::Like => compute::like(arr, &sc)?,
2096                CmpOp::ILike => compute::ilike(arr, &sc)?,
2097            })
2098        }
2099        DataType::Int8 => num_cmp!(Int8Array, i8),
2100        DataType::Int16 => num_cmp!(Int16Array, i16),
2101        DataType::Int32 => num_cmp!(Int32Array, i32),
2102        DataType::Int64 => num_cmp!(Int64Array, i64),
2103        DataType::Float32 => num_cmp!(Float32Array, f32),
2104        DataType::Float64 => num_cmp!(Float64Array, f64),
2105        dt => Err(AppError::InvalidValue(format!(
2106            "unsupported type for comparison: {dt:?}"
2107        ))),
2108    }
2109}
2110
2111// ---------------------------------------------------------------------------
2112// Serialisation
2113// ---------------------------------------------------------------------------
2114
2115pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2116    // Temporal columns are kept native in resident memory (compact). Cast
2117    // them — plus any other dtype `write_value` can't render directly — to
2118    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2119    // without paying the load-time RAM cost.
2120    let batch = cast_for_serialize(batch)?;
2121    let schema = batch.schema();
2122    let n_rows = batch.num_rows();
2123
2124    let keys: Vec<Vec<u8>> = schema
2125        .fields()
2126        .iter()
2127        .map(|f| {
2128            let mut k = Vec::with_capacity(f.name().len() + 3);
2129            k.push(b'"');
2130            k.extend_from_slice(f.name().as_bytes());
2131            k.extend_from_slice(b"\":");
2132            k
2133        })
2134        .collect();
2135
2136    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2137    buf.push(b'[');
2138
2139    for row in 0..n_rows {
2140        if row > 0 {
2141            buf.push(b',');
2142        }
2143        buf.push(b'{');
2144        for (i, key) in keys.iter().enumerate() {
2145            if i > 0 {
2146                buf.push(b',');
2147            }
2148            buf.extend_from_slice(key);
2149            let col = batch.column(i);
2150            if col.is_null(row) {
2151                buf.extend_from_slice(b"null");
2152            } else {
2153                write_value(&mut buf, col.as_ref(), row);
2154            }
2155        }
2156        buf.push(b'}');
2157    }
2158
2159    buf.push(b']');
2160    Ok(unsafe { String::from_utf8_unchecked(buf) })
2161}
2162
2163#[inline]
2164fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2165    match col.data_type() {
2166        DataType::Utf8 => write_str(
2167            buf,
2168            col.as_any()
2169                .downcast_ref::<StringArray>()
2170                .unwrap()
2171                .value(row),
2172        ),
2173        DataType::LargeUtf8 => write_str(
2174            buf,
2175            col.as_any()
2176                .downcast_ref::<LargeStringArray>()
2177                .unwrap()
2178                .value(row),
2179        ),
2180        DataType::Utf8View => write_str(
2181            buf,
2182            col.as_any()
2183                .downcast_ref::<StringViewArray>()
2184                .unwrap()
2185                .value(row),
2186        ),
2187        DataType::Dictionary(key, value)
2188            if matches!(key.as_ref(), DataType::Int32)
2189                && matches!(value.as_ref(), DataType::Utf8) =>
2190        {
2191            let dict = col
2192                .as_any()
2193                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2194                .unwrap();
2195            let keys = dict.keys();
2196            let values = dict
2197                .values()
2198                .as_any()
2199                .downcast_ref::<StringArray>()
2200                .unwrap();
2201            let k = keys.value(row) as usize;
2202            write_str(buf, values.value(k));
2203        }
2204        DataType::Boolean => {
2205            let v = col
2206                .as_any()
2207                .downcast_ref::<BooleanArray>()
2208                .unwrap()
2209                .value(row);
2210            buf.extend_from_slice(if v { b"true" } else { b"false" });
2211        }
2212        DataType::Int8 => {
2213            let mut b = itoa::Buffer::new();
2214            buf.extend_from_slice(
2215                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2216                    .as_bytes(),
2217            );
2218        }
2219        DataType::Int16 => {
2220            let mut b = itoa::Buffer::new();
2221            buf.extend_from_slice(
2222                b.format(
2223                    col.as_any()
2224                        .downcast_ref::<Int16Array>()
2225                        .unwrap()
2226                        .value(row),
2227                )
2228                .as_bytes(),
2229            );
2230        }
2231        DataType::Int32 => {
2232            let mut b = itoa::Buffer::new();
2233            buf.extend_from_slice(
2234                b.format(
2235                    col.as_any()
2236                        .downcast_ref::<Int32Array>()
2237                        .unwrap()
2238                        .value(row),
2239                )
2240                .as_bytes(),
2241            );
2242        }
2243        DataType::Int64 => {
2244            let mut b = itoa::Buffer::new();
2245            buf.extend_from_slice(
2246                b.format(
2247                    col.as_any()
2248                        .downcast_ref::<Int64Array>()
2249                        .unwrap()
2250                        .value(row),
2251                )
2252                .as_bytes(),
2253            );
2254        }
2255        DataType::UInt8 => {
2256            let mut b = itoa::Buffer::new();
2257            buf.extend_from_slice(
2258                b.format(
2259                    col.as_any()
2260                        .downcast_ref::<UInt8Array>()
2261                        .unwrap()
2262                        .value(row),
2263                )
2264                .as_bytes(),
2265            );
2266        }
2267        DataType::UInt16 => {
2268            let mut b = itoa::Buffer::new();
2269            buf.extend_from_slice(
2270                b.format(
2271                    col.as_any()
2272                        .downcast_ref::<UInt16Array>()
2273                        .unwrap()
2274                        .value(row),
2275                )
2276                .as_bytes(),
2277            );
2278        }
2279        DataType::UInt32 => {
2280            let mut b = itoa::Buffer::new();
2281            buf.extend_from_slice(
2282                b.format(
2283                    col.as_any()
2284                        .downcast_ref::<UInt32Array>()
2285                        .unwrap()
2286                        .value(row),
2287                )
2288                .as_bytes(),
2289            );
2290        }
2291        DataType::UInt64 => {
2292            let mut b = itoa::Buffer::new();
2293            buf.extend_from_slice(
2294                b.format(
2295                    col.as_any()
2296                        .downcast_ref::<UInt64Array>()
2297                        .unwrap()
2298                        .value(row),
2299                )
2300                .as_bytes(),
2301            );
2302        }
2303        DataType::Decimal128(_, _) => {
2304            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2305            write_str(buf, &arr.value_as_string(row));
2306        }
2307        DataType::Decimal256(_, _) => {
2308            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2309            write_str(buf, &arr.value_as_string(row));
2310        }
2311        DataType::Float32 => {
2312            let v = col
2313                .as_any()
2314                .downcast_ref::<Float32Array>()
2315                .unwrap()
2316                .value(row);
2317            if v.is_finite() {
2318                let mut b = ryu::Buffer::new();
2319                buf.extend_from_slice(b.format_finite(v).as_bytes());
2320            } else {
2321                buf.extend_from_slice(b"null");
2322            }
2323        }
2324        DataType::Float64 => {
2325            let v = col
2326                .as_any()
2327                .downcast_ref::<Float64Array>()
2328                .unwrap()
2329                .value(row);
2330            if v.is_finite() {
2331                let mut b = ryu::Buffer::new();
2332                buf.extend_from_slice(b.format_finite(v).as_bytes());
2333            } else {
2334                buf.extend_from_slice(b"null");
2335            }
2336        }
2337        // Any dtype not handled above must have been pre-cast to Utf8 by
2338        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
2339        // visible JSON string rather than a silent null so it can't be
2340        // mistaken for a real NULL value.
2341        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2342    }
2343}
2344
/// Append `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped. `\n`, `\r` and `\t` use their short
/// escapes, and every other control byte below 0x20 becomes `\u00XX`
/// (lowercase hex). All other bytes, including multi-byte UTF-8 sequences,
/// are copied unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    buf.push(b'"');
    for byte in s.bytes() {
        let escape: &[u8] = match byte {
            b'"' => b"\\\"",
            b'\\' => b"\\\\",
            b'\n' => b"\\n",
            b'\r' => b"\\r",
            b'\t' => b"\\t",
            ctrl if ctrl < 0x20 => {
                buf.extend_from_slice(&[
                    b'\\',
                    b'u',
                    b'0',
                    b'0',
                    HEX[usize::from(ctrl >> 4)],
                    HEX[usize::from(ctrl & 0x0f)],
                ]);
                continue;
            }
            plain => {
                buf.push(plain);
                continue;
            }
        };
        buf.extend_from_slice(escape);
    }
    buf.push(b'"');
}
2366
2367// ---------------------------------------------------------------------------
2368// Backend trait impl — wires the store into the generic core handlers.
2369// ---------------------------------------------------------------------------
2370
2371#[async_trait]
2372impl Backend for Store {
2373    fn names(&self) -> Vec<String> {
2374        Store::names(self)
2375    }
2376
2377    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2378        let st = self.dataset(name)?;
2379        Ok(DatasetSummary {
2380            name: st.schema.name.clone(),
2381            columns: st.schema.columns.len(),
2382            rows: st.num_rows(),
2383        })
2384    }
2385
2386    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2387        let st = self.dataset(name)?;
2388        Ok(Arc::new(st.schema.clone()))
2389    }
2390
2391    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2392        let st = self.dataset(name)?;
2393        // Report indexed columns in the dataset's declared schema order
2394        // so the `/schema` response is deterministic.
2395        let mut cols: Vec<String> = st
2396            .schema
2397            .columns
2398            .iter()
2399            .map(|c| c.name.clone())
2400            .filter(|n| st.index.contains_key(n))
2401            .collect();
2402        // Any indexed columns not in `schema.columns` (shouldn't happen,
2403        // but be defensive) get appended sorted.
2404        let mut extras: Vec<String> = st
2405            .index
2406            .keys()
2407            .filter(|n| !cols.iter().any(|c| c == *n))
2408            .cloned()
2409            .collect();
2410        extras.sort();
2411        cols.extend(extras);
2412        Ok(cols)
2413    }
2414
2415    async fn sample(&self, name: &str) -> Result<String, AppError> {
2416        Store::sample(self, name).await
2417    }
2418
2419    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2420        Store::query(self, name, req).await
2421    }
2422
2423    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2424        Store::query_arrow(self, name, req).await
2425    }
2426
2427    async fn query_arrow_stream(
2428        &self,
2429        name: &str,
2430        req: &QueryRequest,
2431    ) -> Result<ArrowIpcStream, AppError> {
2432        Store::query_arrow_stream(self, name, req).await
2433    }
2434
2435    async fn query_arrow_stream_all(
2436        &self,
2437        name: &str,
2438        req: &QueryRequest,
2439    ) -> Result<ArrowIpcStream, AppError> {
2440        Store::query_arrow_stream_all(self, name, req).await
2441    }
2442
2443    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2444        Store::count(self, name, req).await
2445    }
2446
2447    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2448        Store::parquet(self, name).await
2449    }
2450
2451    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2452        Store::reload(self, name).await
2453    }
2454}