Skip to main content

datapress_datafusion/
store.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35    AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36    S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
42// ---------------------------------------------------------------------------
43// Public types
44// ---------------------------------------------------------------------------
45
/// `HashMap` alias with the hasher swapped for `ahash`.
///
/// The default `std::collections::HashMap` uses SipHash (DoS-resistant but
/// slow). The equality index keys are our own column values, not untrusted
/// hash-flood input, so we use `ahash` — a much faster non-cryptographic
/// hasher — on this per-request hot path.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
51
/// Pre-built equality index, two levels deep:
/// lowercase column name → string-encoded value → sorted row ids (`u32`).
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
/// Per-dataset state: schema metadata, the resident chunks, and the
/// equality index built per the dataset's `[dataset.index]` policy.
///
/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
/// produced by the underlying reader, after temporal columns are cast to
/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
/// into one batch: on wide schemas (hundreds of columns) that transiently
/// allocates a second full copy of the decoded Arrow data, pushing peak
/// RSS to ~2× the resident size and OOM-killing the process at startup.
///
/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
/// `index` is empty, and every query is dispatched to DataFusion SQL
/// against a registered `ListingTable`. `arrow_schema` still carries the
/// inferred schema so discovery endpoints work.
pub struct DatasetState {
    /// Dataset name plus per-column metadata derived from the Arrow schema.
    pub schema: DatasetSchema,
    /// Resident Arrow chunks; an empty `Vec` for lazy datasets.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset (inferred from the source for lazy datasets).
    pub arrow_schema: Arc<Schema>,
    /// Equality index over the resident chunks; empty for lazy datasets.
    pub index: EqIndex,
    /// `true` when nothing is materialised and every query goes through SQL.
    pub lazy: bool,
}
76
77impl DatasetState {
78    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
79    pub fn num_rows(&self) -> usize {
80        self.data.iter().map(|b| b.num_rows()).sum()
81    }
82}
83
/// Multi-dataset registry. Each dataset is registered in the shared
/// `SessionContext` under its configured name. The per-dataset state is
/// held behind `ArcSwap` so a reload can atomically replace it without
/// blocking concurrent queries.
pub struct Store {
    /// DataFusion session every dataset's table provider is registered in.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`; always at least 1.
    max_page_size: u64,
    /// Original dataset configs, indexed by name. Reload reads the source
    /// path from here — clients can't redirect a reload at an arbitrary file.
    configs: HashMap<String, DatasetConfig>,
    /// Hot-swappable snapshot of all currently loaded datasets.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-name reload mutex. Serialises concurrent reloads of the same
    /// dataset; reloads of different datasets proceed in parallel.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
100
101impl Store {
102    /// Load every dataset declared in `cfg`.
103    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104        // One-shot init for the deltalake S3 backend. Safe to call more
105        // than once — the handlers are idempotent.
106        if cfg
107            .datasets
108            .iter()
109            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110        {
111            deltalake::aws::register_handlers(None);
112        }
113
114        // NOTE: identifier handling for the raw-SQL endpoint is done by
115        // rewriting schema column/table references to quoted canonical
116        // names before execution (see `query_sql`). DataFusion's default
117        // identifier normalization is left ON so unquoted *aliases* and
118        // CTE names stay case-insensitive, matching DuckDB.
119        let ctx = SessionContext::new();
120        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
121        let mut configs = HashMap::with_capacity(cfg.datasets.len());
122
123        for d in &cfg.datasets {
124            let (state, provider) = build_dataset(d, &ctx).await?;
125            ctx.register_table(d.name.as_str(), provider)?;
126            datasets.insert(d.name.clone(), Arc::new(state));
127            configs.insert(d.name.clone(), d.clone());
128        }
129        Ok(Self {
130            ctx,
131            max_page_size: cfg.server.max_page_size.max(1),
132            configs,
133            datasets: ArcSwap::from_pointee(datasets),
134            reload_locks: Mutex::new(HashMap::new()),
135        })
136    }
137
138    /// Sorted list of dataset names.
139    pub fn names(&self) -> Vec<String> {
140        let snap = self.datasets.load();
141        let mut v: Vec<String> = snap.keys().cloned().collect();
142        v.sort();
143        v
144    }
145
146    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
147        self.datasets
148            .load()
149            .get(name)
150            .cloned()
151            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
152    }
153
154    /// JSON for the first row of the dataset, or `null` if empty. Used by
155    /// `GET /api/datasets/{name}/schema` for discoverability.
156    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
157        let st = self.dataset(name)?;
158
159        // Lazy datasets have no resident batch — pull one row via SQL.
160        if st.lazy {
161            let table = DatasetSchema::quote_ident(&st.schema.name);
162            let sql = format!("SELECT * FROM {table} LIMIT 1");
163            let df = self.ctx.sql(&sql).await?;
164            let batches = df.collect().await?;
165            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
166                return Ok("null".into());
167            }
168            let arr = serialize(&batches[0].slice(0, 1))?;
169            let trimmed = arr.trim();
170            let inner = trimmed
171                .strip_prefix('[')
172                .and_then(|s| s.strip_suffix(']'))
173                .unwrap_or(trimmed);
174            return Ok(inner.to_string());
175        }
176
177        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
178            Some(b) => b,
179            None => return Ok("null".into()),
180        };
181        let arr = serialize(&first.slice(0, 1))?;
182        // strip the outer [] to return a single object
183        let trimmed = arr.trim();
184        let inner = trimmed
185            .strip_prefix('[')
186            .and_then(|s| s.strip_suffix(']'))
187            .unwrap_or(trimmed);
188        Ok(inner.to_string())
189    }
190
191    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
192    /// against the same name continue to see the *old* `Arc<DatasetState>`
193    /// until they finish; the old data is dropped once the last reference
194    /// goes away.
195    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
196        // 1. Look up the dataset config. Not finding it = 404.
197        let cfg = self
198            .configs
199            .get(name)
200            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
201            .clone();
202
203        // 2. Per-name lock: only one reload of this dataset at a time.
204        let lock = {
205            let mut locks = self.reload_locks.lock().unwrap();
206            locks
207                .entry(name.to_string())
208                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
209                .clone()
210        };
211        let _guard = lock.lock().await;
212
213        let started = std::time::Instant::now();
214
215        // 3. Heavy lifting (source read + index build). Parquet/delta
216        // readers are themselves async, so we don't wrap in `web::block`.
217        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
218        let rows = state.num_rows();
219
220        // 4. Atomic swap.
221        //   a) Replace the MemTable inside the SessionContext.
222        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
223        // In-flight queries already hold the old provider + old Arc; they
224        // run to completion. New queries see the new data.
225        let _ = self.ctx.deregister_table(name)?;
226        self.ctx.register_table(name, provider)?;
227
228        let mut new_map = (**self.datasets.load()).clone();
229        new_map.insert(name.to_string(), Arc::new(state));
230        self.datasets.store(Arc::new(new_map));
231
232        let elapsed_ms = started.elapsed().as_millis();
233        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
234        Ok(ReloadStats { rows, elapsed_ms })
235    }
236
237    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
238    /// slice. Otherwise → DataFusion SQL on the single registered table.
239    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
240    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
241        let batch = self.query_batch(name, req).await?;
242        if batch.num_rows() == 0 {
243            return Ok("[]".to_string());
244        }
245        serialize(&batch)
246    }
247
248    /// Rewrite registered table / column references in a raw-SQL string to
249    /// their canonical, quoted spelling so matching is case-insensitive
250    /// (like DuckDB). Builds the lookup from every currently loaded
251    /// dataset; unknown identifiers (aliases, CTE names) are left for the
252    /// engine's default normalization to handle.
253    fn canonicalize_sql(&self, sql: &str) -> String {
254        let snap = self.datasets.load();
255        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
256        let mut columns: HashMap<String, String> = HashMap::new();
257        for (name, state) in snap.iter() {
258            tables.insert(name.to_lowercase(), name.clone());
259            for col in &state.schema.columns {
260                columns
261                    .entry(col.name.to_lowercase())
262                    .or_insert_with(|| col.name.clone());
263            }
264        }
265        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
266    }
267
268    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
269    /// array. The statement has already passed
270    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
271    /// `LIMIT max_rows` so the result is bounded regardless of the user's
272    /// own clauses, executed through the shared `SessionContext`, and run
273    /// through the same fast JSON encoder as [`Self::query`].
274    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
275        let cap = max_rows.max(1);
276        let sql = self.canonicalize_sql(sql);
277        let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
278        let df = self.ctx.sql(&wrapped).await?;
279        let batches = df.collect().await?;
280        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
281            return Ok("[]".to_string());
282        }
283        let batch = if batches.len() == 1 {
284            batches.into_iter().next().expect("checked len")
285        } else {
286            compute::concat_batches(&batches[0].schema(), batches.iter())?
287        };
288        // Defence in depth: the outer LIMIT already bounds the row count,
289        // but slice anyway so a planning quirk can never blow the cap.
290        let batch = if batch.num_rows() as u64 > cap {
291            batch.slice(0, cap as usize)
292        } else {
293            batch
294        };
295        serialize(&batch)
296    }
297
298    /// Same plan as [`Self::query_sql`], but encode the bounded result as
299    /// an Arrow IPC stream instead of JSON. Backs the Arrow
300    /// content-negotiated branch of `POST /api/v1/sql`.
301    pub async fn query_sql_arrow_stream(
302        &self,
303        sql: &str,
304        max_rows: u64,
305    ) -> Result<ArrowIpcStream, AppError> {
306        let cap = max_rows.max(1);
307        let sql = self.canonicalize_sql(sql);
308        let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
309        let df = self.ctx.sql(&wrapped).await?;
310        let batches = df.collect().await?;
311        Ok(stream_arrow_batches(batches))
312    }
313
314    /// Same plan as [`Self::query`], but encode the result page as an
315    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
316    /// results still produce a valid, self-describing zero-batch stream.
317    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
318        let batch = self.query_batch(name, req).await?;
319        let schema = batch.schema();
320        let mut buf = Vec::with_capacity(8 * 1024);
321        {
322            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
323            if batch.num_rows() > 0 {
324                w.write(&batch)?;
325            }
326            w.finish()?;
327        }
328        Ok(buf)
329    }
330
331    pub async fn query_arrow_stream(
332        &self,
333        name: &str,
334        req: &QueryRequest,
335    ) -> Result<ArrowIpcStream, AppError> {
336        let batches = self.query_batches(name, req).await?;
337        Ok(stream_arrow_batches(batches))
338    }    pub async fn query_arrow_stream_all(
339        &self,
340        name: &str,
341        req: &QueryRequest,
342    ) -> Result<ArrowIpcStream, AppError> {
343        let batches = self.query_batches_all(name, req).await?;
344        Ok(stream_arrow_batches(batches))
345    }
346
347    /// Encode the entire dataset as a single self-contained Parquet file.
348    ///
349    /// Collects every row (all columns, no predicates, no paging) and runs
350    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
351    /// carries the row-group + footer metadata a Parquet reader needs to
352    /// answer `count(*)` straight from the footer. Powers the cached
353    /// `GET /datasets/{name}/parquet` HTTP endpoint.
354    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
355        // All rows, all columns, no predicates / ordering / limit.
356        let req = QueryRequest {
357            columns: Vec::new(),
358            predicates: Vec::new(),
359            group_by: Vec::new(),
360            aggregations: Vec::new(),
361            distinct: false,
362            order_by: Vec::new(),
363            limit: None,
364            page: 1,
365            page_size: 1,
366        };
367        let st = self.dataset(name)?;
368        let batches = self.query_batches_all(name, &req).await?;
369        // Use the actual batch schema when we have rows so the writer schema
370        // matches exactly (projection/nullability); fall back to the
371        // dataset schema for an empty dataset.
372        let schema = batches
373            .first()
374            .map(|b| b.schema())
375            .unwrap_or_else(|| st.arrow_schema.clone());
376
377        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
378        {
379            let props = parquet::file::properties::WriterProperties::builder()
380                .set_compression(parquet::basic::Compression::SNAPPY)
381                .build();
382            let mut writer =
383                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
384                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
385            for batch in &batches {
386                if batch.num_rows() > 0 {
387                    writer
388                        .write(batch)
389                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
390                }
391            }
392            writer
393                .close()
394                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
395        }
396        Ok(bytes::Bytes::from(buf))
397    }
398
399    /// Compute the result page as a single `RecordBatch`. Shared between
400    /// the JSON and Arrow IPC encoders.
401    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
402        let batches = self.query_batches(name, req).await?;
403        if batches.is_empty() {
404            return Ok(RecordBatch::new_empty(Arc::new(
405                arrow::datatypes::Schema::empty(),
406            )));
407        }
408        if batches.len() == 1 {
409            return Ok(batches.into_iter().next().expect("checked len"));
410        }
411        if batches.iter().all(|b| b.num_rows() == 0) {
412            return Ok(RecordBatch::new_empty(batches[0].schema()));
413        }
414        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
415        Ok(batch)
416    }
417
418    /// Compute the result page as Arrow batches. Arrow IPC responses can
419    /// write these directly, while JSON callers concatenate via
420    /// [`Self::query_batch`] for the existing row conversion path.
421    async fn query_batches(
422        &self,
423        name: &str,
424        req: &QueryRequest,
425    ) -> Result<Vec<RecordBatch>, AppError> {
426        let st = self.dataset(name)?;
427
428        let page = req.page.max(1);
429        let page_size = req.page_size.clamp(1, self.max_page_size);
430        let offset = ((page - 1) * page_size) as usize;
431        let limit = page_size as usize;
432
433        self.query_batches_inner(st, req, Some((offset, limit)))
434            .await
435    }
436
437    /// Compute all matching rows as Arrow batches for the one-request
438    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
439    /// optional `limit` still caps the total result size.
440    async fn query_batches_all(
441        &self,
442        name: &str,
443        req: &QueryRequest,
444    ) -> Result<Vec<RecordBatch>, AppError> {
445        let st = self.dataset(name)?;
446        self.query_batches_inner(st, req, None).await
447    }
448
449    async fn query_batches_inner(
450        &self,
451        st: Arc<DatasetState>,
452        req: &QueryRequest,
453        page_window: Option<(usize, usize)>,
454    ) -> Result<Vec<RecordBatch>, AppError> {
455        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
456
457        // In-memory hot paths only fire when:
458        //   - the dataset is materialised,
459        //   - the caller did not ask for ordering,
460        //   - and did not ask for a hard `limit` cap on a paged request.
461        // Both of the latter two require sorting / capping that the SQL
462        // engine handles uniformly across all data types.
463        let can_fast_path = !st.lazy
464            && req.order_by.is_empty()
465            && (page_window.is_none() || req.limit.is_none())
466            && req.group_by.is_empty()
467            && !req.distinct;
468
469        if can_fast_path {
470            let total = st.num_rows();
471
472            // No predicates -> O(1) raw Arrow slices over resident batches,
473            // no engine overhead.
474            if req.predicates.is_empty() {
475                if page_window.is_none() && req.limit.is_none() {
476                    return st
477                        .data
478                        .iter()
479                        .cloned()
480                        .map(|batch| project(&st.schema, batch, &req.columns))
481                        .collect();
482                }
483                let start = offset.min(total);
484                let len = limit.min(total - start);
485                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
486                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
487            }
488
489            // Index fast path: if every predicate is eq/in on an indexed column,
490            // resolve via the pre-built equality index.
491            if let Some(rows) = try_index(&st.index, &req.predicates) {
492                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
493                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
494            }
495        }
496
497        // Fallback (and only path for lazy datasets): DataFusion SQL.
498        let (sql, params) = match page_window {
499            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
500            None => build_query_stream_sql(&st.schema, req)?,
501        };
502        let mut df = self.ctx.sql(&sql).await?;
503        if !params.is_empty() {
504            df = df.with_param_values(params)?;
505        }
506        let batches = df.collect().await?;
507        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
508            let schema = batches
509                .first()
510                .map(|b| b.schema())
511                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
512            return Ok(vec![RecordBatch::new_empty(schema)]);
513        }
514        Ok(batches)
515    }
516}
517
518fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
519    let schema = batches
520        .first()
521        .map(|batch| batch.schema())
522        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
523    let (mut writer, stream) = arrow_ipc_stream_channel(8);
524
525    tokio::task::spawn_blocking(move || {
526        let result = (|| -> Result<(), AppError> {
527            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
528            for batch in batches {
529                if batch.num_rows() > 0 {
530                    w.write(&batch)?;
531                }
532            }
533            w.finish()?;
534            Ok(())
535        })();
536        if let Err(err) = result {
537            log::error!("datafusion arrow stream failed: {err}");
538            writer.send_error(err);
539        }
540    });
541
542    stream
543}
544
545impl Store {
546    /// Return the number of rows matching `req.predicates`. With no
547    /// predicates this is a cheap metadata lookup on materialised datasets
548    /// and a `SELECT COUNT(*)` on lazy ones.
549    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
550        let st = self.dataset(name)?;
551
552        if !st.lazy {
553            // No predicates → resident row count, no scan.
554            if req.predicates.is_empty() {
555                return Ok(st.num_rows() as i64);
556            }
557            // Index fast path: same eligibility rules as `query`.
558            if let Some(rows) = try_index(&st.index, &req.predicates) {
559                return Ok(rows.len() as i64);
560            }
561        }
562
563        // Fallback: DataFusion SQL — same predicate translation as `query`,
564        // with predicate values bound as typed parameters.
565        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
566        let mut df = self.ctx.sql(&sql).await?;
567        if !params.is_empty() {
568            df = df.with_param_values(params)?;
569        }
570        let batches = df.collect().await?;
571        let n = batches
572            .first()
573            .and_then(|b| {
574                b.column(0)
575                    .as_any()
576                    .downcast_ref::<arrow::array::Int64Array>()
577            })
578            .filter(|a| !a.is_empty())
579            .map(|a| a.value(0))
580            .unwrap_or(0);
581        Ok(n)
582    }
583}
584
585// ---------------------------------------------------------------------------
586// Dataset loading
587// ---------------------------------------------------------------------------
588
589async fn build_dataset(
590    d: &DatasetConfig,
591    ctx: &SessionContext,
592) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
593    // Lazy datasets: register a ListingTable straight against the source
594    // and skip the materialise / index / partition pipeline below. Delta
595    // is rejected — deltalake reads the transaction log eagerly to know
596    // which parquet files are current, so "lazy delta" doesn't map onto
597    // ListingTable cleanly.
598    if d.lazy {
599        match (d.source.kind, d.source.is_s3()) {
600            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
601            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
602            (SourceKind::Delta, _) => {
603                return Err(AppError::Internal(format!(
604                    "dataset '{}': lazy mode is not supported for delta sources",
605                    d.name
606                )));
607            }
608        }
609    }
610
611    // Fetch raw RecordBatches from whichever backing store the dataset
612    // is configured to use. All four (parquet, delta) x (local, s3)
613    // combinations converge into one Vec<RecordBatch>; the materialisation
614    // / indexing / partitioning logic below is shared.
615    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
616        (SourceKind::Parquet, false) => read_local_parquet(d)?,
617        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
618        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
619        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
620    };
621    if raw_batches.is_empty() {
622        return Err(AppError::Internal(format!(
623            "dataset '{}': source produced no batches",
624            d.name
625        )));
626    }
627
628    let chunks = raw_batches;
629    let arrow_sch = chunks[0].schema();
630
631    // Build DatasetSchema from the Arrow schema.
632    let columns: Vec<ColumnInfo> = arrow_sch
633        .fields()
634        .iter()
635        .map(|f| {
636            let dt = f.data_type();
637            ColumnInfo {
638                name: f.name().clone(),
639                logical: arrow_to_logical(dt),
640                sql_type: format!("{dt:?}"),
641                nullable: f.is_nullable(),
642            }
643        })
644        .collect();
645    let schema = DatasetSchema::new(&d.name, columns);
646
647    // Build the equality index per the per-dataset policy. Operates on the
648    // chunked representation directly so we never have to materialise a
649    // single concatenated batch (which would double peak RSS on wide
650    // schemas — see `DatasetState` docs).
651    let index = build_eq_index_with_policy(&chunks, &d.index);
652
653    // Partition for parallel scans by the SQL fallback path. We distribute
654    // the existing batches round-robin across `n_parts` partitions instead
655    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
656    // an Arc-clone of the column buffers, not a copy.
657    let n_parts = std::thread::available_parallelism()
658        .map(|n| n.get())
659        .unwrap_or(4);
660    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
661    for (i, b) in chunks.iter().enumerate() {
662        if b.num_rows() == 0 {
663            continue;
664        }
665        parts[i % n_parts].push(b.clone());
666    }
667    parts.retain(|p| !p.is_empty());
668    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
669
670    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
671    let mem_mb: usize = chunks
672        .iter()
673        .flat_map(|b| b.columns().iter())
674        .map(|c| c.get_buffer_memory_size())
675        .sum::<usize>()
676        / 1_048_576;
677    log::info!(
678        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
679        d.name,
680        d.source.kind.as_str(),
681        total_rows,
682        schema.columns.len(),
683        mem_mb,
684        chunks.len(),
685        index.len()
686    );
687
688    Ok((
689        DatasetState {
690            schema,
691            data: chunks,
692            arrow_schema: arrow_sch,
693            index,
694            lazy: false,
695        },
696        provider,
697    ))
698}
699
700/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
701/// The dataset is never read into RAM; DataFusion streams row groups on
702/// each query. The returned `DatasetState.data` is an empty `Vec` —
703/// `arrow_schema` still carries the inferred Arrow schema for discovery.
704async fn build_lazy_local_parquet(
705    d: &DatasetConfig,
706    ctx: &SessionContext,
707) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
708    let (url, part_keys) = lazy_local_listing(d)?;
709
710    let mut opts =
711        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
712    if !part_keys.is_empty() {
713        opts = opts.with_table_partition_cols(
714            part_keys
715                .iter()
716                .map(|k| (k.clone(), DataType::Utf8))
717                .collect(),
718        );
719    }
720
721    let session_state = ctx.state();
722    // `infer_schema` returns the *file* schema (without partition columns);
723    // `ListingTable` appends the declared partition columns on top.
724    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
725        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
726    })?;
727
728    let cfg = ListingTableConfig::new(url)
729        .with_listing_options(opts)
730        .with_schema(file_schema.clone());
731    let table = ListingTable::try_new(cfg).map_err(|e| {
732        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
733    })?;
734    let provider: Arc<dyn TableProvider> = Arc::new(table);
735
736    // Discovery schema = file columns + partition columns (Utf8).
737    let mut fields: Vec<Field> = file_schema
738        .fields()
739        .iter()
740        .map(|f| f.as_ref().clone())
741        .collect();
742    for k in &part_keys {
743        if !fields.iter().any(|f| f.name() == k) {
744            fields.push(Field::new(k, DataType::Utf8, false));
745        }
746    }
747    let arrow_sch = Arc::new(Schema::new(fields));
748
749    let columns: Vec<ColumnInfo> = arrow_sch
750        .fields()
751        .iter()
752        .map(|f| {
753            let dt = f.data_type();
754            ColumnInfo {
755                name: f.name().clone(),
756                logical: arrow_to_logical(dt),
757                sql_type: format!("{dt:?}"),
758                nullable: f.is_nullable(),
759            }
760        })
761        .collect();
762    let schema = DatasetSchema::new(&d.name, columns);
763
764    log::info!(
765        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
766        d.name,
767        d.source.kind.as_str(),
768        schema.columns.len(),
769        part_keys.len()
770    );
771
772    Ok((
773        DatasetState {
774            schema,
775            data: Vec::new(),
776            arrow_schema: arrow_sch,
777            index: EqIndex::default(),
778            lazy: true,
779        },
780        provider,
781    ))
782}
783
784/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
785/// rooted at the dataset base plus the ordered hive partition keys (if any).
786/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
787/// (hive root or flat folder of parquets), and a single `*.parquet` file.
788fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
789    let loc = &d.source.location;
790
791    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
792        let parts: Vec<&str> = loc.split('/').collect();
793        let first_wild = parts
794            .iter()
795            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
796            .unwrap_or(parts.len());
797        let base = parts[..first_wild].join("/");
798        let base = if base.is_empty() {
799            "/".to_string()
800        } else {
801            base
802        };
803        // Partition keys: `key=…` components between the base and the file
804        // pattern (the final component).
805        let upper = parts.len().saturating_sub(1);
806        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
807            .iter()
808            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
809            .filter(|k| !k.is_empty())
810            .collect();
811        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
812    }
813
814    let path = std::path::Path::new(loc);
815    if path.is_dir() {
816        let keys = discover_hive_keys(path);
817        return Ok((dir_url(path, d)?, keys));
818    }
819
820    let url = ListingTableUrl::parse(loc)
821        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
822    Ok((url, Vec::new()))
823}
824
825/// Parse a directory path into a `ListingTableUrl` (trailing slash so
826/// DataFusion treats it as a directory root, not a single object).
827fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
828    let s = path.to_str().ok_or_else(|| {
829        AppError::Internal(format!(
830            "dataset '{}': non-utf8 path {}",
831            d.name,
832            path.display()
833        ))
834    })?;
835    let s = if s.ends_with('/') {
836        s.to_string()
837    } else {
838        format!("{s}/")
839    };
840    ListingTableUrl::parse(&s)
841        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
842}
843
/// Discover the ordered hive partition keys beneath `base`.
///
/// Starting at `base`, descends into the first `key=value` subdirectory
/// found at each level (in `read_dir` order), recording `key` each time.
/// The walk stops at the first level that has no such subdirectory or that
/// cannot be read. A flat (non-partitioned) folder yields an empty vec.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    /// First subdirectory of `dir` named `key=value` (both parts non-empty),
    /// returned as `(key, subdirectory path)`.
    fn first_partition_dir(dir: &std::path::Path) -> Option<(String, std::path::PathBuf)> {
        std::fs::read_dir(dir).ok()?.flatten().find_map(|entry| {
            let child = entry.path();
            if !child.is_dir() {
                return None;
            }
            let (key, value) = child.file_name()?.to_str()?.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            let key = key.to_string();
            Some((key, child))
        })
    }

    let mut keys = Vec::new();
    let mut level = base.to_path_buf();
    while let Some((key, child)) = first_partition_dir(&level) {
        keys.push(key);
        level = child;
    }
    keys
}
881
882/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
883/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
884/// any discovered hive partition columns. DataFusion does the directory
885/// listing through the registered store and streams row groups on each
886/// query — no local enumeration needed.
887async fn build_lazy_s3_parquet(
888    d: &DatasetConfig,
889    ctx: &SessionContext,
890) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
891    register_s3_object_store(d, ctx)?;
892
893    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
894
895    // Discovery schema = file columns + partition columns (Utf8).
896    let mut fields: Vec<Field> = file_schema
897        .fields()
898        .iter()
899        .map(|f| f.as_ref().clone())
900        .collect();
901    for k in &part_keys {
902        if !fields.iter().any(|f| f.name() == k) {
903            fields.push(Field::new(k, DataType::Utf8, false));
904        }
905    }
906    let arrow_sch = Arc::new(Schema::new(fields));
907
908    let columns: Vec<ColumnInfo> = arrow_sch
909        .fields()
910        .iter()
911        .map(|f| {
912            let dt = f.data_type();
913            ColumnInfo {
914                name: f.name().clone(),
915                logical: arrow_to_logical(dt),
916                sql_type: format!("{dt:?}"),
917                nullable: f.is_nullable(),
918            }
919        })
920        .collect();
921    let schema = DatasetSchema::new(&d.name, columns);
922
923    log::info!(
924        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
925        d.name,
926        d.source.kind.as_str(),
927        schema.columns.len(),
928        part_keys.len()
929    );
930
931    Ok((
932        DatasetState {
933            schema,
934            data: Vec::new(),
935            arrow_schema: arrow_sch,
936            index: EqIndex::default(),
937            lazy: true,
938        },
939        provider,
940    ))
941}
942
943/// Build a `ListingTable` provider for an S3 parquet source, resolving the
944/// base prefix and hive partition keys via [`s3_listing`]. The registered
945/// S3 object store must already be present on `ctx`. Returns the provider,
946/// the inferred *file* schema (without partition columns), and the ordered
947/// partition keys.
948async fn build_s3_listing_table(
949    d: &DatasetConfig,
950    ctx: &SessionContext,
951) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
952    let (url, part_keys) = s3_listing(d, ctx).await?;
953
954    let mut opts =
955        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
956    if !part_keys.is_empty() {
957        opts = opts.with_table_partition_cols(
958            part_keys
959                .iter()
960                .map(|k| (k.clone(), DataType::Utf8))
961                .collect(),
962        );
963    }
964
965    let session_state = ctx.state();
966    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
967        AppError::Internal(format!(
968            "dataset '{}': infer parquet schema on s3: {e}",
969            d.name
970        ))
971    })?;
972
973    let cfg = ListingTableConfig::new(url)
974        .with_listing_options(opts)
975        .with_schema(file_schema.clone());
976    let table = ListingTable::try_new(cfg).map_err(|e| {
977        AppError::Internal(format!(
978            "dataset '{}': ListingTable::try_new (s3): {e}",
979            d.name
980        ))
981    })?;
982    Ok((Arc::new(table), file_schema, part_keys))
983}
984
985/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
986/// keys)` pair. Hive keys come from the location glob when present
987/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
988/// are discovered by listing the registered object store. Honours the
989/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
990async fn s3_listing(
991    d: &DatasetConfig,
992    ctx: &SessionContext,
993) -> Result<(ListingTableUrl, Vec<String>), AppError> {
994    let s3 = d.s3.clone().unwrap_or_default();
995    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
996    let loc = &d.source.location;
997
998    if d.source.has_glob() {
999        let (base, keys) = split_glob_base_keys(loc);
1000        let base = format!("{}/", base.trim_end_matches('/'));
1001        let url = ListingTableUrl::parse(&base).map_err(|e| {
1002            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1003        })?;
1004        let keys = if want_partitions { keys } else { Vec::new() };
1005        return Ok((url, keys));
1006    }
1007
1008    let base = if loc.ends_with('/') {
1009        loc.clone()
1010    } else {
1011        format!("{loc}/")
1012    };
1013    let url = ListingTableUrl::parse(&base).map_err(|e| {
1014        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1015    })?;
1016    let keys = if want_partitions {
1017        discover_s3_hive_keys(ctx, &url).await
1018    } else {
1019        Vec::new()
1020    };
1021    Ok((url, keys))
1022}
1023
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys (`key=…` directory components between the base and
/// the final file pattern). Works for both local and `s3://` paths.
///
/// The base is everything before the first `/`-separated component that
/// contains a glob metacharacter (`*`, `?` or `[`):
///
/// * `root/city=*/*.parquet`  → base `root`, keys `["city"]`
/// * `/*.parquet`             → base `/` (absolute path, filesystem root)
/// * `*.parquet`              → base `.` (relative path, current directory)
///
/// The last case previously produced `/`: an empty joined prefix was
/// always mapped to the filesystem root, even when the location was
/// relative and the wildcard sat in its very first component.
///
/// Keys are taken only from components at or after the first wildcard and
/// before the final component (the file pattern); `key=value` components
/// that precede the first wildcard are part of the base.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let has_wildcard = |component: &str| component.contains(['*', '?', '[']);

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|&c| has_wildcard(c))
        .unwrap_or(parts.len());

    let base = if first_wild == 0 {
        // Wildcard in the first component: the location is relative, so the
        // listing is rooted at the current directory, not at `/`.
        ".".to_string()
    } else {
        let joined = parts[..first_wild].join("/");
        if joined.is_empty() {
            // Only the empty component before a leading `/` precedes the
            // wildcard, i.e. an absolute glob directly under the root.
            "/".to_string()
        } else {
            joined
        }
    };

    // Partition keys live between the first wildcard component and the file
    // pattern (the final component), which is never a partition directory.
    let upper = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts[first_wild.min(upper)..upper]
        .iter()
        .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
        .filter(|k| !k.is_empty())
        .collect();

    (base, keys)
}
1047
1048/// Discover ordered hive partition keys for an S3 prefix by walking the
1049/// object store one `key=value/` level at a time via delimiter listings.
1050/// Best-effort: any listing error stops discovery and returns what was found
1051/// so far (empty = treat as a flat folder).
1052async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1053    let store = match ctx.runtime_env().object_store(url.object_store()) {
1054        Ok(s) => s,
1055        Err(_) => return Vec::new(),
1056    };
1057    let mut keys = Vec::new();
1058    let mut prefix = url.prefix().clone();
1059    loop {
1060        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1061            Ok(l) => l,
1062            Err(_) => break,
1063        };
1064        let mut next: Option<object_store::path::Path> = None;
1065        for cp in &listing.common_prefixes {
1066            if let Some(seg) = cp.parts().next_back() {
1067                let seg = seg.as_ref().to_string();
1068                if let Some((k, v)) = seg.split_once('=')
1069                    && !k.is_empty()
1070                    && !v.is_empty()
1071                {
1072                    keys.push(k.to_string());
1073                    next = Some(cp.clone());
1074                    break;
1075                }
1076            }
1077        }
1078        match next {
1079            Some(p) => prefix = p,
1080            None => break,
1081        }
1082    }
1083    keys
1084}
1085
/// Eagerly read every local parquet file of the dataset into memory.
///
/// Original local-parquet code path — sync file I/O. We set a large reader
/// batch size so wide schemas (hundreds of columns) don't pay per-array
/// metadata overhead on thousands of small (default 1024-row) batches.
///
/// Two memory-saving knobs are applied here:
///
/// * **Column projection** — if `d.columns` is non-empty, only those
///   columns are decoded; everything else is skipped at the parquet reader
///   level (no Arrow array is ever allocated for the dropped columns).
///   Names are compared case-insensitively (both sides are lowercased).
/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
///   carry a dictionary page are materialised as Arrow
///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
///   string columns (state, country, severity, …) stay represented as
///   `n_unique` string slots plus an Int32 index per row instead of
///   `n_rows` independent strings — typically 10×–50× smaller for
///   real-world data.
///
/// Hive partition pairs found in each file's path are appended to every
/// batch of that file as constant Utf8 columns.
///
/// # Errors
///
/// Returns `AppError` when the file list cannot be resolved, a file cannot
/// be opened or its handle duplicated, the parquet metadata / reader cannot
/// be built, none of the requested `columns` exist in a file, a batch fails
/// to decode, or no batch at all was produced (`parquet source is empty`).
fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
    let files = d.resolve_local_parquet_files()?;
    let mut all = Vec::new();
    // Lowercased set of requested column names; `None` means "all columns".
    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
        None
    } else {
        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
    };

    for f in &files {
        let file = std::fs::File::open(f)
            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;

        // First pass: peek the parquet metadata + default Arrow schema so we
        // can (a) decide a column projection and (b) override Utf8 columns
        // that are dictionary-encoded in the file so the reader materialises
        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
        // The probe consumes a duplicated handle so `file` itself stays
        // available for the real reader built further down.
        let probe = ParquetRecordBatchReaderBuilder::try_new(
            file.try_clone()
                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
        )?;
        let parquet_schema = probe.parquet_schema().clone();
        let arrow_schema = probe.schema().clone();
        let metadata = probe.metadata().clone();
        drop(probe);

        // Column projection (top-level / leaf indices for flat schemas).
        let projection = if let Some(w) = &wanted {
            let indices: Vec<usize> = arrow_schema
                .fields()
                .iter()
                .enumerate()
                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
                .map(|(i, _)| i)
                .collect();
            // Requested names that are absent from the file are ignored; it
            // is only an error when *none* of them match.
            if indices.is_empty() {
                return Err(AppError::Internal(format!(
                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
                    d.name,
                    d.columns,
                    f.display()
                )));
            }
            ProjectionMask::roots(&parquet_schema, indices)
        } else {
            ProjectionMask::all()
        };

        // Dictionary override: any Utf8 column whose first row group carries
        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
        // override schema must still describe every column in the parquet
        // file (projection is applied separately). Skipped entirely when
        // the dataset has `dict_encode = false` — escape hatch for cases
        // where the override interacts badly with null propagation in the
        // downstream engine.
        //
        // NOTE(review): the decision is made per file from its first row
        // group, so two files of one dataset could presumably end up with
        // different Arrow types for the same column — confirm the
        // downstream materialise path tolerates that.
        let mut new_fields: Vec<Field> = arrow_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        if d.dict_encode
            && let Some(rg0) = metadata.row_groups().first()
        {
            for (i, fld) in arrow_schema.fields().iter().enumerate() {
                if !matches!(
                    fld.data_type(),
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
                ) {
                    continue;
                }
                // NOTE(review): `i` is the Arrow top-level field position but
                // is used to index the row group's column chunks; these
                // presumably only line up for flat (non-nested) schemas.
                if let Some(col) = rg0.columns().get(i)
                    && col.dictionary_page_offset().is_some()
                {
                    // `Field::new` starts from a blank field, so only name,
                    // type and nullability carry over to the override.
                    new_fields[i] = Field::new(
                        fld.name(),
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        fld.is_nullable(),
                    );
                }
            }
        }
        let forced_schema = Arc::new(Schema::new(new_fields));

        // Second pass: the real reader, with the (possibly dictionary-typed)
        // schema forced and the projection applied.
        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
            .with_batch_size(65_536)
            .with_projection(projection)
            .build()?;
        // Hive-style partition columns (`city=NYC/…`) live in the path, not
        // the file. Fold them in as constant Utf8 columns so they show up in
        // the schema and are queryable — matching the DuckDB backend.
        let pairs = hive_pairs(f);
        for batch in reader {
            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
            all.push(if pairs.is_empty() {
                batch
            } else {
                append_partition_cols(&batch, &pairs)?
            });
        }
    }
    // No batches at all (no files, or only files without rows) is reported
    // as an error rather than returned as an empty dataset.
    if all.is_empty() {
        return Err(AppError::Internal(format!(
            "dataset '{}': parquet source is empty",
            d.name
        )));
    }
    Ok(all)
}
1211
/// Collect the hive-style partition `(key, value)` pairs encoded in `path`,
/// in path order (e.g. `year=2024/city=NYC` → `[(year, 2024), (city, NYC)]`).
///
/// A path component counts as a pair when it is valid UTF-8, contains an
/// `=`, has a non-empty key and value, and the value holds no further `=`.
/// Every other component is ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let well_formed = !key.is_empty() && !value.is_empty() && !value.contains('=');
        if well_formed {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }
    pairs
}
1226
1227/// Append constant Utf8 columns for each hive partition pair. A partition
1228/// key that collides with a real file column is skipped (the file wins).
1229fn append_partition_cols(
1230    batch: &RecordBatch,
1231    pairs: &[(String, String)],
1232) -> Result<RecordBatch, AppError> {
1233    let n = batch.num_rows();
1234    let mut fields: Vec<Field> = batch
1235        .schema()
1236        .fields()
1237        .iter()
1238        .map(|f| f.as_ref().clone())
1239        .collect();
1240    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1241    for (k, v) in pairs {
1242        if fields.iter().any(|f| f.name() == k) {
1243            continue;
1244        }
1245        fields.push(Field::new(k, DataType::Utf8, false));
1246        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1247    }
1248    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1249        .map_err(|e| AppError::Internal(e.to_string()))
1250}
1251
1252/// Register an `AmazonS3` object store on the SessionContext, build a
1253/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1254/// and stream the whole dataset back through `DataFrame::collect`. Using a
1255/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1256/// get the same hive-partition handling as local parquet.
1257async fn read_s3_parquet(
1258    d: &DatasetConfig,
1259    ctx: &SessionContext,
1260) -> Result<Vec<RecordBatch>, AppError> {
1261    register_s3_object_store(d, ctx)?;
1262    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1263    let df = ctx
1264        .read_table(provider)
1265        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1266    Ok(df.collect().await?)
1267}
1268
1269/// Open a Delta table (local or S3) and stream every row back as a Vec of
1270/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1271/// treat all datasets uniformly (single in-memory batch + eq-index).
1272async fn read_delta(
1273    d: &DatasetConfig,
1274    opts: HashMap<String, String>,
1275) -> Result<Vec<RecordBatch>, AppError> {
1276    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1277        AppError::Internal(format!(
1278            "dataset '{}': bad delta location '{}': {e}",
1279            d.name, d.source.location
1280        ))
1281    })?;
1282    let table = deltalake::open_table_with_storage_options(url, opts)
1283        .await
1284        .map_err(|e| {
1285            AppError::Internal(format!(
1286                "dataset '{}': delta open '{}': {e}",
1287                d.name, d.source.location
1288            ))
1289        })?;
1290    let provider = table.table_provider().await.map_err(|e| {
1291        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1292    })?;
1293    // Drive a full scan via a throwaway SessionContext so we end up with
1294    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1295    let scan_ctx = SessionContext::new();
1296    let df = scan_ctx
1297        .read_table(provider)
1298        .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1299    Ok(df.collect().await?)
1300}
1301
1302/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1303/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1304/// passes them through to object_store internally.
1305fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1306    let creds = d.resolved_creds();
1307    let region = d.resolved_region();
1308    let s3 = d.s3.clone().unwrap_or_default();
1309    let (bucket, _) = d.source.s3_bucket()?;
1310
1311    let mut opts = HashMap::new();
1312    opts.insert("AWS_REGION".into(), region);
1313    if let Some(ep) = s3.effective_endpoint(bucket) {
1314        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1315    }
1316    if s3.allow_http {
1317        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1318    }
1319    opts.insert(
1320        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1321        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1322    );
1323    if let Some(k) = creds.access_key_id {
1324        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1325    }
1326    if let Some(s) = creds.secret_access_key {
1327        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1328    }
1329    if let Some(t) = creds.session_token {
1330        opts.insert("AWS_SESSION_TOKEN".into(), t);
1331    }
1332    // Read-only paths don't need the S3 lock-provider plumbing.
1333    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1334    Ok(opts)
1335}
1336
1337/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1338/// block + resolved credentials and register it on `ctx` under
1339/// `s3://bucket/`.
1340fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1341    let (bucket, _key) = d.source.s3_bucket()?;
1342    let creds = d.resolved_creds();
1343    let region = d.resolved_region();
1344    let s3 = d.s3.clone().unwrap_or_default();
1345
1346    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1347        AppError::Internal(format!(
1348            "dataset '{}': build S3 store for '{bucket}': {e}",
1349            d.name
1350        ))
1351    })?;
1352
1353    let url = Url::parse(&format!("s3://{bucket}"))
1354        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1355    ctx.register_object_store(&url, Arc::new(store));
1356    Ok(())
1357}
1358
1359fn build_s3(
1360    bucket: &str,
1361    region: &str,
1362    s3: &S3Config,
1363    creds: &ResolvedCreds,
1364) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1365    let mut b = AmazonS3Builder::new()
1366        .with_bucket_name(bucket)
1367        .with_region(region)
1368        .with_allow_http(s3.allow_http)
1369        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1370    if let Some(ep) = s3.effective_endpoint(bucket) {
1371        b = b.with_endpoint(ep);
1372    }
1373    if let Some(k) = creds.access_key_id.as_deref() {
1374        b = b.with_access_key_id(k);
1375    }
1376    if let Some(s) = creds.secret_access_key.as_deref() {
1377        b = b.with_secret_access_key(s);
1378    }
1379    if let Some(t) = creds.session_token.as_deref() {
1380        b = b.with_token(t);
1381    }
1382    b.build()
1383}
1384
1385fn arrow_to_logical(dt: &DataType) -> LogicalType {
1386    match dt {
1387        DataType::Boolean => LogicalType::Bool,
1388        DataType::Int8
1389        | DataType::Int16
1390        | DataType::Int32
1391        | DataType::Int64
1392        | DataType::UInt8
1393        | DataType::UInt16
1394        | DataType::UInt32
1395        | DataType::UInt64 => LogicalType::Int,
1396        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1397        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1398        // Dictionary-encoded strings are reported as plain strings — clients
1399        // (and the rest of the backend) shouldn't have to care that we keep
1400        // a compressed representation in memory.
1401        DataType::Dictionary(_, v)
1402            if matches!(
1403                v.as_ref(),
1404                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1405            ) =>
1406        {
1407            LogicalType::Utf8
1408        }
1409        DataType::Date32
1410        | DataType::Date64
1411        | DataType::Time32(_)
1412        | DataType::Time64(_)
1413        | DataType::Timestamp(_, _)
1414        | DataType::Duration(_)
1415        | DataType::Interval(_) => LogicalType::Temporal,
1416        _ => LogicalType::Other,
1417    }
1418}
1419
1420// ---------------------------------------------------------------------------
1421// Per-batch projection
1422// ---------------------------------------------------------------------------
1423
1424fn project(
1425    schema: &DatasetSchema,
1426    batch: RecordBatch,
1427    columns: &[String],
1428) -> Result<RecordBatch, AppError> {
1429    if columns.is_empty() {
1430        return Ok(batch);
1431    }
1432    let indices: Vec<usize> = columns
1433        .iter()
1434        .map(|c| {
1435            schema
1436                .find(c)
1437                .map(|info| schema.by_name[&info.name.to_lowercase()])
1438        })
1439        .collect::<Result<_, _>>()?;
1440    let fields: Vec<Field> = indices
1441        .iter()
1442        .map(|&i| batch.schema().field(i).clone())
1443        .collect();
1444    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1445    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1446}
1447
1448// ---------------------------------------------------------------------------
1449// SQL builder
1450// ---------------------------------------------------------------------------
1451
1452/// Accumulates the typed literal values for a parameterised query.
1453///
1454/// Predicate values are never interpolated into the SQL text. Instead each
1455/// value is pushed here and the builder emits a positional placeholder
1456/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1457/// to the logical plan via [`DataFrame::with_param_values`], so user input
1458/// reaches the engine as typed scalars and can never alter the query
1459/// structure (no SQL injection surface, no escaping to get wrong).
1460#[derive(Default)]
1461struct Params {
1462    values: Vec<ScalarValue>,
1463}
1464
1465impl Params {
1466    fn new() -> Self {
1467        Self::default()
1468    }
1469
1470    /// Bind `v` and return its `$N` placeholder token.
1471    fn bind(&mut self, v: ScalarValue) -> String {
1472        self.values.push(v);
1473        format!("${}", self.values.len())
1474    }
1475
1476    fn into_values(self) -> Vec<ScalarValue> {
1477        self.values
1478    }
1479}
1480
1481fn build_query_sql(
1482    schema: &DatasetSchema,
1483    req: &QueryRequest,
1484    max_page_size: u64,
1485) -> Result<(String, Vec<ScalarValue>), AppError> {
1486    let (limit, offset) = req.effective_limit_offset(max_page_size);
1487    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1488}
1489
1490fn build_query_stream_sql(
1491    schema: &DatasetSchema,
1492    req: &QueryRequest,
1493) -> Result<(String, Vec<ScalarValue>), AppError> {
1494    let suffix = req
1495        .limit
1496        .map(|limit| format!(" LIMIT {limit}"))
1497        .unwrap_or_default();
1498    build_query_sql_with_suffix(schema, req, &suffix)
1499}
1500
1501fn build_query_sql_with_suffix(
1502    schema: &DatasetSchema,
1503    req: &QueryRequest,
1504    suffix: &str,
1505) -> Result<(String, Vec<ScalarValue>), AppError> {
1506    let agg_plan = req.agg_plan(schema)?;
1507
1508    let cols = if let Some(plan) = &agg_plan {
1509        // Group cols, then aggregations, each aliased to the JSON output key.
1510        let mut parts: Vec<String> = plan
1511            .group_cols
1512            .iter()
1513            .map(|c| DatasetSchema::quote_ident(c))
1514            .collect();
1515        for a in &plan.aggs {
1516            let expr = a.sql_expr()?;
1517            parts.push(format!(
1518                "{expr} AS {}",
1519                DatasetSchema::quote_ident(&a.alias)
1520            ));
1521        }
1522        parts.join(", ")
1523    } else if req.columns.is_empty() {
1524        if req.distinct {
1525            "DISTINCT *".to_string()
1526        } else {
1527            "*".to_string()
1528        }
1529    } else {
1530        let list = req
1531            .columns
1532            .iter()
1533            .map(|c| {
1534                schema
1535                    .find(c)
1536                    .map(|info| DatasetSchema::quote_ident(&info.name))
1537            })
1538            .collect::<Result<Vec<_>, _>>()?
1539            .join(", ");
1540        if req.distinct {
1541            format!("DISTINCT {list}")
1542        } else {
1543            list
1544        }
1545    };
1546
1547    let mut params = Params::new();
1548    let clauses: Vec<String> = req
1549        .predicates
1550        .iter()
1551        .map(|p| pred_to_sql(schema, p, &mut params))
1552        .collect::<Result<_, _>>()?;
1553
1554    let table = DatasetSchema::quote_ident(&schema.name);
1555    let where_clause = if clauses.is_empty() {
1556        String::new()
1557    } else {
1558        format!(" WHERE {}", clauses.join(" AND "))
1559    };
1560    let group_clause = match &agg_plan {
1561        Some(p) => format!(
1562            " GROUP BY {}",
1563            p.group_cols
1564                .iter()
1565                .map(|c| DatasetSchema::quote_ident(c))
1566                .collect::<Vec<_>>()
1567                .join(", "),
1568        ),
1569        None => String::new(),
1570    };
1571    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1572        Some(s) => format!(" ORDER BY {s}"),
1573        None => String::new(),
1574    };
1575    let sql =
1576        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1577    Ok((sql, params.into_values()))
1578}
1579
1580fn build_count_sql(
1581    schema: &DatasetSchema,
1582    predicates: &[Predicate],
1583) -> Result<(String, Vec<ScalarValue>), AppError> {
1584    let mut params = Params::new();
1585    let clauses: Vec<String> = predicates
1586        .iter()
1587        .map(|p| pred_to_sql(schema, p, &mut params))
1588        .collect::<Result<_, _>>()?;
1589    let table = DatasetSchema::quote_ident(&schema.name);
1590    let where_clause = if clauses.is_empty() {
1591        String::new()
1592    } else {
1593        format!(" WHERE {}", clauses.join(" AND "))
1594    };
1595    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1596    Ok((sql, params.into_values()))
1597}
1598
1599fn pred_to_sql(
1600    schema: &DatasetSchema,
1601    pred: &Predicate,
1602    params: &mut Params,
1603) -> Result<String, AppError> {
1604    let info = schema.find(&pred.col)?;
1605    let col = DatasetSchema::quote_ident(&info.name);
1606
1607    match pred.op.as_str() {
1608        "is_null" => return Ok(format!("{col} IS NULL")),
1609        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1610        _ => {}
1611    }
1612
1613    let val = pred
1614        .val
1615        .as_ref()
1616        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1617
1618    if pred.op == "in" {
1619        let items = val
1620            .as_array()
1621            .filter(|a| !a.is_empty())
1622            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1623        let placeholders: Vec<String> = items
1624            .iter()
1625            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1626            .collect::<Result<_, AppError>>()?;
1627        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1628    }
1629
1630    let sql_op = match pred.op.as_str() {
1631        "eq" => "=",
1632        "neq" => "!=",
1633        "gt" => ">",
1634        "gte" => ">=",
1635        "lt" => "<",
1636        "lte" => "<=",
1637        "like" => "LIKE",
1638        "ilike" => "ILIKE",
1639        other => return Err(AppError::UnknownOperator(other.into())),
1640    };
1641    let placeholder = params.bind(json_to_scalar(val)?);
1642    Ok(format!("{col} {sql_op} {placeholder}"))
1643}
1644
1645/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
1646/// binding as a query parameter. The engine applies the usual numeric
1647/// widening / comparison coercion against the target column type.
1648fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1649    match val {
1650        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1651        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1652        JsonValue::Null => Ok(ScalarValue::Null),
1653        JsonValue::Number(n) => {
1654            if let Some(i) = n.as_i64() {
1655                Ok(ScalarValue::Int64(Some(i)))
1656            } else if let Some(u) = n.as_u64() {
1657                Ok(ScalarValue::UInt64(Some(u)))
1658            } else if let Some(f) = n.as_f64() {
1659                Ok(ScalarValue::Float64(Some(f)))
1660            } else {
1661                Err(AppError::InvalidValue(
1662                    "unsupported numeric literal in predicate".into(),
1663                ))
1664            }
1665        }
1666        _ => Err(AppError::InvalidValue(
1667            "unsupported literal type in predicate".into(),
1668        )),
1669    }
1670}
1671
1672// ---------------------------------------------------------------------------
1673// Equality index — built once at startup, queried on every predicate request
1674// ---------------------------------------------------------------------------
1675
1676fn json_index_key(val: &JsonValue) -> Option<String> {
1677    match val {
1678        JsonValue::String(s) => Some(s.clone()),
1679        JsonValue::Number(n) => Some(n.to_string()),
1680        JsonValue::Bool(b) => Some(b.to_string()),
1681        _ => None,
1682    }
1683}
1684
/// Intersect two ascending row-id lists, returning the ids present in both
/// (still ascending). Runs as a single linear merge over the two inputs.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let mut left = a.iter().copied().peekable();
    let mut right = b.iter().copied().peekable();

    // Advance whichever side is behind; emit only when both heads agree.
    while let (Some(&x), Some(&y)) = (left.peek(), right.peek()) {
        match x.cmp(&y) {
            Ordering::Less => {
                left.next();
            }
            Ordering::Greater => {
                right.next();
            }
            Ordering::Equal => {
                common.push(x);
                left.next();
                right.next();
            }
        }
    }
    common
}
1701
/// Merge two ascending row-id lists into one ascending list. An id present
/// at the head of both inputs is emitted once.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut rest_a, mut rest_b) = (a, b);

    // Consume the smaller head each step until one side runs dry.
    while let (Some((&x, tail_a)), Some((&y, tail_b))) =
        (rest_a.split_first(), rest_b.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                rest_a = tail_a;
            }
            Ordering::Greater => {
                merged.push(y);
                rest_b = tail_b;
            }
            Ordering::Equal => {
                merged.push(x);
                rest_a = tail_a;
                rest_b = tail_b;
            }
        }
    }

    // At most one of these is non-empty.
    merged.extend_from_slice(rest_a);
    merged.extend_from_slice(rest_b);
    merged
}
1726
1727fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1728    if predicates.is_empty() || index.is_empty() {
1729        return None;
1730    }
1731
1732    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
1733    // so borrow it directly instead of cloning the row-id vector.
1734    if let [pred] = predicates
1735        && pred.op.as_str() == "eq"
1736    {
1737        let col_lower = pred.col.to_lowercase();
1738        let col_map = index.get(&col_lower)?;
1739        let key = json_index_key(pred.val.as_ref()?)?;
1740        return Some(match col_map.get(&key) {
1741            Some(rows) => Cow::Borrowed(rows.as_slice()),
1742            None => Cow::Owned(Vec::new()),
1743        });
1744    }
1745
1746    let mut result: Option<Vec<u32>> = None;
1747    for pred in predicates {
1748        let col_lower = pred.col.to_lowercase();
1749        let col_map = index.get(&col_lower)?;
1750
1751        let rows: Vec<u32> = match pred.op.as_str() {
1752            "eq" => {
1753                let key = json_index_key(pred.val.as_ref()?)?;
1754                col_map.get(&key).cloned().unwrap_or_default()
1755            }
1756            "in" => {
1757                let items = pred.val.as_ref()?.as_array()?;
1758                let mut merged: Vec<u32> = Vec::new();
1759                for item in items {
1760                    if let Some(r) = col_map.get(&json_index_key(item)?) {
1761                        merged = union_sorted(&merged, r);
1762                    }
1763                }
1764                merged
1765            }
1766            _ => return None,
1767        };
1768
1769        result = Some(match result {
1770            None => rows,
1771            Some(r) => intersect_sorted(&r, &rows),
1772        });
1773    }
1774    result.map(Cow::Owned)
1775}
1776
1777/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
1778/// not part of the stable API — exposed solely so `benches/index.rs` can build
1779/// an `EqIndex` and exercise `try_index` directly.
1780#[doc(hidden)]
1781pub mod bench {
1782    use super::{EqIndex, FastMap, json_index_key, try_index};
1783    use datapress_core::models::Predicate;
1784    use serde_json::Value as JsonValue;
1785    use std::borrow::Cow;
1786
1787    /// Opaque equality index wrapper for benches (the inner alias is private).
1788    pub struct BenchIndex(EqIndex);
1789
1790    /// Build an index with a single `col` whose `val` bucket holds `rows`.
1791    /// The bucket key is derived with the same `json_index_key` the query path
1792    /// uses, so a matching `eq` predicate is guaranteed to hit.
1793    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1794        let key = json_index_key(val).expect("benchable index key");
1795        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1796        col_map.insert(key, rows);
1797        let mut index: EqIndex = EqIndex::default();
1798        index.insert(col.to_string(), col_map);
1799        BenchIndex(index)
1800    }
1801
1802    /// Resolve `predicates` against the index — the timed operation.
1803    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1804        try_index(&idx.0, predicates)
1805    }
1806
1807    /// Reference implementation of the pre-`Cow` behaviour: always clones the
1808    /// matched bucket into an owned `Vec` (what `try_index` did before the
1809    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
1810    /// the borrow win is measured against the old allocation cost in-process.
1811    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1812        let [pred] = predicates else { return None };
1813        if pred.op.as_str() != "eq" {
1814            return None;
1815        }
1816        let col_lower = pred.col.to_lowercase();
1817        let col_map = idx.0.get(&col_lower)?;
1818        let key = json_index_key(pred.val.as_ref()?)?;
1819        Some(col_map.get(&key).cloned().unwrap_or_default())
1820    }
1821}
1822
1823/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
1824/// the underlying batches (zero-copy) and concatenating the (small) page.
1825fn slice_global(
1826    chunks: &[RecordBatch],
1827    schema: &Arc<Schema>,
1828    offset: usize,
1829    limit: usize,
1830) -> Result<RecordBatch, AppError> {
1831    if limit == 0 || chunks.is_empty() {
1832        return Ok(RecordBatch::new_empty(schema.clone()));
1833    }
1834    let mut out = Vec::new();
1835    let mut to_skip = offset;
1836    let mut remaining = limit;
1837    for b in chunks {
1838        if remaining == 0 {
1839            break;
1840        }
1841        let n = b.num_rows();
1842        if to_skip >= n {
1843            to_skip -= n;
1844            continue;
1845        }
1846        let take = remaining.min(n - to_skip);
1847        out.push(b.slice(to_skip, take));
1848        to_skip = 0;
1849        remaining -= take;
1850    }
1851    if out.is_empty() {
1852        return Ok(RecordBatch::new_empty(schema.clone()));
1853    }
1854    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1855}
1856
1857/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
1858/// Row ids are global (across the concatenation of all chunks). We map each
1859/// requested row to its (chunk, local-index), `take` per chunk, then stitch
1860/// the per-chunk results back together preserving the original row order.
1861fn take_page(
1862    chunks: &[RecordBatch],
1863    schema: &Arc<Schema>,
1864    rows: &[u32],
1865    offset: usize,
1866    limit: usize,
1867) -> Result<RecordBatch, AppError> {
1868    let start = offset.min(rows.len());
1869    let len = limit.min(rows.len() - start);
1870    if len == 0 || chunks.is_empty() {
1871        return Ok(RecordBatch::new_empty(schema.clone()));
1872    }
1873
1874    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
1875    // and `offsets.last()` is the total row count.
1876    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1877    let mut acc: u32 = 0;
1878    offsets.push(0);
1879    for b in chunks {
1880        acc = acc
1881            .checked_add(b.num_rows() as u32)
1882            .expect("row count exceeds u32::MAX");
1883        offsets.push(acc);
1884    }
1885
1886    // Bucket each global row id into the chunk that contains it, remembering
1887    // the original output position so we can restore page order at the end.
1888    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1889    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1890        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1891        let local = gid - offsets[bi];
1892        buckets[bi].push((out_pos as u32, local));
1893    }
1894
1895    // Per-chunk take, recording the destination index for each emitted row.
1896    let mut takens: Vec<RecordBatch> = Vec::new();
1897    let mut dest: Vec<u32> = Vec::with_capacity(len);
1898    for (bi, bucket) in buckets.iter().enumerate() {
1899        if bucket.is_empty() {
1900            continue;
1901        }
1902        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1903        let cols: Vec<ArrayRef> = chunks[bi]
1904            .columns()
1905            .iter()
1906            .map(|c| {
1907                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1908                    .map_err(AppError::from)
1909            })
1910            .collect::<Result<_, _>>()?;
1911        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1912        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1913    }
1914
1915    // Stitch per-chunk results then permute to restore the requested order.
1916    let stitched = compute::concat_batches(schema, takens.iter())?;
1917    let mut inv = vec![0u32; len];
1918    for (i, &d) in dest.iter().enumerate() {
1919        inv[d as usize] = i as u32;
1920    }
1921    let perm = UInt32Array::from(inv);
1922    let cols: Vec<ArrayRef> = stitched
1923        .columns()
1924        .iter()
1925        .map(|c| {
1926            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1927                .map_err(AppError::from)
1928        })
1929        .collect::<Result<_, _>>()?;
1930    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1931}
1932
1933/// Build the equality index per the dataset's policy, against the chunked
1934/// representation. Row ids are global across the concatenation of all
1935/// chunks (so they remain compatible with `take_page` / `slice_global`).
1936fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1937    use rayon::prelude::*;
1938
1939    if cfg.mode == IndexMode::None || chunks.is_empty() {
1940        return EqIndex::default();
1941    }
1942
1943    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1944        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1945    } else {
1946        None
1947    };
1948
1949    let max_card = if cfg.mode == IndexMode::Auto {
1950        Some(cfg.max_cardinality)
1951    } else {
1952        None
1953    };
1954
1955    // Per-chunk starting global row id.
1956    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1957    let mut acc: u32 = 0;
1958    for b in chunks {
1959        batch_offsets.push(acc);
1960        acc = acc
1961            .checked_add(b.num_rows() as u32)
1962            .expect("row count exceeds u32::MAX");
1963    }
1964
1965    let schema = chunks[0].schema();
1966
1967    schema
1968        .fields()
1969        .par_iter()
1970        .enumerate()
1971        .filter_map(|(ci, field)| {
1972            let col_lower = field.name().to_lowercase();
1973            if let Some(a) = &allow
1974                && !a.contains_key(&col_lower)
1975            {
1976                return None;
1977            }
1978
1979            // Only build for index-friendly types; skip everything else
1980            // up-front so we don't pay the per-chunk dispatch cost.
1981            let dtype = field.data_type();
1982            let dict_utf8 = matches!(dtype,
1983                DataType::Dictionary(k, v)
1984                    if matches!(k.as_ref(), DataType::Int32)
1985                    && matches!(v.as_ref(), DataType::Utf8));
1986            match dtype {
1987                DataType::Utf8
1988                | DataType::Utf8View
1989                | DataType::Boolean
1990                | DataType::Int8
1991                | DataType::Int16
1992                | DataType::Int32
1993                | DataType::Int64 => {}
1994                _ if dict_utf8 => {}
1995                _ => return None,
1996            }
1997
1998            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
1999
2000            for (bi, batch) in chunks.iter().enumerate() {
2001                let base = batch_offsets[bi];
2002                let col = batch.column(ci);
2003
2004                macro_rules! index_col {
2005                    ($arr_ty:ty) => {{
2006                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2007                        for row in 0..arr.len() {
2008                            if arr.is_null(row) {
2009                                continue;
2010                            }
2011                            let key = arr.value(row).to_string();
2012                            let gid = base + row as u32;
2013                            if let Some(v) = map.get_mut(&key) {
2014                                v.push(gid);
2015                            } else {
2016                                if let Some(mc) = max_card {
2017                                    if map.len() >= mc {
2018                                        return None;
2019                                    }
2020                                }
2021                                map.insert(key, vec![gid]);
2022                            }
2023                        }
2024                    }};
2025                }
2026
2027                if dict_utf8 {
2028                    // Dictionary(Int32, Utf8): iterate keys + look up the
2029                    // string value from the (small) dictionary. We allocate
2030                    // the key string only when the value is new — repeated
2031                    // values reuse the existing HashMap entry by hash, but
2032                    // `HashMap::get_mut` still needs the key, so we use a
2033                    // borrowed lookup via `get` first to avoid the alloc.
2034                    let arr = col
2035                        .as_any()
2036                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2037                        )?;
2038                    let keys = arr.keys();
2039                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2040                    for row in 0..arr.len() {
2041                        if arr.is_null(row) {
2042                            continue;
2043                        }
2044                        let k = keys.value(row) as usize;
2045                        let s = values.value(k);
2046                        let gid = base + row as u32;
2047                        if let Some(v) = map.get_mut(s) {
2048                            v.push(gid);
2049                        } else {
2050                            if let Some(mc) = max_card
2051                                && map.len() >= mc
2052                            {
2053                                return None;
2054                            }
2055                            map.insert(s.to_string(), vec![gid]);
2056                        }
2057                    }
2058                } else {
2059                    match dtype {
2060                        DataType::Utf8 => index_col!(StringArray),
2061                        DataType::Utf8View => index_col!(StringViewArray),
2062                        DataType::Boolean => index_col!(BooleanArray),
2063                        DataType::Int8 => index_col!(Int8Array),
2064                        DataType::Int16 => index_col!(Int16Array),
2065                        DataType::Int32 => index_col!(Int32Array),
2066                        DataType::Int64 => index_col!(Int64Array),
2067                        _ => unreachable!(),
2068                    }
2069                }
2070            }
2071
2072            Some((col_lower, map))
2073        })
2074        .collect()
2075}
2076
2077// ---------------------------------------------------------------------------
2078// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2079// on the page batch right before JSON encoding. We deliberately do **not**
2080// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2081// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2082// columns would balloon resident RAM. The cast is applied per returned page
2083// after pagination, so the cost is paid only for rows the caller requested.
2084// ---------------------------------------------------------------------------
2085
2086/// Returns true for Arrow types that `write_value` can render directly. Any
2087/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2088/// JSON output is faithful rather than silently `null`.
2089fn writable_inline(dt: &DataType) -> bool {
2090    match dt {
2091        DataType::Utf8
2092        | DataType::LargeUtf8
2093        | DataType::Utf8View
2094        | DataType::Boolean
2095        | DataType::Int8
2096        | DataType::Int16
2097        | DataType::Int32
2098        | DataType::Int64
2099        | DataType::UInt8
2100        | DataType::UInt16
2101        | DataType::UInt32
2102        | DataType::UInt64
2103        | DataType::Float32
2104        | DataType::Float64
2105        | DataType::Decimal128(_, _)
2106        | DataType::Decimal256(_, _) => true,
2107        DataType::Dictionary(k, v)
2108            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2109        {
2110            true
2111        }
2112        _ => false,
2113    }
2114}
2115
2116/// Cast any column whose dtype isn't directly writable by `write_value` to
2117/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2118/// — kept native in resident memory to save RAM — and also any exotic dtype
2119/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2120/// so the JSON serializer never falls back to writing literal `null`.
2121fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2122    let schema = batch.schema();
2123    let to_cast: Vec<usize> = schema
2124        .fields()
2125        .iter()
2126        .enumerate()
2127        .filter_map(|(i, f)| {
2128            if writable_inline(f.data_type()) {
2129                None
2130            } else {
2131                Some(i)
2132            }
2133        })
2134        .collect();
2135    if to_cast.is_empty() {
2136        return Ok(batch.clone());
2137    }
2138    let new_fields: Vec<Field> = schema
2139        .fields()
2140        .iter()
2141        .enumerate()
2142        .map(|(i, f)| {
2143            if to_cast.contains(&i) {
2144                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2145            } else {
2146                f.as_ref().clone()
2147            }
2148        })
2149        .collect();
2150    let new_schema = Arc::new(Schema::new(new_fields));
2151    let cols: Vec<ArrayRef> = batch
2152        .columns()
2153        .iter()
2154        .enumerate()
2155        .map(|(i, c)| {
2156            if to_cast.contains(&i) {
2157                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2158            } else {
2159                Ok(c.clone())
2160            }
2161        })
2162        .collect::<Result<_, _>>()?;
2163    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2164}
2165
2166// ---------------------------------------------------------------------------
2167// Compute helpers — retained for symmetry; reserved for future inline scan
2168// path. Currently the engine fallback handles all non-index queries.
2169// ---------------------------------------------------------------------------
2170
/// Comparison operator understood by the in-process compute helpers.
#[derive(Clone, Copy)]
#[allow(dead_code)]
enum CmpOp {
    /// Equal to.
    Eq,
    /// Not equal to.
    Neq,
    /// Strictly greater than.
    Gt,
    /// Greater than or equal to.
    Gte,
    /// Strictly less than.
    Lt,
    /// Less than or equal to.
    Lte,
    /// SQL `LIKE` pattern match.
    Like,
    /// SQL `ILIKE` pattern match.
    ILike,
}
2183
2184#[allow(dead_code)]
2185fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2186    let arr = col
2187        .as_any()
2188        .downcast_ref::<StringArray>()
2189        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2190    let s = Scalar::new(StringArray::from(vec![val]));
2191    Ok(eq(arr, &s)?)
2192}
2193
2194#[allow(dead_code)]
2195fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2196    macro_rules! num_cmp {
2197        ($arr_type:ty, $cast:ty) => {{
2198            let n = val
2199                .as_f64()
2200                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2201                as $cast;
2202            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2203            let s = Scalar::new(<$arr_type>::from(vec![n]));
2204            Ok(match op {
2205                CmpOp::Eq => eq(arr, &s)?,
2206                CmpOp::Neq => neq(arr, &s)?,
2207                CmpOp::Gt => gt(arr, &s)?,
2208                CmpOp::Gte => gt_eq(arr, &s)?,
2209                CmpOp::Lt => lt(arr, &s)?,
2210                CmpOp::Lte => lt_eq(arr, &s)?,
2211                CmpOp::Like | CmpOp::ILike => {
2212                    return Err(AppError::InvalidValue(
2213                        "LIKE requires a string column".into(),
2214                    ));
2215                }
2216            })
2217        }};
2218    }
2219    match col.data_type() {
2220        DataType::Utf8 => {
2221            let s = val
2222                .as_str()
2223                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2224            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2225            let sc = Scalar::new(StringArray::from(vec![s]));
2226            Ok(match op {
2227                CmpOp::Eq => eq(arr, &sc)?,
2228                CmpOp::Neq => neq(arr, &sc)?,
2229                CmpOp::Gt => gt(arr, &sc)?,
2230                CmpOp::Gte => gt_eq(arr, &sc)?,
2231                CmpOp::Lt => lt(arr, &sc)?,
2232                CmpOp::Lte => lt_eq(arr, &sc)?,
2233                CmpOp::Like => compute::like(arr, &sc)?,
2234                CmpOp::ILike => compute::ilike(arr, &sc)?,
2235            })
2236        }
2237        DataType::Int8 => num_cmp!(Int8Array, i8),
2238        DataType::Int16 => num_cmp!(Int16Array, i16),
2239        DataType::Int32 => num_cmp!(Int32Array, i32),
2240        DataType::Int64 => num_cmp!(Int64Array, i64),
2241        DataType::Float32 => num_cmp!(Float32Array, f32),
2242        DataType::Float64 => num_cmp!(Float64Array, f64),
2243        dt => Err(AppError::InvalidValue(format!(
2244            "unsupported type for comparison: {dt:?}"
2245        ))),
2246    }
2247}
2248
2249// ---------------------------------------------------------------------------
2250// Serialisation
2251// ---------------------------------------------------------------------------
2252
2253pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2254    // Temporal columns are kept native in resident memory (compact). Cast
2255    // them — plus any other dtype `write_value` can't render directly — to
2256    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2257    // without paying the load-time RAM cost.
2258    let batch = cast_for_serialize(batch)?;
2259    let schema = batch.schema();
2260    let n_rows = batch.num_rows();
2261
2262    let keys: Vec<Vec<u8>> = schema
2263        .fields()
2264        .iter()
2265        .map(|f| {
2266            let mut k = Vec::with_capacity(f.name().len() + 3);
2267            k.push(b'"');
2268            k.extend_from_slice(f.name().as_bytes());
2269            k.extend_from_slice(b"\":");
2270            k
2271        })
2272        .collect();
2273
2274    // Resolve every column's concrete Arrow array type exactly ONCE here,
2275    // instead of paying a `data_type()` match + `downcast_ref` for every
2276    // single cell. The inner row loop then dispatches over a small enum,
2277    // which the compiler turns into a cheap jump table.
2278    let encoders: Vec<ColEnc> = batch
2279        .columns()
2280        .iter()
2281        .map(|c| ColEnc::new(c.as_ref()))
2282        .collect();
2283
2284    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2285    let mut itoa_buf = itoa::Buffer::new();
2286    let mut ryu_buf = ryu::Buffer::new();
2287    buf.push(b'[');
2288
2289    for row in 0..n_rows {
2290        if row > 0 {
2291            buf.push(b',');
2292        }
2293        buf.push(b'{');
2294        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2295            if i > 0 {
2296                buf.push(b',');
2297            }
2298            buf.extend_from_slice(key);
2299            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2300        }
2301        buf.push(b'}');
2302    }
2303
2304    buf.push(b']');
2305    Ok(unsafe { String::from_utf8_unchecked(buf) })
2306}
2307
2308/// Per-column JSON encoder. The concrete Arrow array type is resolved once
2309/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
2310/// instead of repeating `data_type()` matching + `downcast_ref` for every
2311/// cell. Each variant borrows the already-downcast typed array.
2312enum ColEnc<'a> {
2313    Utf8(&'a StringArray),
2314    LargeUtf8(&'a LargeStringArray),
2315    Utf8View(&'a StringViewArray),
2316    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
2317    /// pre-downcast string values so neither is re-resolved per row.
2318    DictI32Utf8(
2319        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2320        &'a StringArray,
2321    ),
2322    Bool(&'a BooleanArray),
2323    I8(&'a Int8Array),
2324    I16(&'a Int16Array),
2325    I32(&'a Int32Array),
2326    I64(&'a Int64Array),
2327    U8(&'a UInt8Array),
2328    U16(&'a UInt16Array),
2329    U32(&'a UInt32Array),
2330    U64(&'a UInt64Array),
2331    Dec128(&'a Decimal128Array),
2332    Dec256(&'a Decimal256Array),
2333    F32(&'a Float32Array),
2334    F64(&'a Float64Array),
2335    /// Anything else: fall back to the generic `write_value` dispatch.
2336    Other(&'a dyn Array),
2337}
2338
2339impl<'a> ColEnc<'a> {
2340    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2341        macro_rules! dc {
2342            ($t:ty) => {
2343                col.as_any().downcast_ref::<$t>().unwrap()
2344            };
2345        }
2346        match col.data_type() {
2347            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2348            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2349            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2350            DataType::Dictionary(key, value)
2351                if matches!(key.as_ref(), DataType::Int32)
2352                    && matches!(value.as_ref(), DataType::Utf8) =>
2353            {
2354                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2355                let values = dict
2356                    .values()
2357                    .as_any()
2358                    .downcast_ref::<StringArray>()
2359                    .unwrap();
2360                ColEnc::DictI32Utf8(dict, values)
2361            }
2362            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2363            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2364            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2365            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2366            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2367            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2368            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2369            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2370            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2371            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2372            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2373            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2374            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2375            _ => ColEnc::Other(col),
2376        }
2377    }
2378
2379    #[inline]
2380    fn write(
2381        &self,
2382        buf: &mut Vec<u8>,
2383        row: usize,
2384        itoa_buf: &mut itoa::Buffer,
2385        ryu_buf: &mut ryu::Buffer,
2386    ) {
2387        macro_rules! int {
2388            ($arr:expr) => {{
2389                if $arr.is_null(row) {
2390                    buf.extend_from_slice(b"null");
2391                } else {
2392                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2393                }
2394            }};
2395        }
2396        match self {
2397            ColEnc::Utf8(a) => {
2398                if a.is_null(row) {
2399                    buf.extend_from_slice(b"null");
2400                } else {
2401                    write_str(buf, a.value(row));
2402                }
2403            }
2404            ColEnc::LargeUtf8(a) => {
2405                if a.is_null(row) {
2406                    buf.extend_from_slice(b"null");
2407                } else {
2408                    write_str(buf, a.value(row));
2409                }
2410            }
2411            ColEnc::Utf8View(a) => {
2412                if a.is_null(row) {
2413                    buf.extend_from_slice(b"null");
2414                } else {
2415                    write_str(buf, a.value(row));
2416                }
2417            }
2418            ColEnc::DictI32Utf8(keys, values) => {
2419                if keys.is_null(row) {
2420                    buf.extend_from_slice(b"null");
2421                } else {
2422                    let k = keys.keys().value(row) as usize;
2423                    write_str(buf, values.value(k));
2424                }
2425            }
2426            ColEnc::Bool(a) => {
2427                if a.is_null(row) {
2428                    buf.extend_from_slice(b"null");
2429                } else {
2430                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2431                }
2432            }
2433            ColEnc::I8(a) => int!(a),
2434            ColEnc::I16(a) => int!(a),
2435            ColEnc::I32(a) => int!(a),
2436            ColEnc::I64(a) => int!(a),
2437            ColEnc::U8(a) => int!(a),
2438            ColEnc::U16(a) => int!(a),
2439            ColEnc::U32(a) => int!(a),
2440            ColEnc::U64(a) => int!(a),
2441            ColEnc::Dec128(a) => {
2442                if a.is_null(row) {
2443                    buf.extend_from_slice(b"null");
2444                } else {
2445                    write_str(buf, &a.value_as_string(row));
2446                }
2447            }
2448            ColEnc::Dec256(a) => {
2449                if a.is_null(row) {
2450                    buf.extend_from_slice(b"null");
2451                } else {
2452                    write_str(buf, &a.value_as_string(row));
2453                }
2454            }
2455            ColEnc::F32(a) => {
2456                if a.is_null(row) {
2457                    buf.extend_from_slice(b"null");
2458                } else {
2459                    let v = a.value(row);
2460                    if v.is_finite() {
2461                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2462                    } else {
2463                        buf.extend_from_slice(b"null");
2464                    }
2465                }
2466            }
2467            ColEnc::F64(a) => {
2468                if a.is_null(row) {
2469                    buf.extend_from_slice(b"null");
2470                } else {
2471                    let v = a.value(row);
2472                    if v.is_finite() {
2473                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2474                    } else {
2475                        buf.extend_from_slice(b"null");
2476                    }
2477                }
2478            }
2479            ColEnc::Other(col) => {
2480                if col.is_null(row) {
2481                    buf.extend_from_slice(b"null");
2482                } else {
2483                    write_value(buf, *col, row);
2484                }
2485            }
2486        }
2487    }
2488}
2489
2490#[inline]
2491fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2492    match col.data_type() {
2493        DataType::Utf8 => write_str(
2494            buf,
2495            col.as_any()
2496                .downcast_ref::<StringArray>()
2497                .unwrap()
2498                .value(row),
2499        ),
2500        DataType::LargeUtf8 => write_str(
2501            buf,
2502            col.as_any()
2503                .downcast_ref::<LargeStringArray>()
2504                .unwrap()
2505                .value(row),
2506        ),
2507        DataType::Utf8View => write_str(
2508            buf,
2509            col.as_any()
2510                .downcast_ref::<StringViewArray>()
2511                .unwrap()
2512                .value(row),
2513        ),
2514        DataType::Dictionary(key, value)
2515            if matches!(key.as_ref(), DataType::Int32)
2516                && matches!(value.as_ref(), DataType::Utf8) =>
2517        {
2518            let dict = col
2519                .as_any()
2520                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2521                .unwrap();
2522            let keys = dict.keys();
2523            let values = dict
2524                .values()
2525                .as_any()
2526                .downcast_ref::<StringArray>()
2527                .unwrap();
2528            let k = keys.value(row) as usize;
2529            write_str(buf, values.value(k));
2530        }
2531        DataType::Boolean => {
2532            let v = col
2533                .as_any()
2534                .downcast_ref::<BooleanArray>()
2535                .unwrap()
2536                .value(row);
2537            buf.extend_from_slice(if v { b"true" } else { b"false" });
2538        }
2539        DataType::Int8 => {
2540            let mut b = itoa::Buffer::new();
2541            buf.extend_from_slice(
2542                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2543                    .as_bytes(),
2544            );
2545        }
2546        DataType::Int16 => {
2547            let mut b = itoa::Buffer::new();
2548            buf.extend_from_slice(
2549                b.format(
2550                    col.as_any()
2551                        .downcast_ref::<Int16Array>()
2552                        .unwrap()
2553                        .value(row),
2554                )
2555                .as_bytes(),
2556            );
2557        }
2558        DataType::Int32 => {
2559            let mut b = itoa::Buffer::new();
2560            buf.extend_from_slice(
2561                b.format(
2562                    col.as_any()
2563                        .downcast_ref::<Int32Array>()
2564                        .unwrap()
2565                        .value(row),
2566                )
2567                .as_bytes(),
2568            );
2569        }
2570        DataType::Int64 => {
2571            let mut b = itoa::Buffer::new();
2572            buf.extend_from_slice(
2573                b.format(
2574                    col.as_any()
2575                        .downcast_ref::<Int64Array>()
2576                        .unwrap()
2577                        .value(row),
2578                )
2579                .as_bytes(),
2580            );
2581        }
2582        DataType::UInt8 => {
2583            let mut b = itoa::Buffer::new();
2584            buf.extend_from_slice(
2585                b.format(
2586                    col.as_any()
2587                        .downcast_ref::<UInt8Array>()
2588                        .unwrap()
2589                        .value(row),
2590                )
2591                .as_bytes(),
2592            );
2593        }
2594        DataType::UInt16 => {
2595            let mut b = itoa::Buffer::new();
2596            buf.extend_from_slice(
2597                b.format(
2598                    col.as_any()
2599                        .downcast_ref::<UInt16Array>()
2600                        .unwrap()
2601                        .value(row),
2602                )
2603                .as_bytes(),
2604            );
2605        }
2606        DataType::UInt32 => {
2607            let mut b = itoa::Buffer::new();
2608            buf.extend_from_slice(
2609                b.format(
2610                    col.as_any()
2611                        .downcast_ref::<UInt32Array>()
2612                        .unwrap()
2613                        .value(row),
2614                )
2615                .as_bytes(),
2616            );
2617        }
2618        DataType::UInt64 => {
2619            let mut b = itoa::Buffer::new();
2620            buf.extend_from_slice(
2621                b.format(
2622                    col.as_any()
2623                        .downcast_ref::<UInt64Array>()
2624                        .unwrap()
2625                        .value(row),
2626                )
2627                .as_bytes(),
2628            );
2629        }
2630        DataType::Decimal128(_, _) => {
2631            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2632            write_str(buf, &arr.value_as_string(row));
2633        }
2634        DataType::Decimal256(_, _) => {
2635            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2636            write_str(buf, &arr.value_as_string(row));
2637        }
2638        DataType::Float32 => {
2639            let v = col
2640                .as_any()
2641                .downcast_ref::<Float32Array>()
2642                .unwrap()
2643                .value(row);
2644            if v.is_finite() {
2645                let mut b = ryu::Buffer::new();
2646                buf.extend_from_slice(b.format_finite(v).as_bytes());
2647            } else {
2648                buf.extend_from_slice(b"null");
2649            }
2650        }
2651        DataType::Float64 => {
2652            let v = col
2653                .as_any()
2654                .downcast_ref::<Float64Array>()
2655                .unwrap()
2656                .value(row);
2657            if v.is_finite() {
2658                let mut b = ryu::Buffer::new();
2659                buf.extend_from_slice(b.format_finite(v).as_bytes());
2660            } else {
2661                buf.extend_from_slice(b"null");
2662            }
2663        }
2664        // Any dtype not handled above must have been pre-cast to Utf8 by
2665        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
2666        // visible JSON string rather than a silent null so it can't be
2667        // mistaken for a real NULL value.
2668        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2669    }
2670}
2671
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, `\n` / `\r` / `\t` use their short
/// escapes, and every other byte below 0x20 is written as `\u00XX` with
/// lowercase hex digits. All remaining bytes (including multi-byte UTF-8
/// sequences) are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let raw = s.as_bytes();
    // Start of the current run of bytes that need no escaping.
    let mut run_start = 0usize;

    buf.push(b'"');
    for (idx, &ch) in raw.iter().enumerate() {
        let plain = ch >= 0x20 && ch != b'"' && ch != b'\\';
        if plain {
            continue;
        }
        // Flush the pending literal run, then emit the escape for `ch`.
        buf.extend_from_slice(&raw[run_start..idx]);
        run_start = idx + 1;
        match ch {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            _ => {
                // Remaining control bytes (< 0x20) have no short escape.
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX_DIGITS[usize::from(ch >> 4)]);
                buf.push(HEX_DIGITS[usize::from(ch & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&raw[run_start..]);
    buf.push(b'"');
}
2693
2694// ---------------------------------------------------------------------------
2695// Backend trait impl — wires the store into the generic core handlers.
2696// ---------------------------------------------------------------------------
2697
2698#[async_trait]
2699impl Backend for Store {
2700    fn names(&self) -> Vec<String> {
2701        Store::names(self)
2702    }
2703
2704    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2705        let st = self.dataset(name)?;
2706        Ok(DatasetSummary {
2707            name: st.schema.name.clone(),
2708            columns: st.schema.columns.len(),
2709            rows: st.num_rows(),
2710        })
2711    }
2712
2713    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2714        let st = self.dataset(name)?;
2715        Ok(Arc::new(st.schema.clone()))
2716    }
2717
2718    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2719        let st = self.dataset(name)?;
2720        // Report indexed columns in the dataset's declared schema order
2721        // so the `/schema` response is deterministic.
2722        let mut cols: Vec<String> = st
2723            .schema
2724            .columns
2725            .iter()
2726            .map(|c| c.name.clone())
2727            .filter(|n| st.index.contains_key(n))
2728            .collect();
2729        // Any indexed columns not in `schema.columns` (shouldn't happen,
2730        // but be defensive) get appended sorted.
2731        let mut extras: Vec<String> = st
2732            .index
2733            .keys()
2734            .filter(|n| !cols.iter().any(|c| c == *n))
2735            .cloned()
2736            .collect();
2737        extras.sort();
2738        cols.extend(extras);
2739        Ok(cols)
2740    }
2741
2742    async fn sample(&self, name: &str) -> Result<String, AppError> {
2743        Store::sample(self, name).await
2744    }
2745
2746    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2747        Store::query(self, name, req).await
2748    }
2749
2750    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2751        Store::query_arrow(self, name, req).await
2752    }
2753
2754    async fn query_arrow_stream(
2755        &self,
2756        name: &str,
2757        req: &QueryRequest,
2758    ) -> Result<ArrowIpcStream, AppError> {
2759        Store::query_arrow_stream(self, name, req).await
2760    }
2761
2762    async fn query_arrow_stream_all(
2763        &self,
2764        name: &str,
2765        req: &QueryRequest,
2766    ) -> Result<ArrowIpcStream, AppError> {
2767        Store::query_arrow_stream_all(self, name, req).await
2768    }
2769
2770    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2771        Store::count(self, name, req).await
2772    }
2773
2774    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
2775        Store::query_sql(self, sql, max_rows).await
2776    }
2777
2778    async fn query_sql_arrow_stream(
2779        &self,
2780        sql: &str,
2781        max_rows: u64,
2782    ) -> Result<ArrowIpcStream, AppError> {
2783        Store::query_sql_arrow_stream(self, sql, max_rows).await
2784    }
2785
2786    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2787        Store::parquet(self, name).await
2788    }
2789
2790    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2791        Store::reload(self, name).await
2792    }
2793}