Skip to main content

datapress_datafusion/
store.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35    AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36    S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
42// ---------------------------------------------------------------------------
43// Public types
44// ---------------------------------------------------------------------------
45
46/// Hasher-swapped map alias. The default `std::collections::HashMap` uses
47/// SipHash (DoS-resistant but slow). The equality index keys are our own
48/// column values, not untrusted hash-flood input, so we use `ahash` — a much
49/// faster non-cryptographic hasher — on this per-request hot path.
50type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
51
52/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
53type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
55/// Per-dataset state: schema metadata, the resident chunks, and the
56/// equality index built per the dataset's `[dataset.index]` policy.
57///
58/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
59/// produced by the underlying reader, after temporal columns are cast to
60/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
61/// into one batch: on wide schemas (hundreds of columns) that transiently
62/// allocates a second full copy of the decoded Arrow data, pushing peak
63/// RSS to ~2× the resident size and OOM-killing the process at startup.
64///
65/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
66/// `index` is empty, and every query is dispatched to DataFusion SQL
67/// against a registered `ListingTable`. `arrow_schema` still carries the
68/// inferred schema so discovery endpoints work.
69pub struct DatasetState {
70    pub schema: DatasetSchema,
71    pub data: Vec<RecordBatch>,
72    pub arrow_schema: Arc<Schema>,
73    pub index: EqIndex,
74    pub lazy: bool,
75}
76
77impl DatasetState {
78    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
79    pub fn num_rows(&self) -> usize {
80        self.data.iter().map(|b| b.num_rows()).sum()
81    }
82}
83
84/// Multi-dataset registry. Each dataset is registered in the shared
85/// `SessionContext` under its configured name. The per-dataset state is
86/// held behind `ArcSwap` so a reload can atomically replace it without
87/// blocking concurrent queries.
88pub struct Store {
89    ctx: SessionContext,
90    max_page_size: u64,
91    /// Original dataset configs, indexed by name. Reload reads the source
92    /// path from here — clients can't redirect a reload at an arbitrary file.
93    configs: HashMap<String, DatasetConfig>,
94    /// Hot-swappable snapshot of all currently loaded datasets.
95    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
96    /// Per-name reload mutex. Serialises concurrent reloads of the same
97    /// dataset; reloads of different datasets proceed in parallel.
98    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
99}
100
101impl Store {
102    /// Load every dataset declared in `cfg`.
103    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104        // One-shot init for the deltalake S3 backend. Safe to call more
105        // than once — the handlers are idempotent.
106        if cfg
107            .datasets
108            .iter()
109            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110        {
111            deltalake::aws::register_handlers(None);
112        }
113
114        // NOTE: identifier handling for the raw-SQL endpoint is done by
115        // rewriting schema column/table references to quoted canonical
116        // names before execution (see `query_sql`). DataFusion's default
117        // identifier normalization is left ON so unquoted *aliases* and
118        // CTE names stay case-insensitive, matching DuckDB.
119        let ctx = SessionContext::new();
120        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
121        let mut configs = HashMap::with_capacity(cfg.datasets.len());
122
123        for d in &cfg.datasets {
124            log::info!(
125                "Loading dataset '{}' ({} @ {})",
126                d.name,
127                d.source.kind.as_str(),
128                d.source.location
129            );
130            let (state, provider) = build_dataset(d, &ctx).await?;
131            ctx.register_table(d.name.as_str(), provider)?;
132            datasets.insert(d.name.clone(), Arc::new(state));
133            configs.insert(d.name.clone(), d.clone());
134        }
135        Ok(Self {
136            ctx,
137            max_page_size: cfg.server.max_page_size.max(1),
138            configs,
139            datasets: ArcSwap::from_pointee(datasets),
140            reload_locks: Mutex::new(HashMap::new()),
141        })
142    }
143
144    /// Sorted list of dataset names.
145    pub fn names(&self) -> Vec<String> {
146        let snap = self.datasets.load();
147        let mut v: Vec<String> = snap.keys().cloned().collect();
148        v.sort();
149        v
150    }
151
152    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
153        self.datasets
154            .load()
155            .get(name)
156            .cloned()
157            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
158    }
159
160    /// JSON for the first row of the dataset, or `null` if empty. Used by
161    /// `GET /api/datasets/{name}/schema` for discoverability.
162    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
163        let st = self.dataset(name)?;
164
165        // Lazy datasets have no resident batch — pull one row via SQL.
166        if st.lazy {
167            let table = DatasetSchema::quote_ident(&st.schema.name);
168            let sql = format!("SELECT * FROM {table} LIMIT 1");
169            let df = self.ctx.sql(&sql).await?;
170            let batches = df.collect().await?;
171            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
172                return Ok("null".into());
173            }
174            let arr = serialize(&batches[0].slice(0, 1))?;
175            let trimmed = arr.trim();
176            let inner = trimmed
177                .strip_prefix('[')
178                .and_then(|s| s.strip_suffix(']'))
179                .unwrap_or(trimmed);
180            return Ok(inner.to_string());
181        }
182
183        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
184            Some(b) => b,
185            None => return Ok("null".into()),
186        };
187        let arr = serialize(&first.slice(0, 1))?;
188        // strip the outer [] to return a single object
189        let trimmed = arr.trim();
190        let inner = trimmed
191            .strip_prefix('[')
192            .and_then(|s| s.strip_suffix(']'))
193            .unwrap_or(trimmed);
194        Ok(inner.to_string())
195    }
196
197    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
198    /// against the same name continue to see the *old* `Arc<DatasetState>`
199    /// until they finish; the old data is dropped once the last reference
200    /// goes away.
201    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
202        // 1. Look up the dataset config. Not finding it = 404.
203        let cfg = self
204            .configs
205            .get(name)
206            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
207            .clone();
208
209        // 2. Per-name lock: only one reload of this dataset at a time.
210        let lock = {
211            let mut locks = self.reload_locks.lock().unwrap();
212            locks
213                .entry(name.to_string())
214                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
215                .clone()
216        };
217        let _guard = lock.lock().await;
218
219        let started = std::time::Instant::now();
220
221        // 3. Heavy lifting (source read + index build). Parquet/delta
222        // readers are themselves async, so we don't wrap in `web::block`.
223        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
224        let rows = state.num_rows();
225
226        // 4. Atomic swap.
227        //   a) Replace the MemTable inside the SessionContext.
228        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
229        // In-flight queries already hold the old provider + old Arc; they
230        // run to completion. New queries see the new data.
231        let _ = self.ctx.deregister_table(name)?;
232        self.ctx.register_table(name, provider)?;
233
234        let mut new_map = (**self.datasets.load()).clone();
235        new_map.insert(name.to_string(), Arc::new(state));
236        self.datasets.store(Arc::new(new_map));
237
238        let elapsed_ms = started.elapsed().as_millis();
239        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
240        Ok(ReloadStats { rows, elapsed_ms })
241    }
242
243    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
244    /// slice. Otherwise → DataFusion SQL on the single registered table.
245    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
246    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
247        let batch = self.query_batch(name, req).await?;
248        if batch.num_rows() == 0 {
249            return Ok("[]".to_string());
250        }
251        serialize(&batch)
252    }
253
254    /// Rewrite registered table / column references in a raw-SQL string to
255    /// their canonical, quoted spelling so matching is case-insensitive
256    /// (like DuckDB). Builds the lookup from every currently loaded
257    /// dataset; unknown identifiers (aliases, CTE names) are left for the
258    /// engine's default normalization to handle.
259    fn canonicalize_sql(&self, sql: &str) -> String {
260        let snap = self.datasets.load();
261        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
262        let mut columns: HashMap<String, String> = HashMap::new();
263        for (name, state) in snap.iter() {
264            tables.insert(name.to_lowercase(), name.clone());
265            for col in &state.schema.columns {
266                columns
267                    .entry(col.name.to_lowercase())
268                    .or_insert_with(|| col.name.clone());
269            }
270        }
271        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
272    }
273
274    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
275    /// array. The statement has already passed
276    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
277    /// `LIMIT max_rows` so the result is bounded regardless of the user's
278    /// own clauses, executed through the shared `SessionContext`, and run
279    /// through the same fast JSON encoder as [`Self::query`].
280    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
281        let cap = max_rows.max(1);
282        let sql = self.canonicalize_sql(sql);
283        let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
284        let df = self.ctx.sql(&wrapped).await?;
285        let batches = df.collect().await?;
286        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
287            return Ok("[]".to_string());
288        }
289        let batch = if batches.len() == 1 {
290            batches.into_iter().next().expect("checked len")
291        } else {
292            compute::concat_batches(&batches[0].schema(), batches.iter())?
293        };
294        // Defence in depth: the outer LIMIT already bounds the row count,
295        // but slice anyway so a planning quirk can never blow the cap.
296        let batch = if batch.num_rows() as u64 > cap {
297            batch.slice(0, cap as usize)
298        } else {
299            batch
300        };
301        serialize(&batch)
302    }
303
304    /// Same plan as [`Self::query_sql`], but encode the bounded result as
305    /// an Arrow IPC stream instead of JSON. Backs the Arrow
306    /// content-negotiated branch of `POST /api/v1/sql`.
307    pub async fn query_sql_arrow_stream(
308        &self,
309        sql: &str,
310        max_rows: u64,
311    ) -> Result<ArrowIpcStream, AppError> {
312        let cap = max_rows.max(1);
313        let sql = self.canonicalize_sql(sql);
314        let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
315        let df = self.ctx.sql(&wrapped).await?;
316        let batches = df.collect().await?;
317        Ok(stream_arrow_batches(batches))
318    }
319
320    /// Same plan as [`Self::query`], but encode the result page as an
321    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
322    /// results still produce a valid, self-describing zero-batch stream.
323    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
324        let batch = self.query_batch(name, req).await?;
325        let schema = batch.schema();
326        let mut buf = Vec::with_capacity(8 * 1024);
327        {
328            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
329            if batch.num_rows() > 0 {
330                w.write(&batch)?;
331            }
332            w.finish()?;
333        }
334        Ok(buf)
335    }
336
337    pub async fn query_arrow_stream(
338        &self,
339        name: &str,
340        req: &QueryRequest,
341    ) -> Result<ArrowIpcStream, AppError> {
342        let batches = self.query_batches(name, req).await?;
343        Ok(stream_arrow_batches(batches))
344    }    pub async fn query_arrow_stream_all(
345        &self,
346        name: &str,
347        req: &QueryRequest,
348    ) -> Result<ArrowIpcStream, AppError> {
349        let batches = self.query_batches_all(name, req).await?;
350        Ok(stream_arrow_batches(batches))
351    }
352
353    /// Encode the entire dataset as a single self-contained Parquet file.
354    ///
355    /// Collects every row (all columns, no predicates, no paging) and runs
356    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
357    /// carries the row-group + footer metadata a Parquet reader needs to
358    /// answer `count(*)` straight from the footer. Powers the cached
359    /// `GET /datasets/{name}/parquet` HTTP endpoint.
360    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
361        // All rows, all columns, no predicates / ordering / limit.
362        let req = QueryRequest {
363            columns: Vec::new(),
364            predicates: Vec::new(),
365            group_by: Vec::new(),
366            aggregations: Vec::new(),
367            distinct: false,
368            order_by: Vec::new(),
369            limit: None,
370            page: 1,
371            page_size: 1,
372        };
373        let st = self.dataset(name)?;
374        let batches = self.query_batches_all(name, &req).await?;
375        // Use the actual batch schema when we have rows so the writer schema
376        // matches exactly (projection/nullability); fall back to the
377        // dataset schema for an empty dataset.
378        let schema = batches
379            .first()
380            .map(|b| b.schema())
381            .unwrap_or_else(|| st.arrow_schema.clone());
382
383        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
384        {
385            let props = parquet::file::properties::WriterProperties::builder()
386                .set_compression(parquet::basic::Compression::SNAPPY)
387                .build();
388            let mut writer =
389                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
390                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
391            for batch in &batches {
392                if batch.num_rows() > 0 {
393                    writer
394                        .write(batch)
395                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
396                }
397            }
398            writer
399                .close()
400                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
401        }
402        Ok(bytes::Bytes::from(buf))
403    }
404
405    /// Compute the result page as a single `RecordBatch`. Shared between
406    /// the JSON and Arrow IPC encoders.
407    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
408        let batches = self.query_batches(name, req).await?;
409        if batches.is_empty() {
410            return Ok(RecordBatch::new_empty(Arc::new(
411                arrow::datatypes::Schema::empty(),
412            )));
413        }
414        if batches.len() == 1 {
415            return Ok(batches.into_iter().next().expect("checked len"));
416        }
417        if batches.iter().all(|b| b.num_rows() == 0) {
418            return Ok(RecordBatch::new_empty(batches[0].schema()));
419        }
420        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
421        Ok(batch)
422    }
423
424    /// Compute the result page as Arrow batches. Arrow IPC responses can
425    /// write these directly, while JSON callers concatenate via
426    /// [`Self::query_batch`] for the existing row conversion path.
427    async fn query_batches(
428        &self,
429        name: &str,
430        req: &QueryRequest,
431    ) -> Result<Vec<RecordBatch>, AppError> {
432        let st = self.dataset(name)?;
433
434        let page = req.page.max(1);
435        let page_size = req.page_size.clamp(1, self.max_page_size);
436        let offset = ((page - 1) * page_size) as usize;
437        let limit = page_size as usize;
438
439        self.query_batches_inner(st, req, Some((offset, limit)))
440            .await
441    }
442
443    /// Compute all matching rows as Arrow batches for the one-request
444    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
445    /// optional `limit` still caps the total result size.
446    async fn query_batches_all(
447        &self,
448        name: &str,
449        req: &QueryRequest,
450    ) -> Result<Vec<RecordBatch>, AppError> {
451        let st = self.dataset(name)?;
452        self.query_batches_inner(st, req, None).await
453    }
454
455    async fn query_batches_inner(
456        &self,
457        st: Arc<DatasetState>,
458        req: &QueryRequest,
459        page_window: Option<(usize, usize)>,
460    ) -> Result<Vec<RecordBatch>, AppError> {
461        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
462
463        // In-memory hot paths only fire when:
464        //   - the dataset is materialised,
465        //   - the caller did not ask for ordering,
466        //   - and did not ask for a hard `limit` cap on a paged request.
467        // Both of the latter two require sorting / capping that the SQL
468        // engine handles uniformly across all data types.
469        let can_fast_path = !st.lazy
470            && req.order_by.is_empty()
471            && (page_window.is_none() || req.limit.is_none())
472            && req.group_by.is_empty()
473            && !req.distinct;
474
475        if can_fast_path {
476            let total = st.num_rows();
477
478            // No predicates -> O(1) raw Arrow slices over resident batches,
479            // no engine overhead.
480            if req.predicates.is_empty() {
481                if page_window.is_none() && req.limit.is_none() {
482                    return st
483                        .data
484                        .iter()
485                        .cloned()
486                        .map(|batch| project(&st.schema, batch, &req.columns))
487                        .collect();
488                }
489                let start = offset.min(total);
490                let len = limit.min(total - start);
491                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
492                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
493            }
494
495            // Index fast path: if every predicate is eq/in on an indexed column,
496            // resolve via the pre-built equality index.
497            if let Some(rows) = try_index(&st.index, &req.predicates) {
498                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
499                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
500            }
501        }
502
503        // Fallback (and only path for lazy datasets): DataFusion SQL.
504        let (sql, params) = match page_window {
505            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
506            None => build_query_stream_sql(&st.schema, req)?,
507        };
508        let mut df = self.ctx.sql(&sql).await?;
509        if !params.is_empty() {
510            df = df.with_param_values(params)?;
511        }
512        let batches = df.collect().await?;
513        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
514            let schema = batches
515                .first()
516                .map(|b| b.schema())
517                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
518            return Ok(vec![RecordBatch::new_empty(schema)]);
519        }
520        Ok(batches)
521    }
522}
523
524fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
525    let schema = batches
526        .first()
527        .map(|batch| batch.schema())
528        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
529    let (mut writer, stream) = arrow_ipc_stream_channel(8);
530
531    tokio::task::spawn_blocking(move || {
532        let result = (|| -> Result<(), AppError> {
533            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
534            for batch in batches {
535                if batch.num_rows() > 0 {
536                    w.write(&batch)?;
537                }
538            }
539            w.finish()?;
540            Ok(())
541        })();
542        if let Err(err) = result {
543            log::error!("datafusion arrow stream failed: {err}");
544            writer.send_error(err);
545        }
546    });
547
548    stream
549}
550
551impl Store {
552    /// Return the number of rows matching `req.predicates`. With no
553    /// predicates this is a cheap metadata lookup on materialised datasets
554    /// and a `SELECT COUNT(*)` on lazy ones.
555    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
556        let st = self.dataset(name)?;
557
558        if !st.lazy {
559            // No predicates → resident row count, no scan.
560            if req.predicates.is_empty() {
561                return Ok(st.num_rows() as i64);
562            }
563            // Index fast path: same eligibility rules as `query`.
564            if let Some(rows) = try_index(&st.index, &req.predicates) {
565                return Ok(rows.len() as i64);
566            }
567        }
568
569        // Fallback: DataFusion SQL — same predicate translation as `query`,
570        // with predicate values bound as typed parameters.
571        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
572        let mut df = self.ctx.sql(&sql).await?;
573        if !params.is_empty() {
574            df = df.with_param_values(params)?;
575        }
576        let batches = df.collect().await?;
577        let n = batches
578            .first()
579            .and_then(|b| {
580                b.column(0)
581                    .as_any()
582                    .downcast_ref::<arrow::array::Int64Array>()
583            })
584            .filter(|a| !a.is_empty())
585            .map(|a| a.value(0))
586            .unwrap_or(0);
587        Ok(n)
588    }
589}
590
591// ---------------------------------------------------------------------------
592// Dataset loading
593// ---------------------------------------------------------------------------
594
595async fn build_dataset(
596    d: &DatasetConfig,
597    ctx: &SessionContext,
598) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
599    // Lazy datasets: register a ListingTable straight against the source
600    // and skip the materialise / index / partition pipeline below. Delta
601    // is rejected — deltalake reads the transaction log eagerly to know
602    // which parquet files are current, so "lazy delta" doesn't map onto
603    // ListingTable cleanly.
604    if d.lazy {
605        match (d.source.kind, d.source.is_s3()) {
606            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
607            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
608            (SourceKind::Delta, _) => {
609                return Err(AppError::Internal(format!(
610                    "dataset '{}': lazy mode is not supported for delta sources",
611                    d.name
612                )));
613            }
614        }
615    }
616
617    // Fetch raw RecordBatches from whichever backing store the dataset
618    // is configured to use. All four (parquet, delta) x (local, s3)
619    // combinations converge into one Vec<RecordBatch>; the materialisation
620    // / indexing / partitioning logic below is shared.
621    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
622        (SourceKind::Parquet, false) => read_local_parquet(d)?,
623        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
624        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
625        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
626    };
627    if raw_batches.is_empty() {
628        return Err(AppError::Internal(format!(
629            "dataset '{}': source produced no batches",
630            d.name
631        )));
632    }
633
634    let chunks = raw_batches;
635    let arrow_sch = chunks[0].schema();
636
637    // Build DatasetSchema from the Arrow schema.
638    let columns: Vec<ColumnInfo> = arrow_sch
639        .fields()
640        .iter()
641        .map(|f| {
642            let dt = f.data_type();
643            ColumnInfo {
644                name: f.name().clone(),
645                logical: arrow_to_logical(dt),
646                sql_type: format!("{dt:?}"),
647                nullable: f.is_nullable(),
648            }
649        })
650        .collect();
651    let schema = DatasetSchema::new(&d.name, columns);
652
653    // Build the equality index per the per-dataset policy. Operates on the
654    // chunked representation directly so we never have to materialise a
655    // single concatenated batch (which would double peak RSS on wide
656    // schemas — see `DatasetState` docs).
657    let index = build_eq_index_with_policy(&chunks, &d.index);
658
659    // Partition for parallel scans by the SQL fallback path. We distribute
660    // the existing batches round-robin across `n_parts` partitions instead
661    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
662    // an Arc-clone of the column buffers, not a copy.
663    let n_parts = std::thread::available_parallelism()
664        .map(|n| n.get())
665        .unwrap_or(4);
666    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
667    for (i, b) in chunks.iter().enumerate() {
668        if b.num_rows() == 0 {
669            continue;
670        }
671        parts[i % n_parts].push(b.clone());
672    }
673    parts.retain(|p| !p.is_empty());
674    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
675
676    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
677    let mem_mb: usize = chunks
678        .iter()
679        .flat_map(|b| b.columns().iter())
680        .map(|c| c.get_buffer_memory_size())
681        .sum::<usize>()
682        / 1_048_576;
683    log::info!(
684        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
685        d.name,
686        d.source.kind.as_str(),
687        total_rows,
688        schema.columns.len(),
689        mem_mb,
690        chunks.len(),
691        index.len()
692    );
693
694    Ok((
695        DatasetState {
696            schema,
697            data: chunks,
698            arrow_schema: arrow_sch,
699            index,
700            lazy: false,
701        },
702        provider,
703    ))
704}
705
706/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
707/// The dataset is never read into RAM; DataFusion streams row groups on
708/// each query. The returned `DatasetState.data` is an empty `Vec` —
709/// `arrow_schema` still carries the inferred Arrow schema for discovery.
710async fn build_lazy_local_parquet(
711    d: &DatasetConfig,
712    ctx: &SessionContext,
713) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
714    let (url, part_keys) = lazy_local_listing(d)?;
715
716    let mut opts =
717        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
718    if !part_keys.is_empty() {
719        opts = opts.with_table_partition_cols(
720            part_keys
721                .iter()
722                .map(|k| (k.clone(), DataType::Utf8))
723                .collect(),
724        );
725    }
726
727    let session_state = ctx.state();
728    // `infer_schema` returns the *file* schema (without partition columns);
729    // `ListingTable` appends the declared partition columns on top.
730    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
731        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
732    })?;
733
734    let cfg = ListingTableConfig::new(url)
735        .with_listing_options(opts)
736        .with_schema(file_schema.clone());
737    let table = ListingTable::try_new(cfg).map_err(|e| {
738        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
739    })?;
740    let provider: Arc<dyn TableProvider> = Arc::new(table);
741
742    // Discovery schema = file columns + partition columns (Utf8).
743    let mut fields: Vec<Field> = file_schema
744        .fields()
745        .iter()
746        .map(|f| f.as_ref().clone())
747        .collect();
748    for k in &part_keys {
749        if !fields.iter().any(|f| f.name() == k) {
750            fields.push(Field::new(k, DataType::Utf8, false));
751        }
752    }
753    let arrow_sch = Arc::new(Schema::new(fields));
754
755    let columns: Vec<ColumnInfo> = arrow_sch
756        .fields()
757        .iter()
758        .map(|f| {
759            let dt = f.data_type();
760            ColumnInfo {
761                name: f.name().clone(),
762                logical: arrow_to_logical(dt),
763                sql_type: format!("{dt:?}"),
764                nullable: f.is_nullable(),
765            }
766        })
767        .collect();
768    let schema = DatasetSchema::new(&d.name, columns);
769
770    log::info!(
771        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
772        d.name,
773        d.source.kind.as_str(),
774        schema.columns.len(),
775        part_keys.len()
776    );
777
778    Ok((
779        DatasetState {
780            schema,
781            data: Vec::new(),
782            arrow_schema: arrow_sch,
783            index: EqIndex::default(),
784            lazy: true,
785        },
786        provider,
787    ))
788}
789
790/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
791/// rooted at the dataset base plus the ordered hive partition keys (if any).
792/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
793/// (hive root or flat folder of parquets), and a single `*.parquet` file.
794fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
795    let loc = &d.source.location;
796
797    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
798        let parts: Vec<&str> = loc.split('/').collect();
799        let first_wild = parts
800            .iter()
801            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
802            .unwrap_or(parts.len());
803        let base = parts[..first_wild].join("/");
804        let base = if base.is_empty() {
805            "/".to_string()
806        } else {
807            base
808        };
809        // Partition keys: `key=…` components between the base and the file
810        // pattern (the final component).
811        let upper = parts.len().saturating_sub(1);
812        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
813            .iter()
814            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
815            .filter(|k| !k.is_empty())
816            .collect();
817        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
818    }
819
820    let path = std::path::Path::new(loc);
821    if path.is_dir() {
822        let keys = discover_hive_keys(path);
823        return Ok((dir_url(path, d)?, keys));
824    }
825
826    let url = ListingTableUrl::parse(loc)
827        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
828    Ok((url, Vec::new()))
829}
830
831/// Parse a directory path into a `ListingTableUrl` (trailing slash so
832/// DataFusion treats it as a directory root, not a single object).
833fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
834    let s = path.to_str().ok_or_else(|| {
835        AppError::Internal(format!(
836            "dataset '{}': non-utf8 path {}",
837            d.name,
838            path.display()
839        ))
840    })?;
841    let s = if s.ends_with('/') {
842        s.to_string()
843    } else {
844        format!("{s}/")
845    };
846    ListingTableUrl::parse(&s)
847        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
848}
849
/// Discover the ordered hive partition keys under `base` by descending into
/// the first `key=value` subdirectory found at each level (both halves must
/// be non-empty).
///
/// Descent stops at the first level with no such subdirectory, or when a
/// directory cannot be read. A flat (non-partitioned) folder yields an empty
/// vec. Which subdirectory counts as "first" follows `read_dir` order.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut found = Vec::new();
    let mut dir = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&dir) {
        let hit = entries.flatten().find_map(|entry| {
            let path = entry.path();
            if !path.is_dir() {
                return None;
            }
            let key = {
                let name = path.file_name()?.to_str()?;
                let (k, v) = name.split_once('=')?;
                if k.is_empty() || v.is_empty() {
                    return None;
                }
                k.to_string()
            };
            Some((key, path))
        });

        let Some((key, child)) = hit else {
            break;
        };
        found.push(key);
        dir = child;
    }

    found
}
887
888/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
889/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
890/// any discovered hive partition columns. DataFusion does the directory
891/// listing through the registered store and streams row groups on each
892/// query — no local enumeration needed.
893async fn build_lazy_s3_parquet(
894    d: &DatasetConfig,
895    ctx: &SessionContext,
896) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
897    register_s3_object_store(d, ctx)?;
898
899    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
900
901    // Discovery schema = file columns + partition columns (Utf8).
902    let mut fields: Vec<Field> = file_schema
903        .fields()
904        .iter()
905        .map(|f| f.as_ref().clone())
906        .collect();
907    for k in &part_keys {
908        if !fields.iter().any(|f| f.name() == k) {
909            fields.push(Field::new(k, DataType::Utf8, false));
910        }
911    }
912    let arrow_sch = Arc::new(Schema::new(fields));
913
914    let columns: Vec<ColumnInfo> = arrow_sch
915        .fields()
916        .iter()
917        .map(|f| {
918            let dt = f.data_type();
919            ColumnInfo {
920                name: f.name().clone(),
921                logical: arrow_to_logical(dt),
922                sql_type: format!("{dt:?}"),
923                nullable: f.is_nullable(),
924            }
925        })
926        .collect();
927    let schema = DatasetSchema::new(&d.name, columns);
928
929    log::info!(
930        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
931        d.name,
932        d.source.kind.as_str(),
933        schema.columns.len(),
934        part_keys.len()
935    );
936
937    Ok((
938        DatasetState {
939            schema,
940            data: Vec::new(),
941            arrow_schema: arrow_sch,
942            index: EqIndex::default(),
943            lazy: true,
944        },
945        provider,
946    ))
947}
948
949/// Build a `ListingTable` provider for an S3 parquet source, resolving the
950/// base prefix and hive partition keys via [`s3_listing`]. The registered
951/// S3 object store must already be present on `ctx`. Returns the provider,
952/// the inferred *file* schema (without partition columns), and the ordered
953/// partition keys.
954async fn build_s3_listing_table(
955    d: &DatasetConfig,
956    ctx: &SessionContext,
957) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
958    let (url, part_keys) = s3_listing(d, ctx).await?;
959
960    let mut opts =
961        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
962    if !part_keys.is_empty() {
963        opts = opts.with_table_partition_cols(
964            part_keys
965                .iter()
966                .map(|k| (k.clone(), DataType::Utf8))
967                .collect(),
968        );
969    }
970
971    let session_state = ctx.state();
972    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
973        AppError::Internal(format!(
974            "dataset '{}': infer parquet schema on s3: {e}",
975            d.name
976        ))
977    })?;
978
979    let cfg = ListingTableConfig::new(url)
980        .with_listing_options(opts)
981        .with_schema(file_schema.clone());
982    let table = ListingTable::try_new(cfg).map_err(|e| {
983        AppError::Internal(format!(
984            "dataset '{}': ListingTable::try_new (s3): {e}",
985            d.name
986        ))
987    })?;
988    Ok((Arc::new(table), file_schema, part_keys))
989}
990
991/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
992/// keys)` pair. Hive keys come from the location glob when present
993/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
994/// are discovered by listing the registered object store. Honours the
995/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
996async fn s3_listing(
997    d: &DatasetConfig,
998    ctx: &SessionContext,
999) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1000    let s3 = d.s3.clone().unwrap_or_default();
1001    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1002    let loc = &d.source.location;
1003
1004    if d.source.has_glob() {
1005        let (base, keys) = split_glob_base_keys(loc);
1006        let base = format!("{}/", base.trim_end_matches('/'));
1007        let url = ListingTableUrl::parse(&base).map_err(|e| {
1008            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1009        })?;
1010        let keys = if want_partitions { keys } else { Vec::new() };
1011        return Ok((url, keys));
1012    }
1013
1014    let base = if loc.ends_with('/') {
1015        loc.clone()
1016    } else {
1017        format!("{loc}/")
1018    };
1019    let url = ListingTableUrl::parse(&base).map_err(|e| {
1020        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1021    })?;
1022    let keys = if want_partitions {
1023        discover_s3_hive_keys(ctx, &url).await
1024    } else {
1025        Vec::new()
1026    };
1027    Ok((url, keys))
1028}
1029
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys. Works for both local and `s3://` paths.
///
/// * The base is every `/`-separated component before the first one that
///   contains `*`, `?` or `[`.
/// * The keys are the `key=…` names found among the components from that
///   first wildcard component up to (but excluding) the final file pattern.
///
/// When no literal prefix remains, the base is:
/// * `/` for an absolute location (e.g. `/*/f.parquet`);
/// * `.` (the current directory) for a relative one (e.g. `*.parquet`).
///   The relative case must never fall back to the filesystem root.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
        .unwrap_or(parts.len());

    let base = parts[..first_wild].join("/");
    let base = if !base.is_empty() {
        base
    } else if loc.starts_with('/') {
        "/".to_string()
    } else {
        ".".to_string()
    };

    // The last component is the file pattern, never a partition directory.
    let upper = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts[first_wild.min(upper)..upper]
        .iter()
        .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
        .filter(|k| !k.is_empty())
        .collect();
    (base, keys)
}
1053
1054/// Discover ordered hive partition keys for an S3 prefix by walking the
1055/// object store one `key=value/` level at a time via delimiter listings.
1056/// Best-effort: any listing error stops discovery and returns what was found
1057/// so far (empty = treat as a flat folder).
1058async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1059    let store = match ctx.runtime_env().object_store(url.object_store()) {
1060        Ok(s) => s,
1061        Err(_) => return Vec::new(),
1062    };
1063    let mut keys = Vec::new();
1064    let mut prefix = url.prefix().clone();
1065    loop {
1066        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1067            Ok(l) => l,
1068            Err(_) => break,
1069        };
1070        let mut next: Option<object_store::path::Path> = None;
1071        for cp in &listing.common_prefixes {
1072            if let Some(seg) = cp.parts().next_back() {
1073                let seg = seg.as_ref().to_string();
1074                if let Some((k, v)) = seg.split_once('=')
1075                    && !k.is_empty()
1076                    && !v.is_empty()
1077                {
1078                    keys.push(k.to_string());
1079                    next = Some(cp.clone());
1080                    break;
1081                }
1082            }
1083        }
1084        match next {
1085            Some(p) => prefix = p,
1086            None => break,
1087        }
1088    }
1089    keys
1090}
1091
1092/// Original local-parquet code path — sync file I/O. We set a large reader
1093/// batch size so wide schemas (hundreds of columns) don't pay per-array
1094/// metadata overhead on thousands of small (default 1024-row) batches.
1095///
1096/// Two memory-saving knobs are applied here:
1097///
1098/// * **Column projection** — if `d.columns` is non-empty, only those
1099///   columns are decoded; everything else is skipped at the parquet reader
1100///   level (no Arrow array is ever allocated for the dropped columns).
1101/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1102///   carry a dictionary page are materialised as Arrow
1103///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1104///   string columns (state, country, severity, …) stay represented as
1105///   `n_unique` string slots plus an Int32 index per row instead of
1106///   `n_rows` independent strings — typically 10×–50× smaller for
1107///   real-world data.
1108fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1109    let files = d.resolve_local_parquet_files()?;
1110    let mut all = Vec::new();
1111    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1112        None
1113    } else {
1114        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1115    };
1116
1117    for f in &files {
1118        let file = std::fs::File::open(f)
1119            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1120
1121        // First pass: peek the parquet metadata + default Arrow schema so we
1122        // can (a) decide a column projection and (b) override Utf8 columns
1123        // that are dictionary-encoded in the file so the reader materialises
1124        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1125        let probe = ParquetRecordBatchReaderBuilder::try_new(
1126            file.try_clone()
1127                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1128        )?;
1129        let parquet_schema = probe.parquet_schema().clone();
1130        let arrow_schema = probe.schema().clone();
1131        let metadata = probe.metadata().clone();
1132        drop(probe);
1133
1134        // Column projection (top-level / leaf indices for flat schemas).
1135        let projection = if let Some(w) = &wanted {
1136            let indices: Vec<usize> = arrow_schema
1137                .fields()
1138                .iter()
1139                .enumerate()
1140                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1141                .map(|(i, _)| i)
1142                .collect();
1143            if indices.is_empty() {
1144                return Err(AppError::Internal(format!(
1145                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1146                    d.name,
1147                    d.columns,
1148                    f.display()
1149                )));
1150            }
1151            ProjectionMask::roots(&parquet_schema, indices)
1152        } else {
1153            ProjectionMask::all()
1154        };
1155
1156        // Dictionary override: any Utf8 column whose first row group carries
1157        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1158        // override schema must still describe every column in the parquet
1159        // file (projection is applied separately). Skipped entirely when
1160        // the dataset has `dict_encode = false` — escape hatch for cases
1161        // where the override interacts badly with null propagation in the
1162        // downstream engine.
1163        let mut new_fields: Vec<Field> = arrow_schema
1164            .fields()
1165            .iter()
1166            .map(|f| f.as_ref().clone())
1167            .collect();
1168        if d.dict_encode
1169            && let Some(rg0) = metadata.row_groups().first()
1170        {
1171            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1172                if !matches!(
1173                    fld.data_type(),
1174                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1175                ) {
1176                    continue;
1177                }
1178                if let Some(col) = rg0.columns().get(i)
1179                    && col.dictionary_page_offset().is_some()
1180                {
1181                    new_fields[i] = Field::new(
1182                        fld.name(),
1183                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1184                        fld.is_nullable(),
1185                    );
1186                }
1187            }
1188        }
1189        let forced_schema = Arc::new(Schema::new(new_fields));
1190
1191        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1192        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1193            .with_batch_size(65_536)
1194            .with_projection(projection)
1195            .build()?;
1196        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1197        // the file. Fold them in as constant Utf8 columns so they show up in
1198        // the schema and are queryable — matching the DuckDB backend.
1199        let pairs = hive_pairs(f);
1200        for batch in reader {
1201            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1202            all.push(if pairs.is_empty() {
1203                batch
1204            } else {
1205                append_partition_cols(&batch, &pairs)?
1206            });
1207        }
1208    }
1209    if all.is_empty() {
1210        return Err(AppError::Internal(format!(
1211            "dataset '{}': parquet source is empty",
1212            d.name
1213        )));
1214    }
1215    Ok(all)
1216}
1217
/// Ordered hive-style partition `(key, value)` pairs encoded in the
/// *directory* part of a file path, i.e. components shaped like `key=value`
/// (e.g. `year=2024/city=NYC/part-0.parquet` → `[(year, 2024), (city, NYC)]`).
///
/// Rules:
/// * The final component is the file name and is never treated as a
///   partition, so a file literally named `part=1.parquet` does not produce
///   a bogus `part` column.
/// * A component is skipped when its key is empty, its value is empty, or
///   its value itself contains `=`.
/// * Non-UTF-8 components are skipped.
///
/// NOTE(review): every ancestor directory is scanned, including any above
/// the dataset root — confirm dataset locations don't live under `k=v`
/// directories that aren't meant to be partitions.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1232
1233/// Append constant Utf8 columns for each hive partition pair. A partition
1234/// key that collides with a real file column is skipped (the file wins).
1235fn append_partition_cols(
1236    batch: &RecordBatch,
1237    pairs: &[(String, String)],
1238) -> Result<RecordBatch, AppError> {
1239    let n = batch.num_rows();
1240    let mut fields: Vec<Field> = batch
1241        .schema()
1242        .fields()
1243        .iter()
1244        .map(|f| f.as_ref().clone())
1245        .collect();
1246    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1247    for (k, v) in pairs {
1248        if fields.iter().any(|f| f.name() == k) {
1249            continue;
1250        }
1251        fields.push(Field::new(k, DataType::Utf8, false));
1252        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1253    }
1254    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1255        .map_err(|e| AppError::Internal(e.to_string()))
1256}
1257
1258/// Register an `AmazonS3` object store on the SessionContext, build a
1259/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1260/// and stream the whole dataset back through `DataFrame::collect`. Using a
1261/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1262/// get the same hive-partition handling as local parquet.
1263async fn read_s3_parquet(
1264    d: &DatasetConfig,
1265    ctx: &SessionContext,
1266) -> Result<Vec<RecordBatch>, AppError> {
1267    register_s3_object_store(d, ctx)?;
1268    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1269    let df = ctx
1270        .read_table(provider)
1271        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1272    Ok(df.collect().await?)
1273}
1274
1275/// Open a Delta table (local or S3) and stream every row back as a Vec of
1276/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1277/// treat all datasets uniformly (single in-memory batch + eq-index).
1278async fn read_delta(
1279    d: &DatasetConfig,
1280    opts: HashMap<String, String>,
1281) -> Result<Vec<RecordBatch>, AppError> {
1282    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1283        AppError::Internal(format!(
1284            "dataset '{}': bad delta location '{}': {e}",
1285            d.name, d.source.location
1286        ))
1287    })?;
1288    let table = deltalake::open_table_with_storage_options(url, opts)
1289        .await
1290        .map_err(|e| {
1291            AppError::Internal(format!(
1292                "dataset '{}': delta open '{}': {e}",
1293                d.name, d.source.location
1294            ))
1295        })?;
1296    let provider = table.table_provider().await.map_err(|e| {
1297        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1298    })?;
1299    // Drive a full scan via a throwaway SessionContext so we end up with
1300    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1301    let scan_ctx = SessionContext::new();
1302    let df = scan_ctx
1303        .read_table(provider)
1304        .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1305    Ok(df.collect().await?)
1306}
1307
1308/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1309/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1310/// passes them through to object_store internally.
1311fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1312    let creds = d.resolved_creds();
1313    let region = d.resolved_region();
1314    let s3 = d.s3.clone().unwrap_or_default();
1315    let (bucket, _) = d.source.s3_bucket()?;
1316
1317    let mut opts = HashMap::new();
1318    opts.insert("AWS_REGION".into(), region);
1319    if let Some(ep) = s3.effective_endpoint(bucket) {
1320        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1321    }
1322    if s3.allow_http {
1323        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1324    }
1325    opts.insert(
1326        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1327        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1328    );
1329    if let Some(k) = creds.access_key_id {
1330        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1331    }
1332    if let Some(s) = creds.secret_access_key {
1333        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1334    }
1335    if let Some(t) = creds.session_token {
1336        opts.insert("AWS_SESSION_TOKEN".into(), t);
1337    }
1338    // Read-only paths don't need the S3 lock-provider plumbing.
1339    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1340    Ok(opts)
1341}
1342
1343/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1344/// block + resolved credentials and register it on `ctx` under
1345/// `s3://bucket/`.
1346fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1347    let (bucket, _key) = d.source.s3_bucket()?;
1348    let creds = d.resolved_creds();
1349    let region = d.resolved_region();
1350    let s3 = d.s3.clone().unwrap_or_default();
1351
1352    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1353        AppError::Internal(format!(
1354            "dataset '{}': build S3 store for '{bucket}': {e}",
1355            d.name
1356        ))
1357    })?;
1358
1359    let url = Url::parse(&format!("s3://{bucket}"))
1360        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1361    ctx.register_object_store(&url, Arc::new(store));
1362    Ok(())
1363}
1364
1365fn build_s3(
1366    bucket: &str,
1367    region: &str,
1368    s3: &S3Config,
1369    creds: &ResolvedCreds,
1370) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1371    let mut b = AmazonS3Builder::new()
1372        .with_bucket_name(bucket)
1373        .with_region(region)
1374        .with_allow_http(s3.allow_http)
1375        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1376    if let Some(ep) = s3.effective_endpoint(bucket) {
1377        b = b.with_endpoint(ep);
1378    }
1379    if let Some(k) = creds.access_key_id.as_deref() {
1380        b = b.with_access_key_id(k);
1381    }
1382    if let Some(s) = creds.secret_access_key.as_deref() {
1383        b = b.with_secret_access_key(s);
1384    }
1385    if let Some(t) = creds.session_token.as_deref() {
1386        b = b.with_token(t);
1387    }
1388    b.build()
1389}
1390
1391fn arrow_to_logical(dt: &DataType) -> LogicalType {
1392    match dt {
1393        DataType::Boolean => LogicalType::Bool,
1394        DataType::Int8
1395        | DataType::Int16
1396        | DataType::Int32
1397        | DataType::Int64
1398        | DataType::UInt8
1399        | DataType::UInt16
1400        | DataType::UInt32
1401        | DataType::UInt64 => LogicalType::Int,
1402        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1403        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1404        // Dictionary-encoded strings are reported as plain strings — clients
1405        // (and the rest of the backend) shouldn't have to care that we keep
1406        // a compressed representation in memory.
1407        DataType::Dictionary(_, v)
1408            if matches!(
1409                v.as_ref(),
1410                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1411            ) =>
1412        {
1413            LogicalType::Utf8
1414        }
1415        DataType::Date32
1416        | DataType::Date64
1417        | DataType::Time32(_)
1418        | DataType::Time64(_)
1419        | DataType::Timestamp(_, _)
1420        | DataType::Duration(_)
1421        | DataType::Interval(_) => LogicalType::Temporal,
1422        _ => LogicalType::Other,
1423    }
1424}
1425
1426// ---------------------------------------------------------------------------
1427// Per-batch projection
1428// ---------------------------------------------------------------------------
1429
1430fn project(
1431    schema: &DatasetSchema,
1432    batch: RecordBatch,
1433    columns: &[String],
1434) -> Result<RecordBatch, AppError> {
1435    if columns.is_empty() {
1436        return Ok(batch);
1437    }
1438    let indices: Vec<usize> = columns
1439        .iter()
1440        .map(|c| {
1441            schema
1442                .find(c)
1443                .map(|info| schema.by_name[&info.name.to_lowercase()])
1444        })
1445        .collect::<Result<_, _>>()?;
1446    let fields: Vec<Field> = indices
1447        .iter()
1448        .map(|&i| batch.schema().field(i).clone())
1449        .collect();
1450    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1451    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1452}
1453
1454// ---------------------------------------------------------------------------
1455// SQL builder
1456// ---------------------------------------------------------------------------
1457
1458/// Accumulates the typed literal values for a parameterised query.
1459///
1460/// Predicate values are never interpolated into the SQL text. Instead each
1461/// value is pushed here and the builder emits a positional placeholder
1462/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1463/// to the logical plan via [`DataFrame::with_param_values`], so user input
1464/// reaches the engine as typed scalars and can never alter the query
1465/// structure (no SQL injection surface, no escaping to get wrong).
1466#[derive(Default)]
1467struct Params {
1468    values: Vec<ScalarValue>,
1469}
1470
1471impl Params {
1472    fn new() -> Self {
1473        Self::default()
1474    }
1475
1476    /// Bind `v` and return its `$N` placeholder token.
1477    fn bind(&mut self, v: ScalarValue) -> String {
1478        self.values.push(v);
1479        format!("${}", self.values.len())
1480    }
1481
1482    fn into_values(self) -> Vec<ScalarValue> {
1483        self.values
1484    }
1485}
1486
1487fn build_query_sql(
1488    schema: &DatasetSchema,
1489    req: &QueryRequest,
1490    max_page_size: u64,
1491) -> Result<(String, Vec<ScalarValue>), AppError> {
1492    let (limit, offset) = req.effective_limit_offset(max_page_size);
1493    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1494}
1495
1496fn build_query_stream_sql(
1497    schema: &DatasetSchema,
1498    req: &QueryRequest,
1499) -> Result<(String, Vec<ScalarValue>), AppError> {
1500    let suffix = req
1501        .limit
1502        .map(|limit| format!(" LIMIT {limit}"))
1503        .unwrap_or_default();
1504    build_query_sql_with_suffix(schema, req, &suffix)
1505}
1506
1507fn build_query_sql_with_suffix(
1508    schema: &DatasetSchema,
1509    req: &QueryRequest,
1510    suffix: &str,
1511) -> Result<(String, Vec<ScalarValue>), AppError> {
1512    let agg_plan = req.agg_plan(schema)?;
1513
1514    let cols = if let Some(plan) = &agg_plan {
1515        // Group cols, then aggregations, each aliased to the JSON output key.
1516        let mut parts: Vec<String> = plan
1517            .group_cols
1518            .iter()
1519            .map(|c| DatasetSchema::quote_ident(c))
1520            .collect();
1521        for a in &plan.aggs {
1522            let expr = a.sql_expr()?;
1523            parts.push(format!(
1524                "{expr} AS {}",
1525                DatasetSchema::quote_ident(&a.alias)
1526            ));
1527        }
1528        parts.join(", ")
1529    } else if req.columns.is_empty() {
1530        if req.distinct {
1531            "DISTINCT *".to_string()
1532        } else {
1533            "*".to_string()
1534        }
1535    } else {
1536        let list = req
1537            .columns
1538            .iter()
1539            .map(|c| {
1540                schema
1541                    .find(c)
1542                    .map(|info| DatasetSchema::quote_ident(&info.name))
1543            })
1544            .collect::<Result<Vec<_>, _>>()?
1545            .join(", ");
1546        if req.distinct {
1547            format!("DISTINCT {list}")
1548        } else {
1549            list
1550        }
1551    };
1552
1553    let mut params = Params::new();
1554    let clauses: Vec<String> = req
1555        .predicates
1556        .iter()
1557        .map(|p| pred_to_sql(schema, p, &mut params))
1558        .collect::<Result<_, _>>()?;
1559
1560    let table = DatasetSchema::quote_ident(&schema.name);
1561    let where_clause = if clauses.is_empty() {
1562        String::new()
1563    } else {
1564        format!(" WHERE {}", clauses.join(" AND "))
1565    };
1566    let group_clause = match &agg_plan {
1567        Some(p) => format!(
1568            " GROUP BY {}",
1569            p.group_cols
1570                .iter()
1571                .map(|c| DatasetSchema::quote_ident(c))
1572                .collect::<Vec<_>>()
1573                .join(", "),
1574        ),
1575        None => String::new(),
1576    };
1577    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1578        Some(s) => format!(" ORDER BY {s}"),
1579        None => String::new(),
1580    };
1581    let sql =
1582        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1583    Ok((sql, params.into_values()))
1584}
1585
1586fn build_count_sql(
1587    schema: &DatasetSchema,
1588    predicates: &[Predicate],
1589) -> Result<(String, Vec<ScalarValue>), AppError> {
1590    let mut params = Params::new();
1591    let clauses: Vec<String> = predicates
1592        .iter()
1593        .map(|p| pred_to_sql(schema, p, &mut params))
1594        .collect::<Result<_, _>>()?;
1595    let table = DatasetSchema::quote_ident(&schema.name);
1596    let where_clause = if clauses.is_empty() {
1597        String::new()
1598    } else {
1599        format!(" WHERE {}", clauses.join(" AND "))
1600    };
1601    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1602    Ok((sql, params.into_values()))
1603}
1604
1605fn pred_to_sql(
1606    schema: &DatasetSchema,
1607    pred: &Predicate,
1608    params: &mut Params,
1609) -> Result<String, AppError> {
1610    let info = schema.find(&pred.col)?;
1611    let col = DatasetSchema::quote_ident(&info.name);
1612
1613    match pred.op.as_str() {
1614        "is_null" => return Ok(format!("{col} IS NULL")),
1615        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1616        _ => {}
1617    }
1618
1619    let val = pred
1620        .val
1621        .as_ref()
1622        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1623
1624    if pred.op == "in" {
1625        let items = val
1626            .as_array()
1627            .filter(|a| !a.is_empty())
1628            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1629        let placeholders: Vec<String> = items
1630            .iter()
1631            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1632            .collect::<Result<_, AppError>>()?;
1633        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1634    }
1635
1636    let sql_op = match pred.op.as_str() {
1637        "eq" => "=",
1638        "neq" => "!=",
1639        "gt" => ">",
1640        "gte" => ">=",
1641        "lt" => "<",
1642        "lte" => "<=",
1643        "like" => "LIKE",
1644        "ilike" => "ILIKE",
1645        other => return Err(AppError::UnknownOperator(other.into())),
1646    };
1647    let placeholder = params.bind(json_to_scalar(val)?);
1648    Ok(format!("{col} {sql_op} {placeholder}"))
1649}
1650
1651/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
1652/// binding as a query parameter. The engine applies the usual numeric
1653/// widening / comparison coercion against the target column type.
1654fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1655    match val {
1656        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1657        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1658        JsonValue::Null => Ok(ScalarValue::Null),
1659        JsonValue::Number(n) => {
1660            if let Some(i) = n.as_i64() {
1661                Ok(ScalarValue::Int64(Some(i)))
1662            } else if let Some(u) = n.as_u64() {
1663                Ok(ScalarValue::UInt64(Some(u)))
1664            } else if let Some(f) = n.as_f64() {
1665                Ok(ScalarValue::Float64(Some(f)))
1666            } else {
1667                Err(AppError::InvalidValue(
1668                    "unsupported numeric literal in predicate".into(),
1669                ))
1670            }
1671        }
1672        _ => Err(AppError::InvalidValue(
1673            "unsupported literal type in predicate".into(),
1674        )),
1675    }
1676}
1677
1678// ---------------------------------------------------------------------------
1679// Equality index — built once at startup, queried on every predicate request
1680// ---------------------------------------------------------------------------
1681
1682fn json_index_key(val: &JsonValue) -> Option<String> {
1683    match val {
1684        JsonValue::String(s) => Some(s.clone()),
1685        JsonValue::Number(n) => Some(n.to_string()),
1686        JsonValue::Bool(b) => Some(b.to_string()),
1687        _ => None,
1688    }
1689}
1690
/// Intersect two ascending row-id lists.
///
/// This is a linear merge: each matching pair of heads is emitted once and
/// both heads advance, so the output is ascending.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => left = &left[1..],
            Ordering::Greater => right = &right[1..],
            Ordering::Equal => {
                common.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    common
}
1707
/// Merge two ascending row-id lists into one ascending list.
///
/// When both heads are equal the value is emitted once and both heads
/// advance. Whatever remains of either input is appended as-is.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = &left[1..];
            }
            Ordering::Greater => {
                merged.push(y);
                right = &right[1..];
            }
            Ordering::Equal => {
                merged.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
1732
1733fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1734    if predicates.is_empty() || index.is_empty() {
1735        return None;
1736    }
1737
1738    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
1739    // so borrow it directly instead of cloning the row-id vector.
1740    if let [pred] = predicates
1741        && pred.op.as_str() == "eq"
1742    {
1743        let col_lower = pred.col.to_lowercase();
1744        let col_map = index.get(&col_lower)?;
1745        let key = json_index_key(pred.val.as_ref()?)?;
1746        return Some(match col_map.get(&key) {
1747            Some(rows) => Cow::Borrowed(rows.as_slice()),
1748            None => Cow::Owned(Vec::new()),
1749        });
1750    }
1751
1752    let mut result: Option<Vec<u32>> = None;
1753    for pred in predicates {
1754        let col_lower = pred.col.to_lowercase();
1755        let col_map = index.get(&col_lower)?;
1756
1757        let rows: Vec<u32> = match pred.op.as_str() {
1758            "eq" => {
1759                let key = json_index_key(pred.val.as_ref()?)?;
1760                col_map.get(&key).cloned().unwrap_or_default()
1761            }
1762            "in" => {
1763                let items = pred.val.as_ref()?.as_array()?;
1764                let mut merged: Vec<u32> = Vec::new();
1765                for item in items {
1766                    if let Some(r) = col_map.get(&json_index_key(item)?) {
1767                        merged = union_sorted(&merged, r);
1768                    }
1769                }
1770                merged
1771            }
1772            _ => return None,
1773        };
1774
1775        result = Some(match result {
1776            None => rows,
1777            Some(r) => intersect_sorted(&r, &rows),
1778        });
1779    }
1780    result.map(Cow::Owned)
1781}
1782
1783/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
1784/// not part of the stable API — exposed solely so `benches/index.rs` can build
1785/// an `EqIndex` and exercise `try_index` directly.
1786#[doc(hidden)]
1787pub mod bench {
1788    use super::{EqIndex, FastMap, json_index_key, try_index};
1789    use datapress_core::models::Predicate;
1790    use serde_json::Value as JsonValue;
1791    use std::borrow::Cow;
1792
1793    /// Opaque equality index wrapper for benches (the inner alias is private).
1794    pub struct BenchIndex(EqIndex);
1795
1796    /// Build an index with a single `col` whose `val` bucket holds `rows`.
1797    /// The bucket key is derived with the same `json_index_key` the query path
1798    /// uses, so a matching `eq` predicate is guaranteed to hit.
1799    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1800        let key = json_index_key(val).expect("benchable index key");
1801        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1802        col_map.insert(key, rows);
1803        let mut index: EqIndex = EqIndex::default();
1804        index.insert(col.to_string(), col_map);
1805        BenchIndex(index)
1806    }
1807
1808    /// Resolve `predicates` against the index — the timed operation.
1809    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1810        try_index(&idx.0, predicates)
1811    }
1812
1813    /// Reference implementation of the pre-`Cow` behaviour: always clones the
1814    /// matched bucket into an owned `Vec` (what `try_index` did before the
1815    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
1816    /// the borrow win is measured against the old allocation cost in-process.
1817    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1818        let [pred] = predicates else { return None };
1819        if pred.op.as_str() != "eq" {
1820            return None;
1821        }
1822        let col_lower = pred.col.to_lowercase();
1823        let col_map = idx.0.get(&col_lower)?;
1824        let key = json_index_key(pred.val.as_ref()?)?;
1825        Some(col_map.get(&key).cloned().unwrap_or_default())
1826    }
1827}
1828
1829/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
1830/// the underlying batches (zero-copy) and concatenating the (small) page.
1831fn slice_global(
1832    chunks: &[RecordBatch],
1833    schema: &Arc<Schema>,
1834    offset: usize,
1835    limit: usize,
1836) -> Result<RecordBatch, AppError> {
1837    if limit == 0 || chunks.is_empty() {
1838        return Ok(RecordBatch::new_empty(schema.clone()));
1839    }
1840    let mut out = Vec::new();
1841    let mut to_skip = offset;
1842    let mut remaining = limit;
1843    for b in chunks {
1844        if remaining == 0 {
1845            break;
1846        }
1847        let n = b.num_rows();
1848        if to_skip >= n {
1849            to_skip -= n;
1850            continue;
1851        }
1852        let take = remaining.min(n - to_skip);
1853        out.push(b.slice(to_skip, take));
1854        to_skip = 0;
1855        remaining -= take;
1856    }
1857    if out.is_empty() {
1858        return Ok(RecordBatch::new_empty(schema.clone()));
1859    }
1860    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1861}
1862
1863/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
1864/// Row ids are global (across the concatenation of all chunks). We map each
1865/// requested row to its (chunk, local-index), `take` per chunk, then stitch
1866/// the per-chunk results back together preserving the original row order.
1867fn take_page(
1868    chunks: &[RecordBatch],
1869    schema: &Arc<Schema>,
1870    rows: &[u32],
1871    offset: usize,
1872    limit: usize,
1873) -> Result<RecordBatch, AppError> {
1874    let start = offset.min(rows.len());
1875    let len = limit.min(rows.len() - start);
1876    if len == 0 || chunks.is_empty() {
1877        return Ok(RecordBatch::new_empty(schema.clone()));
1878    }
1879
1880    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
1881    // and `offsets.last()` is the total row count.
1882    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1883    let mut acc: u32 = 0;
1884    offsets.push(0);
1885    for b in chunks {
1886        acc = acc
1887            .checked_add(b.num_rows() as u32)
1888            .expect("row count exceeds u32::MAX");
1889        offsets.push(acc);
1890    }
1891
1892    // Bucket each global row id into the chunk that contains it, remembering
1893    // the original output position so we can restore page order at the end.
1894    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1895    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1896        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1897        let local = gid - offsets[bi];
1898        buckets[bi].push((out_pos as u32, local));
1899    }
1900
1901    // Per-chunk take, recording the destination index for each emitted row.
1902    let mut takens: Vec<RecordBatch> = Vec::new();
1903    let mut dest: Vec<u32> = Vec::with_capacity(len);
1904    for (bi, bucket) in buckets.iter().enumerate() {
1905        if bucket.is_empty() {
1906            continue;
1907        }
1908        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1909        let cols: Vec<ArrayRef> = chunks[bi]
1910            .columns()
1911            .iter()
1912            .map(|c| {
1913                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1914                    .map_err(AppError::from)
1915            })
1916            .collect::<Result<_, _>>()?;
1917        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1918        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1919    }
1920
1921    // Stitch per-chunk results then permute to restore the requested order.
1922    let stitched = compute::concat_batches(schema, takens.iter())?;
1923    let mut inv = vec![0u32; len];
1924    for (i, &d) in dest.iter().enumerate() {
1925        inv[d as usize] = i as u32;
1926    }
1927    let perm = UInt32Array::from(inv);
1928    let cols: Vec<ArrayRef> = stitched
1929        .columns()
1930        .iter()
1931        .map(|c| {
1932            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1933                .map_err(AppError::from)
1934        })
1935        .collect::<Result<_, _>>()?;
1936    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1937}
1938
1939/// Build the equality index per the dataset's policy, against the chunked
1940/// representation. Row ids are global across the concatenation of all
1941/// chunks (so they remain compatible with `take_page` / `slice_global`).
1942fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1943    use rayon::prelude::*;
1944
1945    if cfg.mode == IndexMode::None || chunks.is_empty() {
1946        return EqIndex::default();
1947    }
1948
1949    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1950        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1951    } else {
1952        None
1953    };
1954
1955    let max_card = if cfg.mode == IndexMode::Auto {
1956        Some(cfg.max_cardinality)
1957    } else {
1958        None
1959    };
1960
1961    // Per-chunk starting global row id.
1962    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1963    let mut acc: u32 = 0;
1964    for b in chunks {
1965        batch_offsets.push(acc);
1966        acc = acc
1967            .checked_add(b.num_rows() as u32)
1968            .expect("row count exceeds u32::MAX");
1969    }
1970
1971    let schema = chunks[0].schema();
1972
1973    schema
1974        .fields()
1975        .par_iter()
1976        .enumerate()
1977        .filter_map(|(ci, field)| {
1978            let col_lower = field.name().to_lowercase();
1979            if let Some(a) = &allow
1980                && !a.contains_key(&col_lower)
1981            {
1982                return None;
1983            }
1984
1985            // Only build for index-friendly types; skip everything else
1986            // up-front so we don't pay the per-chunk dispatch cost.
1987            let dtype = field.data_type();
1988            let dict_utf8 = matches!(dtype,
1989                DataType::Dictionary(k, v)
1990                    if matches!(k.as_ref(), DataType::Int32)
1991                    && matches!(v.as_ref(), DataType::Utf8));
1992            match dtype {
1993                DataType::Utf8
1994                | DataType::Utf8View
1995                | DataType::Boolean
1996                | DataType::Int8
1997                | DataType::Int16
1998                | DataType::Int32
1999                | DataType::Int64 => {}
2000                _ if dict_utf8 => {}
2001                _ => return None,
2002            }
2003
2004            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2005
2006            for (bi, batch) in chunks.iter().enumerate() {
2007                let base = batch_offsets[bi];
2008                let col = batch.column(ci);
2009
2010                macro_rules! index_col {
2011                    ($arr_ty:ty) => {{
2012                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2013                        for row in 0..arr.len() {
2014                            if arr.is_null(row) {
2015                                continue;
2016                            }
2017                            let key = arr.value(row).to_string();
2018                            let gid = base + row as u32;
2019                            if let Some(v) = map.get_mut(&key) {
2020                                v.push(gid);
2021                            } else {
2022                                if let Some(mc) = max_card {
2023                                    if map.len() >= mc {
2024                                        return None;
2025                                    }
2026                                }
2027                                map.insert(key, vec![gid]);
2028                            }
2029                        }
2030                    }};
2031                }
2032
2033                if dict_utf8 {
2034                    // Dictionary(Int32, Utf8): iterate keys + look up the
2035                    // string value from the (small) dictionary. We allocate
2036                    // the key string only when the value is new — repeated
2037                    // values reuse the existing HashMap entry by hash, but
2038                    // `HashMap::get_mut` still needs the key, so we use a
2039                    // borrowed lookup via `get` first to avoid the alloc.
2040                    let arr = col
2041                        .as_any()
2042                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2043                        )?;
2044                    let keys = arr.keys();
2045                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2046                    for row in 0..arr.len() {
2047                        if arr.is_null(row) {
2048                            continue;
2049                        }
2050                        let k = keys.value(row) as usize;
2051                        let s = values.value(k);
2052                        let gid = base + row as u32;
2053                        if let Some(v) = map.get_mut(s) {
2054                            v.push(gid);
2055                        } else {
2056                            if let Some(mc) = max_card
2057                                && map.len() >= mc
2058                            {
2059                                return None;
2060                            }
2061                            map.insert(s.to_string(), vec![gid]);
2062                        }
2063                    }
2064                } else {
2065                    match dtype {
2066                        DataType::Utf8 => index_col!(StringArray),
2067                        DataType::Utf8View => index_col!(StringViewArray),
2068                        DataType::Boolean => index_col!(BooleanArray),
2069                        DataType::Int8 => index_col!(Int8Array),
2070                        DataType::Int16 => index_col!(Int16Array),
2071                        DataType::Int32 => index_col!(Int32Array),
2072                        DataType::Int64 => index_col!(Int64Array),
2073                        _ => unreachable!(),
2074                    }
2075                }
2076            }
2077
2078            Some((col_lower, map))
2079        })
2080        .collect()
2081}
2082
2083// ---------------------------------------------------------------------------
2084// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2085// on the page batch right before JSON encoding. We deliberately do **not**
2086// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2087// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2088// columns would balloon resident RAM. The cast is applied per returned page
2089// after pagination, so the cost is paid only for rows the caller requested.
2090// ---------------------------------------------------------------------------
2091
2092/// Returns true for Arrow types that `write_value` can render directly. Any
2093/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2094/// JSON output is faithful rather than silently `null`.
2095fn writable_inline(dt: &DataType) -> bool {
2096    match dt {
2097        DataType::Utf8
2098        | DataType::LargeUtf8
2099        | DataType::Utf8View
2100        | DataType::Boolean
2101        | DataType::Int8
2102        | DataType::Int16
2103        | DataType::Int32
2104        | DataType::Int64
2105        | DataType::UInt8
2106        | DataType::UInt16
2107        | DataType::UInt32
2108        | DataType::UInt64
2109        | DataType::Float32
2110        | DataType::Float64
2111        | DataType::Decimal128(_, _)
2112        | DataType::Decimal256(_, _) => true,
2113        DataType::Dictionary(k, v)
2114            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2115        {
2116            true
2117        }
2118        _ => false,
2119    }
2120}
2121
2122/// Cast any column whose dtype isn't directly writable by `write_value` to
2123/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2124/// — kept native in resident memory to save RAM — and also any exotic dtype
2125/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2126/// so the JSON serializer never falls back to writing literal `null`.
2127fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2128    let schema = batch.schema();
2129    let to_cast: Vec<usize> = schema
2130        .fields()
2131        .iter()
2132        .enumerate()
2133        .filter_map(|(i, f)| {
2134            if writable_inline(f.data_type()) {
2135                None
2136            } else {
2137                Some(i)
2138            }
2139        })
2140        .collect();
2141    if to_cast.is_empty() {
2142        return Ok(batch.clone());
2143    }
2144    let new_fields: Vec<Field> = schema
2145        .fields()
2146        .iter()
2147        .enumerate()
2148        .map(|(i, f)| {
2149            if to_cast.contains(&i) {
2150                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2151            } else {
2152                f.as_ref().clone()
2153            }
2154        })
2155        .collect();
2156    let new_schema = Arc::new(Schema::new(new_fields));
2157    let cols: Vec<ArrayRef> = batch
2158        .columns()
2159        .iter()
2160        .enumerate()
2161        .map(|(i, c)| {
2162            if to_cast.contains(&i) {
2163                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2164            } else {
2165                Ok(c.clone())
2166            }
2167        })
2168        .collect::<Result<_, _>>()?;
2169    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2170}
2171
2172// ---------------------------------------------------------------------------
2173// Compute helpers — retained for symmetry; reserved for future inline scan
2174// path. Currently the engine fallback handles all non-index queries.
2175// ---------------------------------------------------------------------------
2176
/// Comparison operator understood by the inline scan helpers (`cmp_scalar`).
// Reserved for a future inline scan path and unused today, hence `dead_code`.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// `=`
    Eq,
    /// `!=`
    Neq,
    /// `>`
    Gt,
    /// `>=`
    Gte,
    /// `<`
    Lt,
    /// `<=`
    Lte,
    /// Case-sensitive SQL `LIKE` (string columns only).
    Like,
    /// Case-insensitive SQL `ILIKE` (string columns only).
    ILike,
}
2189
2190#[allow(dead_code)]
2191fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2192    let arr = col
2193        .as_any()
2194        .downcast_ref::<StringArray>()
2195        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2196    let s = Scalar::new(StringArray::from(vec![val]));
2197    Ok(eq(arr, &s)?)
2198}
2199
2200#[allow(dead_code)]
2201fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2202    macro_rules! num_cmp {
2203        ($arr_type:ty, $cast:ty) => {{
2204            let n = val
2205                .as_f64()
2206                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2207                as $cast;
2208            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2209            let s = Scalar::new(<$arr_type>::from(vec![n]));
2210            Ok(match op {
2211                CmpOp::Eq => eq(arr, &s)?,
2212                CmpOp::Neq => neq(arr, &s)?,
2213                CmpOp::Gt => gt(arr, &s)?,
2214                CmpOp::Gte => gt_eq(arr, &s)?,
2215                CmpOp::Lt => lt(arr, &s)?,
2216                CmpOp::Lte => lt_eq(arr, &s)?,
2217                CmpOp::Like | CmpOp::ILike => {
2218                    return Err(AppError::InvalidValue(
2219                        "LIKE requires a string column".into(),
2220                    ));
2221                }
2222            })
2223        }};
2224    }
2225    match col.data_type() {
2226        DataType::Utf8 => {
2227            let s = val
2228                .as_str()
2229                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2230            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2231            let sc = Scalar::new(StringArray::from(vec![s]));
2232            Ok(match op {
2233                CmpOp::Eq => eq(arr, &sc)?,
2234                CmpOp::Neq => neq(arr, &sc)?,
2235                CmpOp::Gt => gt(arr, &sc)?,
2236                CmpOp::Gte => gt_eq(arr, &sc)?,
2237                CmpOp::Lt => lt(arr, &sc)?,
2238                CmpOp::Lte => lt_eq(arr, &sc)?,
2239                CmpOp::Like => compute::like(arr, &sc)?,
2240                CmpOp::ILike => compute::ilike(arr, &sc)?,
2241            })
2242        }
2243        DataType::Int8 => num_cmp!(Int8Array, i8),
2244        DataType::Int16 => num_cmp!(Int16Array, i16),
2245        DataType::Int32 => num_cmp!(Int32Array, i32),
2246        DataType::Int64 => num_cmp!(Int64Array, i64),
2247        DataType::Float32 => num_cmp!(Float32Array, f32),
2248        DataType::Float64 => num_cmp!(Float64Array, f64),
2249        dt => Err(AppError::InvalidValue(format!(
2250            "unsupported type for comparison: {dt:?}"
2251        ))),
2252    }
2253}
2254
2255// ---------------------------------------------------------------------------
2256// Serialisation
2257// ---------------------------------------------------------------------------
2258
2259pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2260    // Temporal columns are kept native in resident memory (compact). Cast
2261    // them — plus any other dtype `write_value` can't render directly — to
2262    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2263    // without paying the load-time RAM cost.
2264    let batch = cast_for_serialize(batch)?;
2265    let schema = batch.schema();
2266    let n_rows = batch.num_rows();
2267
2268    let keys: Vec<Vec<u8>> = schema
2269        .fields()
2270        .iter()
2271        .map(|f| {
2272            let mut k = Vec::with_capacity(f.name().len() + 3);
2273            k.push(b'"');
2274            k.extend_from_slice(f.name().as_bytes());
2275            k.extend_from_slice(b"\":");
2276            k
2277        })
2278        .collect();
2279
2280    // Resolve every column's concrete Arrow array type exactly ONCE here,
2281    // instead of paying a `data_type()` match + `downcast_ref` for every
2282    // single cell. The inner row loop then dispatches over a small enum,
2283    // which the compiler turns into a cheap jump table.
2284    let encoders: Vec<ColEnc> = batch
2285        .columns()
2286        .iter()
2287        .map(|c| ColEnc::new(c.as_ref()))
2288        .collect();
2289
2290    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2291    let mut itoa_buf = itoa::Buffer::new();
2292    let mut ryu_buf = ryu::Buffer::new();
2293    buf.push(b'[');
2294
2295    for row in 0..n_rows {
2296        if row > 0 {
2297            buf.push(b',');
2298        }
2299        buf.push(b'{');
2300        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2301            if i > 0 {
2302                buf.push(b',');
2303            }
2304            buf.extend_from_slice(key);
2305            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2306        }
2307        buf.push(b'}');
2308    }
2309
2310    buf.push(b']');
2311    Ok(unsafe { String::from_utf8_unchecked(buf) })
2312}
2313
/// Per-column JSON encoder. The concrete Arrow array type is resolved once
/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
/// instead of repeating `data_type()` matching + `downcast_ref` for every
/// cell. Each variant borrows the already-downcast typed array for `'a`.
enum ColEnc<'a> {
    /// `DataType::Utf8` strings.
    Utf8(&'a StringArray),
    /// `DataType::LargeUtf8` strings.
    LargeUtf8(&'a LargeStringArray),
    /// `DataType::Utf8View` strings.
    Utf8View(&'a StringViewArray),
    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
    /// pre-downcast string values so neither is re-resolved per row.
    DictI32Utf8(
        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
        &'a StringArray,
    ),
    /// `DataType::Boolean`.
    Bool(&'a BooleanArray),
    /// Signed integers, one variant per width.
    I8(&'a Int8Array),
    I16(&'a Int16Array),
    I32(&'a Int32Array),
    I64(&'a Int64Array),
    /// Unsigned integers, one variant per width.
    U8(&'a UInt8Array),
    U16(&'a UInt16Array),
    U32(&'a UInt32Array),
    U64(&'a UInt64Array),
    /// Decimals (any precision/scale the Arrow type allows).
    Dec128(&'a Decimal128Array),
    Dec256(&'a Decimal256Array),
    /// Floating point.
    F32(&'a Float32Array),
    F64(&'a Float64Array),
    /// Anything else: fall back to the generic `write_value` dispatch.
    Other(&'a dyn Array),
}
2344
2345impl<'a> ColEnc<'a> {
2346    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2347        macro_rules! dc {
2348            ($t:ty) => {
2349                col.as_any().downcast_ref::<$t>().unwrap()
2350            };
2351        }
2352        match col.data_type() {
2353            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2354            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2355            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2356            DataType::Dictionary(key, value)
2357                if matches!(key.as_ref(), DataType::Int32)
2358                    && matches!(value.as_ref(), DataType::Utf8) =>
2359            {
2360                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2361                let values = dict
2362                    .values()
2363                    .as_any()
2364                    .downcast_ref::<StringArray>()
2365                    .unwrap();
2366                ColEnc::DictI32Utf8(dict, values)
2367            }
2368            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2369            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2370            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2371            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2372            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2373            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2374            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2375            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2376            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2377            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2378            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2379            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2380            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2381            _ => ColEnc::Other(col),
2382        }
2383    }
2384
2385    #[inline]
2386    fn write(
2387        &self,
2388        buf: &mut Vec<u8>,
2389        row: usize,
2390        itoa_buf: &mut itoa::Buffer,
2391        ryu_buf: &mut ryu::Buffer,
2392    ) {
2393        macro_rules! int {
2394            ($arr:expr) => {{
2395                if $arr.is_null(row) {
2396                    buf.extend_from_slice(b"null");
2397                } else {
2398                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2399                }
2400            }};
2401        }
2402        match self {
2403            ColEnc::Utf8(a) => {
2404                if a.is_null(row) {
2405                    buf.extend_from_slice(b"null");
2406                } else {
2407                    write_str(buf, a.value(row));
2408                }
2409            }
2410            ColEnc::LargeUtf8(a) => {
2411                if a.is_null(row) {
2412                    buf.extend_from_slice(b"null");
2413                } else {
2414                    write_str(buf, a.value(row));
2415                }
2416            }
2417            ColEnc::Utf8View(a) => {
2418                if a.is_null(row) {
2419                    buf.extend_from_slice(b"null");
2420                } else {
2421                    write_str(buf, a.value(row));
2422                }
2423            }
2424            ColEnc::DictI32Utf8(keys, values) => {
2425                if keys.is_null(row) {
2426                    buf.extend_from_slice(b"null");
2427                } else {
2428                    let k = keys.keys().value(row) as usize;
2429                    write_str(buf, values.value(k));
2430                }
2431            }
2432            ColEnc::Bool(a) => {
2433                if a.is_null(row) {
2434                    buf.extend_from_slice(b"null");
2435                } else {
2436                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2437                }
2438            }
2439            ColEnc::I8(a) => int!(a),
2440            ColEnc::I16(a) => int!(a),
2441            ColEnc::I32(a) => int!(a),
2442            ColEnc::I64(a) => int!(a),
2443            ColEnc::U8(a) => int!(a),
2444            ColEnc::U16(a) => int!(a),
2445            ColEnc::U32(a) => int!(a),
2446            ColEnc::U64(a) => int!(a),
2447            ColEnc::Dec128(a) => {
2448                if a.is_null(row) {
2449                    buf.extend_from_slice(b"null");
2450                } else {
2451                    write_str(buf, &a.value_as_string(row));
2452                }
2453            }
2454            ColEnc::Dec256(a) => {
2455                if a.is_null(row) {
2456                    buf.extend_from_slice(b"null");
2457                } else {
2458                    write_str(buf, &a.value_as_string(row));
2459                }
2460            }
2461            ColEnc::F32(a) => {
2462                if a.is_null(row) {
2463                    buf.extend_from_slice(b"null");
2464                } else {
2465                    let v = a.value(row);
2466                    if v.is_finite() {
2467                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2468                    } else {
2469                        buf.extend_from_slice(b"null");
2470                    }
2471                }
2472            }
2473            ColEnc::F64(a) => {
2474                if a.is_null(row) {
2475                    buf.extend_from_slice(b"null");
2476                } else {
2477                    let v = a.value(row);
2478                    if v.is_finite() {
2479                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2480                    } else {
2481                        buf.extend_from_slice(b"null");
2482                    }
2483                }
2484            }
2485            ColEnc::Other(col) => {
2486                if col.is_null(row) {
2487                    buf.extend_from_slice(b"null");
2488                } else {
2489                    write_value(buf, *col, row);
2490                }
2491            }
2492        }
2493    }
2494}
2495
2496#[inline]
2497fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2498    match col.data_type() {
2499        DataType::Utf8 => write_str(
2500            buf,
2501            col.as_any()
2502                .downcast_ref::<StringArray>()
2503                .unwrap()
2504                .value(row),
2505        ),
2506        DataType::LargeUtf8 => write_str(
2507            buf,
2508            col.as_any()
2509                .downcast_ref::<LargeStringArray>()
2510                .unwrap()
2511                .value(row),
2512        ),
2513        DataType::Utf8View => write_str(
2514            buf,
2515            col.as_any()
2516                .downcast_ref::<StringViewArray>()
2517                .unwrap()
2518                .value(row),
2519        ),
2520        DataType::Dictionary(key, value)
2521            if matches!(key.as_ref(), DataType::Int32)
2522                && matches!(value.as_ref(), DataType::Utf8) =>
2523        {
2524            let dict = col
2525                .as_any()
2526                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2527                .unwrap();
2528            let keys = dict.keys();
2529            let values = dict
2530                .values()
2531                .as_any()
2532                .downcast_ref::<StringArray>()
2533                .unwrap();
2534            let k = keys.value(row) as usize;
2535            write_str(buf, values.value(k));
2536        }
2537        DataType::Boolean => {
2538            let v = col
2539                .as_any()
2540                .downcast_ref::<BooleanArray>()
2541                .unwrap()
2542                .value(row);
2543            buf.extend_from_slice(if v { b"true" } else { b"false" });
2544        }
2545        DataType::Int8 => {
2546            let mut b = itoa::Buffer::new();
2547            buf.extend_from_slice(
2548                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2549                    .as_bytes(),
2550            );
2551        }
2552        DataType::Int16 => {
2553            let mut b = itoa::Buffer::new();
2554            buf.extend_from_slice(
2555                b.format(
2556                    col.as_any()
2557                        .downcast_ref::<Int16Array>()
2558                        .unwrap()
2559                        .value(row),
2560                )
2561                .as_bytes(),
2562            );
2563        }
2564        DataType::Int32 => {
2565            let mut b = itoa::Buffer::new();
2566            buf.extend_from_slice(
2567                b.format(
2568                    col.as_any()
2569                        .downcast_ref::<Int32Array>()
2570                        .unwrap()
2571                        .value(row),
2572                )
2573                .as_bytes(),
2574            );
2575        }
2576        DataType::Int64 => {
2577            let mut b = itoa::Buffer::new();
2578            buf.extend_from_slice(
2579                b.format(
2580                    col.as_any()
2581                        .downcast_ref::<Int64Array>()
2582                        .unwrap()
2583                        .value(row),
2584                )
2585                .as_bytes(),
2586            );
2587        }
2588        DataType::UInt8 => {
2589            let mut b = itoa::Buffer::new();
2590            buf.extend_from_slice(
2591                b.format(
2592                    col.as_any()
2593                        .downcast_ref::<UInt8Array>()
2594                        .unwrap()
2595                        .value(row),
2596                )
2597                .as_bytes(),
2598            );
2599        }
2600        DataType::UInt16 => {
2601            let mut b = itoa::Buffer::new();
2602            buf.extend_from_slice(
2603                b.format(
2604                    col.as_any()
2605                        .downcast_ref::<UInt16Array>()
2606                        .unwrap()
2607                        .value(row),
2608                )
2609                .as_bytes(),
2610            );
2611        }
2612        DataType::UInt32 => {
2613            let mut b = itoa::Buffer::new();
2614            buf.extend_from_slice(
2615                b.format(
2616                    col.as_any()
2617                        .downcast_ref::<UInt32Array>()
2618                        .unwrap()
2619                        .value(row),
2620                )
2621                .as_bytes(),
2622            );
2623        }
2624        DataType::UInt64 => {
2625            let mut b = itoa::Buffer::new();
2626            buf.extend_from_slice(
2627                b.format(
2628                    col.as_any()
2629                        .downcast_ref::<UInt64Array>()
2630                        .unwrap()
2631                        .value(row),
2632                )
2633                .as_bytes(),
2634            );
2635        }
2636        DataType::Decimal128(_, _) => {
2637            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2638            write_str(buf, &arr.value_as_string(row));
2639        }
2640        DataType::Decimal256(_, _) => {
2641            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2642            write_str(buf, &arr.value_as_string(row));
2643        }
2644        DataType::Float32 => {
2645            let v = col
2646                .as_any()
2647                .downcast_ref::<Float32Array>()
2648                .unwrap()
2649                .value(row);
2650            if v.is_finite() {
2651                let mut b = ryu::Buffer::new();
2652                buf.extend_from_slice(b.format_finite(v).as_bytes());
2653            } else {
2654                buf.extend_from_slice(b"null");
2655            }
2656        }
2657        DataType::Float64 => {
2658            let v = col
2659                .as_any()
2660                .downcast_ref::<Float64Array>()
2661                .unwrap()
2662                .value(row);
2663            if v.is_finite() {
2664                let mut b = ryu::Buffer::new();
2665                buf.extend_from_slice(b.format_finite(v).as_bytes());
2666            } else {
2667                buf.extend_from_slice(b"null");
2668            }
2669        }
2670        // Any dtype not handled above must have been pre-cast to Utf8 by
2671        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
2672        // visible JSON string rather than a silent null so it can't be
2673        // mistaken for a real NULL value.
2674        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2675    }
2676}
2677
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, and `\n`, `\r` and `\t` use their short
/// escapes. Every other byte below 0x20 becomes `\u00XX` with lowercase hex.
/// All remaining bytes, including multi-byte UTF-8 sequences, are copied
/// verbatim, so valid UTF-8 input always yields valid UTF-8 output.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // Start of the pending run of bytes that need no escaping. The run is
    // flushed in one `extend_from_slice` each time an escapable byte, or the
    // end of the input, is reached.
    let mut run_start = 0;
    for (idx, &ch) in bytes.iter().enumerate() {
        let needs_escape = ch == b'"' || ch == b'\\' || ch < 0x20;
        if !needs_escape {
            continue;
        }
        buf.extend_from_slice(&bytes[run_start..idx]);
        run_start = idx + 1;
        match ch {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            _ => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX_DIGITS[usize::from(ch >> 4)]);
                buf.push(HEX_DIGITS[usize::from(ch & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);

    buf.push(b'"');
}
2699
2700// ---------------------------------------------------------------------------
2701// Backend trait impl — wires the store into the generic core handlers.
2702// ---------------------------------------------------------------------------
2703
2704#[async_trait]
2705impl Backend for Store {
2706    fn names(&self) -> Vec<String> {
2707        Store::names(self)
2708    }
2709
2710    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2711        let st = self.dataset(name)?;
2712        Ok(DatasetSummary {
2713            name: st.schema.name.clone(),
2714            columns: st.schema.columns.len(),
2715            rows: st.num_rows(),
2716        })
2717    }
2718
2719    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2720        let st = self.dataset(name)?;
2721        Ok(Arc::new(st.schema.clone()))
2722    }
2723
2724    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2725        let st = self.dataset(name)?;
2726        // Report indexed columns in the dataset's declared schema order
2727        // so the `/schema` response is deterministic.
2728        let mut cols: Vec<String> = st
2729            .schema
2730            .columns
2731            .iter()
2732            .map(|c| c.name.clone())
2733            .filter(|n| st.index.contains_key(n))
2734            .collect();
2735        // Any indexed columns not in `schema.columns` (shouldn't happen,
2736        // but be defensive) get appended sorted.
2737        let mut extras: Vec<String> = st
2738            .index
2739            .keys()
2740            .filter(|n| !cols.iter().any(|c| c == *n))
2741            .cloned()
2742            .collect();
2743        extras.sort();
2744        cols.extend(extras);
2745        Ok(cols)
2746    }
2747
2748    async fn sample(&self, name: &str) -> Result<String, AppError> {
2749        Store::sample(self, name).await
2750    }
2751
2752    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2753        Store::query(self, name, req).await
2754    }
2755
2756    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2757        Store::query_arrow(self, name, req).await
2758    }
2759
2760    async fn query_arrow_stream(
2761        &self,
2762        name: &str,
2763        req: &QueryRequest,
2764    ) -> Result<ArrowIpcStream, AppError> {
2765        Store::query_arrow_stream(self, name, req).await
2766    }
2767
2768    async fn query_arrow_stream_all(
2769        &self,
2770        name: &str,
2771        req: &QueryRequest,
2772    ) -> Result<ArrowIpcStream, AppError> {
2773        Store::query_arrow_stream_all(self, name, req).await
2774    }
2775
2776    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2777        Store::count(self, name, req).await
2778    }
2779
2780    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
2781        Store::query_sql(self, sql, max_rows).await
2782    }
2783
2784    async fn query_sql_arrow_stream(
2785        &self,
2786        sql: &str,
2787        max_rows: u64,
2788    ) -> Result<ArrowIpcStream, AppError> {
2789        Store::query_sql_arrow_stream(self, sql, max_rows).await
2790    }
2791
2792    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2793        Store::parquet(self, name).await
2794    }
2795
2796    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2797        Store::reload(self, name).await
2798    }
2799}