Skip to main content

datapress_datafusion/
store.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35    AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36    S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
42// ---------------------------------------------------------------------------
43// Public types
44// ---------------------------------------------------------------------------
45
46/// Hasher-swapped map alias. The default `std::collections::HashMap` uses
47/// SipHash (DoS-resistant but slow). The equality index keys are our own
48/// column values, not untrusted hash-flood input, so we use `ahash` — a much
49/// faster non-cryptographic hasher — on this per-request hot path.
50type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
51
52/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
53type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
55/// Per-dataset state: schema metadata, the resident chunks, and the
56/// equality index built per the dataset's `[dataset.index]` policy.
57///
58/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
59/// produced by the underlying reader, after temporal columns are cast to
60/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
61/// into one batch: on wide schemas (hundreds of columns) that transiently
62/// allocates a second full copy of the decoded Arrow data, pushing peak
63/// RSS to ~2× the resident size and OOM-killing the process at startup.
64///
65/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
66/// `index` is empty, and every query is dispatched to DataFusion SQL
67/// against a registered `ListingTable`. `arrow_schema` still carries the
68/// inferred schema so discovery endpoints work.
69pub struct DatasetState {
70    pub schema: DatasetSchema,
71    pub data: Vec<RecordBatch>,
72    pub arrow_schema: Arc<Schema>,
73    pub index: EqIndex,
74    pub lazy: bool,
75}
76
77impl DatasetState {
78    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
79    pub fn num_rows(&self) -> usize {
80        self.data.iter().map(|b| b.num_rows()).sum()
81    }
82}
83
84/// Multi-dataset registry. Each dataset is registered in the shared
85/// `SessionContext` under its configured name. The per-dataset state is
86/// held behind `ArcSwap` so a reload can atomically replace it without
87/// blocking concurrent queries.
88pub struct Store {
89    ctx: SessionContext,
90    max_page_size: u64,
91    /// Original dataset configs, indexed by name. Reload reads the source
92    /// path from here — clients can't redirect a reload at an arbitrary file.
93    configs: HashMap<String, DatasetConfig>,
94    /// Hot-swappable snapshot of all currently loaded datasets.
95    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
96    /// Per-name reload mutex. Serialises concurrent reloads of the same
97    /// dataset; reloads of different datasets proceed in parallel.
98    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
99}
100
101impl Store {
102    /// Load every dataset declared in `cfg`.
103    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104        // One-shot init for the deltalake S3 backend. Safe to call more
105        // than once — the handlers are idempotent.
106        if cfg
107            .datasets
108            .iter()
109            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110        {
111            deltalake::aws::register_handlers(None);
112        }
113
114        let ctx = SessionContext::new();
115        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
116        let mut configs = HashMap::with_capacity(cfg.datasets.len());
117
118        for d in &cfg.datasets {
119            let (state, provider) = build_dataset(d, &ctx).await?;
120            ctx.register_table(d.name.as_str(), provider)?;
121            datasets.insert(d.name.clone(), Arc::new(state));
122            configs.insert(d.name.clone(), d.clone());
123        }
124        Ok(Self {
125            ctx,
126            max_page_size: cfg.server.max_page_size.max(1),
127            configs,
128            datasets: ArcSwap::from_pointee(datasets),
129            reload_locks: Mutex::new(HashMap::new()),
130        })
131    }
132
133    /// Sorted list of dataset names.
134    pub fn names(&self) -> Vec<String> {
135        let snap = self.datasets.load();
136        let mut v: Vec<String> = snap.keys().cloned().collect();
137        v.sort();
138        v
139    }
140
141    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
142        self.datasets
143            .load()
144            .get(name)
145            .cloned()
146            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
147    }
148
149    /// JSON for the first row of the dataset, or `null` if empty. Used by
150    /// `GET /api/datasets/{name}/schema` for discoverability.
151    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
152        let st = self.dataset(name)?;
153
154        // Lazy datasets have no resident batch — pull one row via SQL.
155        if st.lazy {
156            let table = DatasetSchema::quote_ident(&st.schema.name);
157            let sql = format!("SELECT * FROM {table} LIMIT 1");
158            let df = self.ctx.sql(&sql).await?;
159            let batches = df.collect().await?;
160            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
161                return Ok("null".into());
162            }
163            let arr = serialize(&batches[0].slice(0, 1))?;
164            let trimmed = arr.trim();
165            let inner = trimmed
166                .strip_prefix('[')
167                .and_then(|s| s.strip_suffix(']'))
168                .unwrap_or(trimmed);
169            return Ok(inner.to_string());
170        }
171
172        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
173            Some(b) => b,
174            None => return Ok("null".into()),
175        };
176        let arr = serialize(&first.slice(0, 1))?;
177        // strip the outer [] to return a single object
178        let trimmed = arr.trim();
179        let inner = trimmed
180            .strip_prefix('[')
181            .and_then(|s| s.strip_suffix(']'))
182            .unwrap_or(trimmed);
183        Ok(inner.to_string())
184    }
185
186    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
187    /// against the same name continue to see the *old* `Arc<DatasetState>`
188    /// until they finish; the old data is dropped once the last reference
189    /// goes away.
190    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
191        // 1. Look up the dataset config. Not finding it = 404.
192        let cfg = self
193            .configs
194            .get(name)
195            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
196            .clone();
197
198        // 2. Per-name lock: only one reload of this dataset at a time.
199        let lock = {
200            let mut locks = self.reload_locks.lock().unwrap();
201            locks
202                .entry(name.to_string())
203                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
204                .clone()
205        };
206        let _guard = lock.lock().await;
207
208        let started = std::time::Instant::now();
209
210        // 3. Heavy lifting (source read + index build). Parquet/delta
211        // readers are themselves async, so we don't wrap in `web::block`.
212        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
213        let rows = state.num_rows();
214
215        // 4. Atomic swap.
216        //   a) Replace the MemTable inside the SessionContext.
217        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
218        // In-flight queries already hold the old provider + old Arc; they
219        // run to completion. New queries see the new data.
220        let _ = self.ctx.deregister_table(name)?;
221        self.ctx.register_table(name, provider)?;
222
223        let mut new_map = (**self.datasets.load()).clone();
224        new_map.insert(name.to_string(), Arc::new(state));
225        self.datasets.store(Arc::new(new_map));
226
227        let elapsed_ms = started.elapsed().as_millis();
228        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
229        Ok(ReloadStats { rows, elapsed_ms })
230    }
231
232    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
233    /// slice. Otherwise → DataFusion SQL on the single registered table.
234    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
235    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
236        let batch = self.query_batch(name, req).await?;
237        if batch.num_rows() == 0 {
238            return Ok("[]".to_string());
239        }
240        serialize(&batch)
241    }
242
243    /// Same plan as [`Self::query`], but encode the result page as an
244    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
245    /// results still produce a valid, self-describing zero-batch stream.
246    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
247        let batch = self.query_batch(name, req).await?;
248        let schema = batch.schema();
249        let mut buf = Vec::with_capacity(8 * 1024);
250        {
251            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
252            if batch.num_rows() > 0 {
253                w.write(&batch)?;
254            }
255            w.finish()?;
256        }
257        Ok(buf)
258    }
259
260    pub async fn query_arrow_stream(
261        &self,
262        name: &str,
263        req: &QueryRequest,
264    ) -> Result<ArrowIpcStream, AppError> {
265        let batches = self.query_batches(name, req).await?;
266        Ok(stream_arrow_batches(batches))
267    }    pub async fn query_arrow_stream_all(
268        &self,
269        name: &str,
270        req: &QueryRequest,
271    ) -> Result<ArrowIpcStream, AppError> {
272        let batches = self.query_batches_all(name, req).await?;
273        Ok(stream_arrow_batches(batches))
274    }
275
276    /// Encode the entire dataset as a single self-contained Parquet file.
277    ///
278    /// Collects every row (all columns, no predicates, no paging) and runs
279    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
280    /// carries the row-group + footer metadata a Parquet reader needs to
281    /// answer `count(*)` straight from the footer. Powers the cached
282    /// `GET /datasets/{name}/parquet` HTTP endpoint.
283    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
284        // All rows, all columns, no predicates / ordering / limit.
285        let req = QueryRequest {
286            columns: Vec::new(),
287            predicates: Vec::new(),
288            group_by: Vec::new(),
289            aggregations: Vec::new(),
290            distinct: false,
291            order_by: Vec::new(),
292            limit: None,
293            page: 1,
294            page_size: 1,
295        };
296        let st = self.dataset(name)?;
297        let batches = self.query_batches_all(name, &req).await?;
298        // Use the actual batch schema when we have rows so the writer schema
299        // matches exactly (projection/nullability); fall back to the
300        // dataset schema for an empty dataset.
301        let schema = batches
302            .first()
303            .map(|b| b.schema())
304            .unwrap_or_else(|| st.arrow_schema.clone());
305
306        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
307        {
308            let props = parquet::file::properties::WriterProperties::builder()
309                .set_compression(parquet::basic::Compression::SNAPPY)
310                .build();
311            let mut writer =
312                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
313                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
314            for batch in &batches {
315                if batch.num_rows() > 0 {
316                    writer
317                        .write(batch)
318                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
319                }
320            }
321            writer
322                .close()
323                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
324        }
325        Ok(bytes::Bytes::from(buf))
326    }
327
328    /// Compute the result page as a single `RecordBatch`. Shared between
329    /// the JSON and Arrow IPC encoders.
330    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
331        let batches = self.query_batches(name, req).await?;
332        if batches.is_empty() {
333            return Ok(RecordBatch::new_empty(Arc::new(
334                arrow::datatypes::Schema::empty(),
335            )));
336        }
337        if batches.len() == 1 {
338            return Ok(batches.into_iter().next().expect("checked len"));
339        }
340        if batches.iter().all(|b| b.num_rows() == 0) {
341            return Ok(RecordBatch::new_empty(batches[0].schema()));
342        }
343        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
344        Ok(batch)
345    }
346
347    /// Compute the result page as Arrow batches. Arrow IPC responses can
348    /// write these directly, while JSON callers concatenate via
349    /// [`Self::query_batch`] for the existing row conversion path.
350    async fn query_batches(
351        &self,
352        name: &str,
353        req: &QueryRequest,
354    ) -> Result<Vec<RecordBatch>, AppError> {
355        let st = self.dataset(name)?;
356
357        let page = req.page.max(1);
358        let page_size = req.page_size.clamp(1, self.max_page_size);
359        let offset = ((page - 1) * page_size) as usize;
360        let limit = page_size as usize;
361
362        self.query_batches_inner(st, req, Some((offset, limit)))
363            .await
364    }
365
366    /// Compute all matching rows as Arrow batches for the one-request
367    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
368    /// optional `limit` still caps the total result size.
369    async fn query_batches_all(
370        &self,
371        name: &str,
372        req: &QueryRequest,
373    ) -> Result<Vec<RecordBatch>, AppError> {
374        let st = self.dataset(name)?;
375        self.query_batches_inner(st, req, None).await
376    }
377
378    async fn query_batches_inner(
379        &self,
380        st: Arc<DatasetState>,
381        req: &QueryRequest,
382        page_window: Option<(usize, usize)>,
383    ) -> Result<Vec<RecordBatch>, AppError> {
384        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
385
386        // In-memory hot paths only fire when:
387        //   - the dataset is materialised,
388        //   - the caller did not ask for ordering,
389        //   - and did not ask for a hard `limit` cap on a paged request.
390        // Both of the latter two require sorting / capping that the SQL
391        // engine handles uniformly across all data types.
392        let can_fast_path = !st.lazy
393            && req.order_by.is_empty()
394            && (page_window.is_none() || req.limit.is_none())
395            && req.group_by.is_empty()
396            && !req.distinct;
397
398        if can_fast_path {
399            let total = st.num_rows();
400
401            // No predicates -> O(1) raw Arrow slices over resident batches,
402            // no engine overhead.
403            if req.predicates.is_empty() {
404                if page_window.is_none() && req.limit.is_none() {
405                    return st
406                        .data
407                        .iter()
408                        .cloned()
409                        .map(|batch| project(&st.schema, batch, &req.columns))
410                        .collect();
411                }
412                let start = offset.min(total);
413                let len = limit.min(total - start);
414                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
415                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
416            }
417
418            // Index fast path: if every predicate is eq/in on an indexed column,
419            // resolve via the pre-built equality index.
420            if let Some(rows) = try_index(&st.index, &req.predicates) {
421                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
422                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
423            }
424        }
425
426        // Fallback (and only path for lazy datasets): DataFusion SQL.
427        let (sql, params) = match page_window {
428            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
429            None => build_query_stream_sql(&st.schema, req)?,
430        };
431        let mut df = self.ctx.sql(&sql).await?;
432        if !params.is_empty() {
433            df = df.with_param_values(params)?;
434        }
435        let batches = df.collect().await?;
436        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
437            let schema = batches
438                .first()
439                .map(|b| b.schema())
440                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
441            return Ok(vec![RecordBatch::new_empty(schema)]);
442        }
443        Ok(batches)
444    }
445}
446
447fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
448    let schema = batches
449        .first()
450        .map(|batch| batch.schema())
451        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
452    let (mut writer, stream) = arrow_ipc_stream_channel(8);
453
454    tokio::task::spawn_blocking(move || {
455        let result = (|| -> Result<(), AppError> {
456            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
457            for batch in batches {
458                if batch.num_rows() > 0 {
459                    w.write(&batch)?;
460                }
461            }
462            w.finish()?;
463            Ok(())
464        })();
465        if let Err(err) = result {
466            log::error!("datafusion arrow stream failed: {err}");
467            writer.send_error(err);
468        }
469    });
470
471    stream
472}
473
474impl Store {
475    /// Return the number of rows matching `req.predicates`. With no
476    /// predicates this is a cheap metadata lookup on materialised datasets
477    /// and a `SELECT COUNT(*)` on lazy ones.
478    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
479        let st = self.dataset(name)?;
480
481        if !st.lazy {
482            // No predicates → resident row count, no scan.
483            if req.predicates.is_empty() {
484                return Ok(st.num_rows() as i64);
485            }
486            // Index fast path: same eligibility rules as `query`.
487            if let Some(rows) = try_index(&st.index, &req.predicates) {
488                return Ok(rows.len() as i64);
489            }
490        }
491
492        // Fallback: DataFusion SQL — same predicate translation as `query`,
493        // with predicate values bound as typed parameters.
494        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
495        let mut df = self.ctx.sql(&sql).await?;
496        if !params.is_empty() {
497            df = df.with_param_values(params)?;
498        }
499        let batches = df.collect().await?;
500        let n = batches
501            .first()
502            .and_then(|b| {
503                b.column(0)
504                    .as_any()
505                    .downcast_ref::<arrow::array::Int64Array>()
506            })
507            .filter(|a| !a.is_empty())
508            .map(|a| a.value(0))
509            .unwrap_or(0);
510        Ok(n)
511    }
512}
513
514// ---------------------------------------------------------------------------
515// Dataset loading
516// ---------------------------------------------------------------------------
517
518async fn build_dataset(
519    d: &DatasetConfig,
520    ctx: &SessionContext,
521) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
522    // Lazy datasets: register a ListingTable straight against the source
523    // and skip the materialise / index / partition pipeline below. Delta
524    // is rejected — deltalake reads the transaction log eagerly to know
525    // which parquet files are current, so "lazy delta" doesn't map onto
526    // ListingTable cleanly.
527    if d.lazy {
528        match (d.source.kind, d.source.is_s3()) {
529            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
530            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
531            (SourceKind::Delta, _) => {
532                return Err(AppError::Internal(format!(
533                    "dataset '{}': lazy mode is not supported for delta sources",
534                    d.name
535                )));
536            }
537        }
538    }
539
540    // Fetch raw RecordBatches from whichever backing store the dataset
541    // is configured to use. All four (parquet, delta) x (local, s3)
542    // combinations converge into one Vec<RecordBatch>; the materialisation
543    // / indexing / partitioning logic below is shared.
544    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
545        (SourceKind::Parquet, false) => read_local_parquet(d)?,
546        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
547        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
548        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
549    };
550    if raw_batches.is_empty() {
551        return Err(AppError::Internal(format!(
552            "dataset '{}': source produced no batches",
553            d.name
554        )));
555    }
556
557    let chunks = raw_batches;
558    let arrow_sch = chunks[0].schema();
559
560    // Build DatasetSchema from the Arrow schema.
561    let columns: Vec<ColumnInfo> = arrow_sch
562        .fields()
563        .iter()
564        .map(|f| {
565            let dt = f.data_type();
566            ColumnInfo {
567                name: f.name().clone(),
568                logical: arrow_to_logical(dt),
569                sql_type: format!("{dt:?}"),
570                nullable: f.is_nullable(),
571            }
572        })
573        .collect();
574    let schema = DatasetSchema::new(&d.name, columns);
575
576    // Build the equality index per the per-dataset policy. Operates on the
577    // chunked representation directly so we never have to materialise a
578    // single concatenated batch (which would double peak RSS on wide
579    // schemas — see `DatasetState` docs).
580    let index = build_eq_index_with_policy(&chunks, &d.index);
581
582    // Partition for parallel scans by the SQL fallback path. We distribute
583    // the existing batches round-robin across `n_parts` partitions instead
584    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
585    // an Arc-clone of the column buffers, not a copy.
586    let n_parts = std::thread::available_parallelism()
587        .map(|n| n.get())
588        .unwrap_or(4);
589    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
590    for (i, b) in chunks.iter().enumerate() {
591        if b.num_rows() == 0 {
592            continue;
593        }
594        parts[i % n_parts].push(b.clone());
595    }
596    parts.retain(|p| !p.is_empty());
597    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
598
599    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
600    let mem_mb: usize = chunks
601        .iter()
602        .flat_map(|b| b.columns().iter())
603        .map(|c| c.get_buffer_memory_size())
604        .sum::<usize>()
605        / 1_048_576;
606    log::info!(
607        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
608        d.name,
609        d.source.kind.as_str(),
610        total_rows,
611        schema.columns.len(),
612        mem_mb,
613        chunks.len(),
614        index.len()
615    );
616
617    Ok((
618        DatasetState {
619            schema,
620            data: chunks,
621            arrow_schema: arrow_sch,
622            index,
623            lazy: false,
624        },
625        provider,
626    ))
627}
628
629/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
630/// The dataset is never read into RAM; DataFusion streams row groups on
631/// each query. The returned `DatasetState.data` is an empty `Vec` —
632/// `arrow_schema` still carries the inferred Arrow schema for discovery.
633async fn build_lazy_local_parquet(
634    d: &DatasetConfig,
635    ctx: &SessionContext,
636) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
637    let (url, part_keys) = lazy_local_listing(d)?;
638
639    let mut opts =
640        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
641    if !part_keys.is_empty() {
642        opts = opts.with_table_partition_cols(
643            part_keys
644                .iter()
645                .map(|k| (k.clone(), DataType::Utf8))
646                .collect(),
647        );
648    }
649
650    let session_state = ctx.state();
651    // `infer_schema` returns the *file* schema (without partition columns);
652    // `ListingTable` appends the declared partition columns on top.
653    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
654        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
655    })?;
656
657    let cfg = ListingTableConfig::new(url)
658        .with_listing_options(opts)
659        .with_schema(file_schema.clone());
660    let table = ListingTable::try_new(cfg).map_err(|e| {
661        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
662    })?;
663    let provider: Arc<dyn TableProvider> = Arc::new(table);
664
665    // Discovery schema = file columns + partition columns (Utf8).
666    let mut fields: Vec<Field> = file_schema
667        .fields()
668        .iter()
669        .map(|f| f.as_ref().clone())
670        .collect();
671    for k in &part_keys {
672        if !fields.iter().any(|f| f.name() == k) {
673            fields.push(Field::new(k, DataType::Utf8, false));
674        }
675    }
676    let arrow_sch = Arc::new(Schema::new(fields));
677
678    let columns: Vec<ColumnInfo> = arrow_sch
679        .fields()
680        .iter()
681        .map(|f| {
682            let dt = f.data_type();
683            ColumnInfo {
684                name: f.name().clone(),
685                logical: arrow_to_logical(dt),
686                sql_type: format!("{dt:?}"),
687                nullable: f.is_nullable(),
688            }
689        })
690        .collect();
691    let schema = DatasetSchema::new(&d.name, columns);
692
693    log::info!(
694        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
695        d.name,
696        d.source.kind.as_str(),
697        schema.columns.len(),
698        part_keys.len()
699    );
700
701    Ok((
702        DatasetState {
703            schema,
704            data: Vec::new(),
705            arrow_schema: arrow_sch,
706            index: EqIndex::default(),
707            lazy: true,
708        },
709        provider,
710    ))
711}
712
713/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
714/// rooted at the dataset base plus the ordered hive partition keys (if any).
715/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
716/// (hive root or flat folder of parquets), and a single `*.parquet` file.
717fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
718    let loc = &d.source.location;
719
720    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
721        let parts: Vec<&str> = loc.split('/').collect();
722        let first_wild = parts
723            .iter()
724            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
725            .unwrap_or(parts.len());
726        let base = parts[..first_wild].join("/");
727        let base = if base.is_empty() {
728            "/".to_string()
729        } else {
730            base
731        };
732        // Partition keys: `key=…` components between the base and the file
733        // pattern (the final component).
734        let upper = parts.len().saturating_sub(1);
735        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
736            .iter()
737            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
738            .filter(|k| !k.is_empty())
739            .collect();
740        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
741    }
742
743    let path = std::path::Path::new(loc);
744    if path.is_dir() {
745        let keys = discover_hive_keys(path);
746        return Ok((dir_url(path, d)?, keys));
747    }
748
749    let url = ListingTableUrl::parse(loc)
750        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
751    Ok((url, Vec::new()))
752}
753
754/// Parse a directory path into a `ListingTableUrl` (trailing slash so
755/// DataFusion treats it as a directory root, not a single object).
756fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
757    let s = path.to_str().ok_or_else(|| {
758        AppError::Internal(format!(
759            "dataset '{}': non-utf8 path {}",
760            d.name,
761            path.display()
762        ))
763    })?;
764    let s = if s.ends_with('/') {
765        s.to_string()
766    } else {
767        format!("{s}/")
768    };
769    ListingTableUrl::parse(&s)
770        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
771}
772
/// Walk down a directory one `key=value` subdirectory per level to discover
/// the ordered hive partition keys. Returns an empty vec for a flat
/// (non-partitioned) folder.
///
/// Only sub*directories* whose name is `key=value` with a non-empty key and
/// a non-empty value count as partition levels. When a level holds several
/// such directories the lexicographically smallest name is followed:
/// `read_dir` yields entries in a platform-dependent order, so "whichever
/// comes first" would make the result vary between machines if siblings
/// ever disagree on the key.
///
/// Best-effort: a directory that cannot be read ends the walk and whatever
/// was collected so far is returned.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut cur = base.to_path_buf();

    while let Ok(rd) = std::fs::read_dir(&cur) {
        let next = rd
            .flatten()
            .map(|entry| entry.path())
            .filter(|p| p.is_dir())
            .filter_map(|p| {
                // Copy the name and key out before moving `p` into the result.
                let (dir_name, key) = {
                    let name = p.file_name()?.to_str()?;
                    let (k, v) = name.split_once('=')?;
                    if k.is_empty() || v.is_empty() {
                        return None;
                    }
                    (name.to_owned(), k.to_owned())
                };
                Some((dir_name, key, p))
            })
            // Deterministic choice, independent of directory iteration order.
            .min_by(|a, b| a.0.cmp(&b.0));

        match next {
            Some((_, key, child)) => {
                keys.push(key);
                cur = child;
            }
            None => break,
        }
    }
    keys
}
810
811/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
812/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
813/// any discovered hive partition columns. DataFusion does the directory
814/// listing through the registered store and streams row groups on each
815/// query — no local enumeration needed.
816async fn build_lazy_s3_parquet(
817    d: &DatasetConfig,
818    ctx: &SessionContext,
819) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
820    register_s3_object_store(d, ctx)?;
821
822    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
823
824    // Discovery schema = file columns + partition columns (Utf8).
825    let mut fields: Vec<Field> = file_schema
826        .fields()
827        .iter()
828        .map(|f| f.as_ref().clone())
829        .collect();
830    for k in &part_keys {
831        if !fields.iter().any(|f| f.name() == k) {
832            fields.push(Field::new(k, DataType::Utf8, false));
833        }
834    }
835    let arrow_sch = Arc::new(Schema::new(fields));
836
837    let columns: Vec<ColumnInfo> = arrow_sch
838        .fields()
839        .iter()
840        .map(|f| {
841            let dt = f.data_type();
842            ColumnInfo {
843                name: f.name().clone(),
844                logical: arrow_to_logical(dt),
845                sql_type: format!("{dt:?}"),
846                nullable: f.is_nullable(),
847            }
848        })
849        .collect();
850    let schema = DatasetSchema::new(&d.name, columns);
851
852    log::info!(
853        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
854        d.name,
855        d.source.kind.as_str(),
856        schema.columns.len(),
857        part_keys.len()
858    );
859
860    Ok((
861        DatasetState {
862            schema,
863            data: Vec::new(),
864            arrow_schema: arrow_sch,
865            index: EqIndex::default(),
866            lazy: true,
867        },
868        provider,
869    ))
870}
871
872/// Build a `ListingTable` provider for an S3 parquet source, resolving the
873/// base prefix and hive partition keys via [`s3_listing`]. The registered
874/// S3 object store must already be present on `ctx`. Returns the provider,
875/// the inferred *file* schema (without partition columns), and the ordered
876/// partition keys.
877async fn build_s3_listing_table(
878    d: &DatasetConfig,
879    ctx: &SessionContext,
880) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
881    let (url, part_keys) = s3_listing(d, ctx).await?;
882
883    let mut opts =
884        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
885    if !part_keys.is_empty() {
886        opts = opts.with_table_partition_cols(
887            part_keys
888                .iter()
889                .map(|k| (k.clone(), DataType::Utf8))
890                .collect(),
891        );
892    }
893
894    let session_state = ctx.state();
895    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
896        AppError::Internal(format!(
897            "dataset '{}': infer parquet schema on s3: {e}",
898            d.name
899        ))
900    })?;
901
902    let cfg = ListingTableConfig::new(url)
903        .with_listing_options(opts)
904        .with_schema(file_schema.clone());
905    let table = ListingTable::try_new(cfg).map_err(|e| {
906        AppError::Internal(format!(
907            "dataset '{}': ListingTable::try_new (s3): {e}",
908            d.name
909        ))
910    })?;
911    Ok((Arc::new(table), file_schema, part_keys))
912}
913
914/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
915/// keys)` pair. Hive keys come from the location glob when present
916/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
917/// are discovered by listing the registered object store. Honours the
918/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
919async fn s3_listing(
920    d: &DatasetConfig,
921    ctx: &SessionContext,
922) -> Result<(ListingTableUrl, Vec<String>), AppError> {
923    let s3 = d.s3.clone().unwrap_or_default();
924    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
925    let loc = &d.source.location;
926
927    if d.source.has_glob() {
928        let (base, keys) = split_glob_base_keys(loc);
929        let base = format!("{}/", base.trim_end_matches('/'));
930        let url = ListingTableUrl::parse(&base).map_err(|e| {
931            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
932        })?;
933        let keys = if want_partitions { keys } else { Vec::new() };
934        return Ok((url, keys));
935    }
936
937    let base = if loc.ends_with('/') {
938        loc.clone()
939    } else {
940        format!("{loc}/")
941    };
942    let url = ListingTableUrl::parse(&base).map_err(|e| {
943        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
944    })?;
945    let keys = if want_partitions {
946        discover_s3_hive_keys(ctx, &url).await
947    } else {
948        Vec::new()
949    };
950    Ok((url, keys))
951}
952
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys. Works for both local and `s3://` paths.
///
/// * **Base** — every `/`-separated component before the first one that
///   contains `*`, `?` or `[`. When nothing precedes the first wildcard the
///   base is `/` for an absolute pattern (`/*.parquet`) and `.` for a
///   relative one (`*.parquet`, `year=*/*.parquet`), so a relative glob is
///   rooted at the current directory rather than at the filesystem root.
/// * **Keys** — the part before `=` of every `key=…` component from the
///   first wildcard component up to, but excluding, the final component
///   (the file pattern). Components without `=` or with an empty key are
///   skipped.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    fn is_wild(component: &str) -> bool {
        component.contains(['*', '?', '['])
    }

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|c| is_wild(c))
        .unwrap_or(parts.len());

    let joined = parts[..first_wild].join("/");
    let base = if !joined.is_empty() {
        joined
    } else if loc.starts_with('/') {
        // Absolute pattern whose only leading component is the root.
        "/".to_string()
    } else {
        // Relative pattern: the wildcard is in the very first component.
        ".".to_string()
    };

    // The last component is the file pattern, never a partition directory.
    let upper = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts[first_wild.min(upper)..upper]
        .iter()
        .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
        .filter(|k| !k.is_empty())
        .collect();
    (base, keys)
}
976
977/// Discover ordered hive partition keys for an S3 prefix by walking the
978/// object store one `key=value/` level at a time via delimiter listings.
979/// Best-effort: any listing error stops discovery and returns what was found
980/// so far (empty = treat as a flat folder).
981async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
982    let store = match ctx.runtime_env().object_store(url.object_store()) {
983        Ok(s) => s,
984        Err(_) => return Vec::new(),
985    };
986    let mut keys = Vec::new();
987    let mut prefix = url.prefix().clone();
988    loop {
989        let listing = match store.list_with_delimiter(Some(&prefix)).await {
990            Ok(l) => l,
991            Err(_) => break,
992        };
993        let mut next: Option<object_store::path::Path> = None;
994        for cp in &listing.common_prefixes {
995            if let Some(seg) = cp.parts().next_back() {
996                let seg = seg.as_ref().to_string();
997                if let Some((k, v)) = seg.split_once('=')
998                    && !k.is_empty()
999                    && !v.is_empty()
1000                {
1001                    keys.push(k.to_string());
1002                    next = Some(cp.clone());
1003                    break;
1004                }
1005            }
1006        }
1007        match next {
1008            Some(p) => prefix = p,
1009            None => break,
1010        }
1011    }
1012    keys
1013}
1014
1015/// Original local-parquet code path — sync file I/O. We set a large reader
1016/// batch size so wide schemas (hundreds of columns) don't pay per-array
1017/// metadata overhead on thousands of small (default 1024-row) batches.
1018///
1019/// Two memory-saving knobs are applied here:
1020///
1021/// * **Column projection** — if `d.columns` is non-empty, only those
1022///   columns are decoded; everything else is skipped at the parquet reader
1023///   level (no Arrow array is ever allocated for the dropped columns).
1024/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1025///   carry a dictionary page are materialised as Arrow
1026///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1027///   string columns (state, country, severity, …) stay represented as
1028///   `n_unique` string slots plus an Int32 index per row instead of
1029///   `n_rows` independent strings — typically 10×–50× smaller for
1030///   real-world data.
1031fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1032    let files = d.resolve_local_parquet_files()?;
1033    let mut all = Vec::new();
1034    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1035        None
1036    } else {
1037        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1038    };
1039
1040    for f in &files {
1041        let file = std::fs::File::open(f)
1042            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1043
1044        // First pass: peek the parquet metadata + default Arrow schema so we
1045        // can (a) decide a column projection and (b) override Utf8 columns
1046        // that are dictionary-encoded in the file so the reader materialises
1047        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1048        let probe = ParquetRecordBatchReaderBuilder::try_new(
1049            file.try_clone()
1050                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1051        )?;
1052        let parquet_schema = probe.parquet_schema().clone();
1053        let arrow_schema = probe.schema().clone();
1054        let metadata = probe.metadata().clone();
1055        drop(probe);
1056
1057        // Column projection (top-level / leaf indices for flat schemas).
1058        let projection = if let Some(w) = &wanted {
1059            let indices: Vec<usize> = arrow_schema
1060                .fields()
1061                .iter()
1062                .enumerate()
1063                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1064                .map(|(i, _)| i)
1065                .collect();
1066            if indices.is_empty() {
1067                return Err(AppError::Internal(format!(
1068                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1069                    d.name,
1070                    d.columns,
1071                    f.display()
1072                )));
1073            }
1074            ProjectionMask::roots(&parquet_schema, indices)
1075        } else {
1076            ProjectionMask::all()
1077        };
1078
1079        // Dictionary override: any Utf8 column whose first row group carries
1080        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1081        // override schema must still describe every column in the parquet
1082        // file (projection is applied separately). Skipped entirely when
1083        // the dataset has `dict_encode = false` — escape hatch for cases
1084        // where the override interacts badly with null propagation in the
1085        // downstream engine.
1086        let mut new_fields: Vec<Field> = arrow_schema
1087            .fields()
1088            .iter()
1089            .map(|f| f.as_ref().clone())
1090            .collect();
1091        if d.dict_encode
1092            && let Some(rg0) = metadata.row_groups().first()
1093        {
1094            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1095                if !matches!(
1096                    fld.data_type(),
1097                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1098                ) {
1099                    continue;
1100                }
1101                if let Some(col) = rg0.columns().get(i)
1102                    && col.dictionary_page_offset().is_some()
1103                {
1104                    new_fields[i] = Field::new(
1105                        fld.name(),
1106                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1107                        fld.is_nullable(),
1108                    );
1109                }
1110            }
1111        }
1112        let forced_schema = Arc::new(Schema::new(new_fields));
1113
1114        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1115        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1116            .with_batch_size(65_536)
1117            .with_projection(projection)
1118            .build()?;
1119        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1120        // the file. Fold them in as constant Utf8 columns so they show up in
1121        // the schema and are queryable — matching the DuckDB backend.
1122        let pairs = hive_pairs(f);
1123        for batch in reader {
1124            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1125            all.push(if pairs.is_empty() {
1126                batch
1127            } else {
1128                append_partition_cols(&batch, &pairs)?
1129            });
1130        }
1131    }
1132    if all.is_empty() {
1133        return Err(AppError::Internal(format!(
1134            "dataset '{}': parquet source is empty",
1135            d.name
1136        )));
1137    }
1138    Ok(all)
1139}
1140
/// Collect, in path order, the hive-style partition `(key, value)` pairs of
/// a path: every component shaped like `key=value` (for example
/// `year=2024/city=NYC`). Components that are not valid UTF-8, have an empty
/// key, an empty value, or a second `=` inside the value are ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        if let Some(seg) = component.as_os_str().to_str() {
            match seg.split_once('=') {
                Some((k, v)) if !k.is_empty() && !v.is_empty() && !v.contains('=') => {
                    pairs.push((k.to_string(), v.to_string()));
                }
                _ => {}
            }
        }
    }
    pairs
}
1155
1156/// Append constant Utf8 columns for each hive partition pair. A partition
1157/// key that collides with a real file column is skipped (the file wins).
1158fn append_partition_cols(
1159    batch: &RecordBatch,
1160    pairs: &[(String, String)],
1161) -> Result<RecordBatch, AppError> {
1162    let n = batch.num_rows();
1163    let mut fields: Vec<Field> = batch
1164        .schema()
1165        .fields()
1166        .iter()
1167        .map(|f| f.as_ref().clone())
1168        .collect();
1169    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1170    for (k, v) in pairs {
1171        if fields.iter().any(|f| f.name() == k) {
1172            continue;
1173        }
1174        fields.push(Field::new(k, DataType::Utf8, false));
1175        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1176    }
1177    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1178        .map_err(|e| AppError::Internal(e.to_string()))
1179}
1180
1181/// Register an `AmazonS3` object store on the SessionContext, build a
1182/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1183/// and stream the whole dataset back through `DataFrame::collect`. Using a
1184/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1185/// get the same hive-partition handling as local parquet.
1186async fn read_s3_parquet(
1187    d: &DatasetConfig,
1188    ctx: &SessionContext,
1189) -> Result<Vec<RecordBatch>, AppError> {
1190    register_s3_object_store(d, ctx)?;
1191    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1192    let df = ctx
1193        .read_table(provider)
1194        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1195    Ok(df.collect().await?)
1196}
1197
1198/// Open a Delta table (local or S3) and stream every row back as a Vec of
1199/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1200/// treat all datasets uniformly (single in-memory batch + eq-index).
1201async fn read_delta(
1202    d: &DatasetConfig,
1203    opts: HashMap<String, String>,
1204) -> Result<Vec<RecordBatch>, AppError> {
1205    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1206        AppError::Internal(format!(
1207            "dataset '{}': bad delta location '{}': {e}",
1208            d.name, d.source.location
1209        ))
1210    })?;
1211    let table = deltalake::open_table_with_storage_options(url, opts)
1212        .await
1213        .map_err(|e| {
1214            AppError::Internal(format!(
1215                "dataset '{}': delta open '{}': {e}",
1216                d.name, d.source.location
1217            ))
1218        })?;
1219    let provider = table.table_provider().await.map_err(|e| {
1220        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1221    })?;
1222    // Drive a full scan via a throwaway SessionContext so we end up with
1223    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1224    let scan_ctx = SessionContext::new();
1225    let df = scan_ctx
1226        .read_table(provider)
1227        .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1228    Ok(df.collect().await?)
1229}
1230
1231/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1232/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1233/// passes them through to object_store internally.
1234fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1235    let creds = d.resolved_creds();
1236    let region = d.resolved_region();
1237    let s3 = d.s3.clone().unwrap_or_default();
1238    let (bucket, _) = d.source.s3_bucket()?;
1239
1240    let mut opts = HashMap::new();
1241    opts.insert("AWS_REGION".into(), region);
1242    if let Some(ep) = s3.effective_endpoint(bucket) {
1243        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1244    }
1245    if s3.allow_http {
1246        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1247    }
1248    opts.insert(
1249        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1250        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1251    );
1252    if let Some(k) = creds.access_key_id {
1253        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1254    }
1255    if let Some(s) = creds.secret_access_key {
1256        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1257    }
1258    if let Some(t) = creds.session_token {
1259        opts.insert("AWS_SESSION_TOKEN".into(), t);
1260    }
1261    // Read-only paths don't need the S3 lock-provider plumbing.
1262    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1263    Ok(opts)
1264}
1265
1266/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1267/// block + resolved credentials and register it on `ctx` under
1268/// `s3://bucket/`.
1269fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1270    let (bucket, _key) = d.source.s3_bucket()?;
1271    let creds = d.resolved_creds();
1272    let region = d.resolved_region();
1273    let s3 = d.s3.clone().unwrap_or_default();
1274
1275    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1276        AppError::Internal(format!(
1277            "dataset '{}': build S3 store for '{bucket}': {e}",
1278            d.name
1279        ))
1280    })?;
1281
1282    let url = Url::parse(&format!("s3://{bucket}"))
1283        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1284    ctx.register_object_store(&url, Arc::new(store));
1285    Ok(())
1286}
1287
1288fn build_s3(
1289    bucket: &str,
1290    region: &str,
1291    s3: &S3Config,
1292    creds: &ResolvedCreds,
1293) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1294    let mut b = AmazonS3Builder::new()
1295        .with_bucket_name(bucket)
1296        .with_region(region)
1297        .with_allow_http(s3.allow_http)
1298        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1299    if let Some(ep) = s3.effective_endpoint(bucket) {
1300        b = b.with_endpoint(ep);
1301    }
1302    if let Some(k) = creds.access_key_id.as_deref() {
1303        b = b.with_access_key_id(k);
1304    }
1305    if let Some(s) = creds.secret_access_key.as_deref() {
1306        b = b.with_secret_access_key(s);
1307    }
1308    if let Some(t) = creds.session_token.as_deref() {
1309        b = b.with_token(t);
1310    }
1311    b.build()
1312}
1313
1314fn arrow_to_logical(dt: &DataType) -> LogicalType {
1315    match dt {
1316        DataType::Boolean => LogicalType::Bool,
1317        DataType::Int8
1318        | DataType::Int16
1319        | DataType::Int32
1320        | DataType::Int64
1321        | DataType::UInt8
1322        | DataType::UInt16
1323        | DataType::UInt32
1324        | DataType::UInt64 => LogicalType::Int,
1325        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1326        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1327        // Dictionary-encoded strings are reported as plain strings — clients
1328        // (and the rest of the backend) shouldn't have to care that we keep
1329        // a compressed representation in memory.
1330        DataType::Dictionary(_, v)
1331            if matches!(
1332                v.as_ref(),
1333                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1334            ) =>
1335        {
1336            LogicalType::Utf8
1337        }
1338        DataType::Date32
1339        | DataType::Date64
1340        | DataType::Time32(_)
1341        | DataType::Time64(_)
1342        | DataType::Timestamp(_, _)
1343        | DataType::Duration(_)
1344        | DataType::Interval(_) => LogicalType::Temporal,
1345        _ => LogicalType::Other,
1346    }
1347}
1348
1349// ---------------------------------------------------------------------------
1350// Per-batch projection
1351// ---------------------------------------------------------------------------
1352
1353fn project(
1354    schema: &DatasetSchema,
1355    batch: RecordBatch,
1356    columns: &[String],
1357) -> Result<RecordBatch, AppError> {
1358    if columns.is_empty() {
1359        return Ok(batch);
1360    }
1361    let indices: Vec<usize> = columns
1362        .iter()
1363        .map(|c| {
1364            schema
1365                .find(c)
1366                .map(|info| schema.by_name[&info.name.to_lowercase()])
1367        })
1368        .collect::<Result<_, _>>()?;
1369    let fields: Vec<Field> = indices
1370        .iter()
1371        .map(|&i| batch.schema().field(i).clone())
1372        .collect();
1373    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1374    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1375}
1376
1377// ---------------------------------------------------------------------------
1378// SQL builder
1379// ---------------------------------------------------------------------------
1380
1381/// Accumulates the typed literal values for a parameterised query.
1382///
1383/// Predicate values are never interpolated into the SQL text. Instead each
1384/// value is pushed here and the builder emits a positional placeholder
1385/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1386/// to the logical plan via [`DataFrame::with_param_values`], so user input
1387/// reaches the engine as typed scalars and can never alter the query
1388/// structure (no SQL injection surface, no escaping to get wrong).
1389#[derive(Default)]
1390struct Params {
1391    values: Vec<ScalarValue>,
1392}
1393
1394impl Params {
1395    fn new() -> Self {
1396        Self::default()
1397    }
1398
1399    /// Bind `v` and return its `$N` placeholder token.
1400    fn bind(&mut self, v: ScalarValue) -> String {
1401        self.values.push(v);
1402        format!("${}", self.values.len())
1403    }
1404
1405    fn into_values(self) -> Vec<ScalarValue> {
1406        self.values
1407    }
1408}
1409
1410fn build_query_sql(
1411    schema: &DatasetSchema,
1412    req: &QueryRequest,
1413    max_page_size: u64,
1414) -> Result<(String, Vec<ScalarValue>), AppError> {
1415    let (limit, offset) = req.effective_limit_offset(max_page_size);
1416    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1417}
1418
1419fn build_query_stream_sql(
1420    schema: &DatasetSchema,
1421    req: &QueryRequest,
1422) -> Result<(String, Vec<ScalarValue>), AppError> {
1423    let suffix = req
1424        .limit
1425        .map(|limit| format!(" LIMIT {limit}"))
1426        .unwrap_or_default();
1427    build_query_sql_with_suffix(schema, req, &suffix)
1428}
1429
1430fn build_query_sql_with_suffix(
1431    schema: &DatasetSchema,
1432    req: &QueryRequest,
1433    suffix: &str,
1434) -> Result<(String, Vec<ScalarValue>), AppError> {
1435    let agg_plan = req.agg_plan(schema)?;
1436
1437    let cols = if let Some(plan) = &agg_plan {
1438        // Group cols, then aggregations, each aliased to the JSON output key.
1439        let mut parts: Vec<String> = plan
1440            .group_cols
1441            .iter()
1442            .map(|c| DatasetSchema::quote_ident(c))
1443            .collect();
1444        for a in &plan.aggs {
1445            let expr = a.sql_expr()?;
1446            parts.push(format!(
1447                "{expr} AS {}",
1448                DatasetSchema::quote_ident(&a.alias)
1449            ));
1450        }
1451        parts.join(", ")
1452    } else if req.columns.is_empty() {
1453        if req.distinct {
1454            "DISTINCT *".to_string()
1455        } else {
1456            "*".to_string()
1457        }
1458    } else {
1459        let list = req
1460            .columns
1461            .iter()
1462            .map(|c| {
1463                schema
1464                    .find(c)
1465                    .map(|info| DatasetSchema::quote_ident(&info.name))
1466            })
1467            .collect::<Result<Vec<_>, _>>()?
1468            .join(", ");
1469        if req.distinct {
1470            format!("DISTINCT {list}")
1471        } else {
1472            list
1473        }
1474    };
1475
1476    let mut params = Params::new();
1477    let clauses: Vec<String> = req
1478        .predicates
1479        .iter()
1480        .map(|p| pred_to_sql(schema, p, &mut params))
1481        .collect::<Result<_, _>>()?;
1482
1483    let table = DatasetSchema::quote_ident(&schema.name);
1484    let where_clause = if clauses.is_empty() {
1485        String::new()
1486    } else {
1487        format!(" WHERE {}", clauses.join(" AND "))
1488    };
1489    let group_clause = match &agg_plan {
1490        Some(p) => format!(
1491            " GROUP BY {}",
1492            p.group_cols
1493                .iter()
1494                .map(|c| DatasetSchema::quote_ident(c))
1495                .collect::<Vec<_>>()
1496                .join(", "),
1497        ),
1498        None => String::new(),
1499    };
1500    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1501        Some(s) => format!(" ORDER BY {s}"),
1502        None => String::new(),
1503    };
1504    let sql =
1505        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1506    Ok((sql, params.into_values()))
1507}
1508
1509fn build_count_sql(
1510    schema: &DatasetSchema,
1511    predicates: &[Predicate],
1512) -> Result<(String, Vec<ScalarValue>), AppError> {
1513    let mut params = Params::new();
1514    let clauses: Vec<String> = predicates
1515        .iter()
1516        .map(|p| pred_to_sql(schema, p, &mut params))
1517        .collect::<Result<_, _>>()?;
1518    let table = DatasetSchema::quote_ident(&schema.name);
1519    let where_clause = if clauses.is_empty() {
1520        String::new()
1521    } else {
1522        format!(" WHERE {}", clauses.join(" AND "))
1523    };
1524    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1525    Ok((sql, params.into_values()))
1526}
1527
1528fn pred_to_sql(
1529    schema: &DatasetSchema,
1530    pred: &Predicate,
1531    params: &mut Params,
1532) -> Result<String, AppError> {
1533    let info = schema.find(&pred.col)?;
1534    let col = DatasetSchema::quote_ident(&info.name);
1535
1536    match pred.op.as_str() {
1537        "is_null" => return Ok(format!("{col} IS NULL")),
1538        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1539        _ => {}
1540    }
1541
1542    let val = pred
1543        .val
1544        .as_ref()
1545        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1546
1547    if pred.op == "in" {
1548        let items = val
1549            .as_array()
1550            .filter(|a| !a.is_empty())
1551            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1552        let placeholders: Vec<String> = items
1553            .iter()
1554            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1555            .collect::<Result<_, AppError>>()?;
1556        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1557    }
1558
1559    let sql_op = match pred.op.as_str() {
1560        "eq" => "=",
1561        "neq" => "!=",
1562        "gt" => ">",
1563        "gte" => ">=",
1564        "lt" => "<",
1565        "lte" => "<=",
1566        "like" => "LIKE",
1567        "ilike" => "ILIKE",
1568        other => return Err(AppError::UnknownOperator(other.into())),
1569    };
1570    let placeholder = params.bind(json_to_scalar(val)?);
1571    Ok(format!("{col} {sql_op} {placeholder}"))
1572}
1573
1574/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
1575/// binding as a query parameter. The engine applies the usual numeric
1576/// widening / comparison coercion against the target column type.
1577fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1578    match val {
1579        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1580        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1581        JsonValue::Null => Ok(ScalarValue::Null),
1582        JsonValue::Number(n) => {
1583            if let Some(i) = n.as_i64() {
1584                Ok(ScalarValue::Int64(Some(i)))
1585            } else if let Some(u) = n.as_u64() {
1586                Ok(ScalarValue::UInt64(Some(u)))
1587            } else if let Some(f) = n.as_f64() {
1588                Ok(ScalarValue::Float64(Some(f)))
1589            } else {
1590                Err(AppError::InvalidValue(
1591                    "unsupported numeric literal in predicate".into(),
1592                ))
1593            }
1594        }
1595        _ => Err(AppError::InvalidValue(
1596            "unsupported literal type in predicate".into(),
1597        )),
1598    }
1599}
1600
1601// ---------------------------------------------------------------------------
1602// Equality index — built once at startup, queried on every predicate request
1603// ---------------------------------------------------------------------------
1604
1605fn json_index_key(val: &JsonValue) -> Option<String> {
1606    match val {
1607        JsonValue::String(s) => Some(s.clone()),
1608        JsonValue::Number(n) => Some(n.to_string()),
1609        JsonValue::Bool(b) => Some(b.to_string()),
1610        _ => None,
1611    }
1612}
1613
/// Intersection of two ascending row-id slices, returned in ascending order.
///
/// Walks both inputs with one cursor each, so the cost is linear in the
/// combined length. A value is emitted once per matching pair of heads.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut shared = Vec::new();
    let (mut left, mut right) = (a, b);
    // Stop as soon as either side is exhausted: nothing further can match.
    while let ([x, left_rest @ ..], [y, right_rest @ ..]) = (left, right) {
        match x.cmp(y) {
            Ordering::Less => left = left_rest,
            Ordering::Greater => right = right_rest,
            Ordering::Equal => {
                shared.push(*x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    shared
}
1630
/// Union of two ascending row-id slices, returned in ascending order.
///
/// A value present at the head of both inputs is emitted once. Whatever
/// remains on one side after the other runs out is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let ([x, left_rest @ ..], [y, right_rest @ ..]) = (left, right) {
        let order = x.cmp(y);
        // Emit the smaller head; on a tie both heads are the same value.
        merged.push(*x.min(y));
        if order != Ordering::Greater {
            left = left_rest;
        }
        if order != Ordering::Less {
            right = right_rest;
        }
    }
    // At most one of these is non-empty.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
1655
1656fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1657    if predicates.is_empty() || index.is_empty() {
1658        return None;
1659    }
1660
1661    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
1662    // so borrow it directly instead of cloning the row-id vector.
1663    if let [pred] = predicates
1664        && pred.op.as_str() == "eq"
1665    {
1666        let col_lower = pred.col.to_lowercase();
1667        let col_map = index.get(&col_lower)?;
1668        let key = json_index_key(pred.val.as_ref()?)?;
1669        return Some(match col_map.get(&key) {
1670            Some(rows) => Cow::Borrowed(rows.as_slice()),
1671            None => Cow::Owned(Vec::new()),
1672        });
1673    }
1674
1675    let mut result: Option<Vec<u32>> = None;
1676    for pred in predicates {
1677        let col_lower = pred.col.to_lowercase();
1678        let col_map = index.get(&col_lower)?;
1679
1680        let rows: Vec<u32> = match pred.op.as_str() {
1681            "eq" => {
1682                let key = json_index_key(pred.val.as_ref()?)?;
1683                col_map.get(&key).cloned().unwrap_or_default()
1684            }
1685            "in" => {
1686                let items = pred.val.as_ref()?.as_array()?;
1687                let mut merged: Vec<u32> = Vec::new();
1688                for item in items {
1689                    if let Some(r) = col_map.get(&json_index_key(item)?) {
1690                        merged = union_sorted(&merged, r);
1691                    }
1692                }
1693                merged
1694            }
1695            _ => return None,
1696        };
1697
1698        result = Some(match result {
1699            None => rows,
1700            Some(r) => intersect_sorted(&r, &rows),
1701        });
1702    }
1703    result.map(Cow::Owned)
1704}
1705
1706/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
1707/// not part of the stable API — exposed solely so `benches/index.rs` can build
1708/// an `EqIndex` and exercise `try_index` directly.
1709#[doc(hidden)]
1710pub mod bench {
1711    use super::{EqIndex, FastMap, json_index_key, try_index};
1712    use datapress_core::models::Predicate;
1713    use serde_json::Value as JsonValue;
1714    use std::borrow::Cow;
1715
1716    /// Opaque equality index wrapper for benches (the inner alias is private).
1717    pub struct BenchIndex(EqIndex);
1718
1719    /// Build an index with a single `col` whose `val` bucket holds `rows`.
1720    /// The bucket key is derived with the same `json_index_key` the query path
1721    /// uses, so a matching `eq` predicate is guaranteed to hit.
1722    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1723        let key = json_index_key(val).expect("benchable index key");
1724        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1725        col_map.insert(key, rows);
1726        let mut index: EqIndex = EqIndex::default();
1727        index.insert(col.to_string(), col_map);
1728        BenchIndex(index)
1729    }
1730
1731    /// Resolve `predicates` against the index — the timed operation.
1732    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1733        try_index(&idx.0, predicates)
1734    }
1735
1736    /// Reference implementation of the pre-`Cow` behaviour: always clones the
1737    /// matched bucket into an owned `Vec` (what `try_index` did before the
1738    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
1739    /// the borrow win is measured against the old allocation cost in-process.
1740    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1741        let [pred] = predicates else { return None };
1742        if pred.op.as_str() != "eq" {
1743            return None;
1744        }
1745        let col_lower = pred.col.to_lowercase();
1746        let col_map = idx.0.get(&col_lower)?;
1747        let key = json_index_key(pred.val.as_ref()?)?;
1748        Some(col_map.get(&key).cloned().unwrap_or_default())
1749    }
1750}
1751
1752/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
1753/// the underlying batches (zero-copy) and concatenating the (small) page.
1754fn slice_global(
1755    chunks: &[RecordBatch],
1756    schema: &Arc<Schema>,
1757    offset: usize,
1758    limit: usize,
1759) -> Result<RecordBatch, AppError> {
1760    if limit == 0 || chunks.is_empty() {
1761        return Ok(RecordBatch::new_empty(schema.clone()));
1762    }
1763    let mut out = Vec::new();
1764    let mut to_skip = offset;
1765    let mut remaining = limit;
1766    for b in chunks {
1767        if remaining == 0 {
1768            break;
1769        }
1770        let n = b.num_rows();
1771        if to_skip >= n {
1772            to_skip -= n;
1773            continue;
1774        }
1775        let take = remaining.min(n - to_skip);
1776        out.push(b.slice(to_skip, take));
1777        to_skip = 0;
1778        remaining -= take;
1779    }
1780    if out.is_empty() {
1781        return Ok(RecordBatch::new_empty(schema.clone()));
1782    }
1783    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1784}
1785
1786/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
1787/// Row ids are global (across the concatenation of all chunks). We map each
1788/// requested row to its (chunk, local-index), `take` per chunk, then stitch
1789/// the per-chunk results back together preserving the original row order.
1790fn take_page(
1791    chunks: &[RecordBatch],
1792    schema: &Arc<Schema>,
1793    rows: &[u32],
1794    offset: usize,
1795    limit: usize,
1796) -> Result<RecordBatch, AppError> {
1797    let start = offset.min(rows.len());
1798    let len = limit.min(rows.len() - start);
1799    if len == 0 || chunks.is_empty() {
1800        return Ok(RecordBatch::new_empty(schema.clone()));
1801    }
1802
1803    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
1804    // and `offsets.last()` is the total row count.
1805    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1806    let mut acc: u32 = 0;
1807    offsets.push(0);
1808    for b in chunks {
1809        acc = acc
1810            .checked_add(b.num_rows() as u32)
1811            .expect("row count exceeds u32::MAX");
1812        offsets.push(acc);
1813    }
1814
1815    // Bucket each global row id into the chunk that contains it, remembering
1816    // the original output position so we can restore page order at the end.
1817    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1818    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1819        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1820        let local = gid - offsets[bi];
1821        buckets[bi].push((out_pos as u32, local));
1822    }
1823
1824    // Per-chunk take, recording the destination index for each emitted row.
1825    let mut takens: Vec<RecordBatch> = Vec::new();
1826    let mut dest: Vec<u32> = Vec::with_capacity(len);
1827    for (bi, bucket) in buckets.iter().enumerate() {
1828        if bucket.is_empty() {
1829            continue;
1830        }
1831        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1832        let cols: Vec<ArrayRef> = chunks[bi]
1833            .columns()
1834            .iter()
1835            .map(|c| {
1836                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1837                    .map_err(AppError::from)
1838            })
1839            .collect::<Result<_, _>>()?;
1840        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1841        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1842    }
1843
1844    // Stitch per-chunk results then permute to restore the requested order.
1845    let stitched = compute::concat_batches(schema, takens.iter())?;
1846    let mut inv = vec![0u32; len];
1847    for (i, &d) in dest.iter().enumerate() {
1848        inv[d as usize] = i as u32;
1849    }
1850    let perm = UInt32Array::from(inv);
1851    let cols: Vec<ArrayRef> = stitched
1852        .columns()
1853        .iter()
1854        .map(|c| {
1855            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1856                .map_err(AppError::from)
1857        })
1858        .collect::<Result<_, _>>()?;
1859    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1860}
1861
1862/// Build the equality index per the dataset's policy, against the chunked
1863/// representation. Row ids are global across the concatenation of all
1864/// chunks (so they remain compatible with `take_page` / `slice_global`).
1865fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1866    use rayon::prelude::*;
1867
1868    if cfg.mode == IndexMode::None || chunks.is_empty() {
1869        return EqIndex::default();
1870    }
1871
1872    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1873        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1874    } else {
1875        None
1876    };
1877
1878    let max_card = if cfg.mode == IndexMode::Auto {
1879        Some(cfg.max_cardinality)
1880    } else {
1881        None
1882    };
1883
1884    // Per-chunk starting global row id.
1885    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1886    let mut acc: u32 = 0;
1887    for b in chunks {
1888        batch_offsets.push(acc);
1889        acc = acc
1890            .checked_add(b.num_rows() as u32)
1891            .expect("row count exceeds u32::MAX");
1892    }
1893
1894    let schema = chunks[0].schema();
1895
1896    schema
1897        .fields()
1898        .par_iter()
1899        .enumerate()
1900        .filter_map(|(ci, field)| {
1901            let col_lower = field.name().to_lowercase();
1902            if let Some(a) = &allow
1903                && !a.contains_key(&col_lower)
1904            {
1905                return None;
1906            }
1907
1908            // Only build for index-friendly types; skip everything else
1909            // up-front so we don't pay the per-chunk dispatch cost.
1910            let dtype = field.data_type();
1911            let dict_utf8 = matches!(dtype,
1912                DataType::Dictionary(k, v)
1913                    if matches!(k.as_ref(), DataType::Int32)
1914                    && matches!(v.as_ref(), DataType::Utf8));
1915            match dtype {
1916                DataType::Utf8
1917                | DataType::Utf8View
1918                | DataType::Boolean
1919                | DataType::Int8
1920                | DataType::Int16
1921                | DataType::Int32
1922                | DataType::Int64 => {}
1923                _ if dict_utf8 => {}
1924                _ => return None,
1925            }
1926
1927            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
1928
1929            for (bi, batch) in chunks.iter().enumerate() {
1930                let base = batch_offsets[bi];
1931                let col = batch.column(ci);
1932
1933                macro_rules! index_col {
1934                    ($arr_ty:ty) => {{
1935                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
1936                        for row in 0..arr.len() {
1937                            if arr.is_null(row) {
1938                                continue;
1939                            }
1940                            let key = arr.value(row).to_string();
1941                            let gid = base + row as u32;
1942                            if let Some(v) = map.get_mut(&key) {
1943                                v.push(gid);
1944                            } else {
1945                                if let Some(mc) = max_card {
1946                                    if map.len() >= mc {
1947                                        return None;
1948                                    }
1949                                }
1950                                map.insert(key, vec![gid]);
1951                            }
1952                        }
1953                    }};
1954                }
1955
1956                if dict_utf8 {
1957                    // Dictionary(Int32, Utf8): iterate keys + look up the
1958                    // string value from the (small) dictionary. We allocate
1959                    // the key string only when the value is new — repeated
1960                    // values reuse the existing HashMap entry by hash, but
1961                    // `HashMap::get_mut` still needs the key, so we use a
1962                    // borrowed lookup via `get` first to avoid the alloc.
1963                    let arr = col
1964                        .as_any()
1965                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
1966                        )?;
1967                    let keys = arr.keys();
1968                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
1969                    for row in 0..arr.len() {
1970                        if arr.is_null(row) {
1971                            continue;
1972                        }
1973                        let k = keys.value(row) as usize;
1974                        let s = values.value(k);
1975                        let gid = base + row as u32;
1976                        if let Some(v) = map.get_mut(s) {
1977                            v.push(gid);
1978                        } else {
1979                            if let Some(mc) = max_card
1980                                && map.len() >= mc
1981                            {
1982                                return None;
1983                            }
1984                            map.insert(s.to_string(), vec![gid]);
1985                        }
1986                    }
1987                } else {
1988                    match dtype {
1989                        DataType::Utf8 => index_col!(StringArray),
1990                        DataType::Utf8View => index_col!(StringViewArray),
1991                        DataType::Boolean => index_col!(BooleanArray),
1992                        DataType::Int8 => index_col!(Int8Array),
1993                        DataType::Int16 => index_col!(Int16Array),
1994                        DataType::Int32 => index_col!(Int32Array),
1995                        DataType::Int64 => index_col!(Int64Array),
1996                        _ => unreachable!(),
1997                    }
1998                }
1999            }
2000
2001            Some((col_lower, map))
2002        })
2003        .collect()
2004}
2005
2006// ---------------------------------------------------------------------------
2007// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2008// on the page batch right before JSON encoding. We deliberately do **not**
2009// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2010// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2011// columns would balloon resident RAM. The cast is applied per returned page
2012// after pagination, so the cost is paid only for rows the caller requested.
2013// ---------------------------------------------------------------------------
2014
2015/// Returns true for Arrow types that `write_value` can render directly. Any
2016/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2017/// JSON output is faithful rather than silently `null`.
2018fn writable_inline(dt: &DataType) -> bool {
2019    match dt {
2020        DataType::Utf8
2021        | DataType::LargeUtf8
2022        | DataType::Utf8View
2023        | DataType::Boolean
2024        | DataType::Int8
2025        | DataType::Int16
2026        | DataType::Int32
2027        | DataType::Int64
2028        | DataType::UInt8
2029        | DataType::UInt16
2030        | DataType::UInt32
2031        | DataType::UInt64
2032        | DataType::Float32
2033        | DataType::Float64
2034        | DataType::Decimal128(_, _)
2035        | DataType::Decimal256(_, _) => true,
2036        DataType::Dictionary(k, v)
2037            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2038        {
2039            true
2040        }
2041        _ => false,
2042    }
2043}
2044
2045/// Cast any column whose dtype isn't directly writable by `write_value` to
2046/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2047/// — kept native in resident memory to save RAM — and also any exotic dtype
2048/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2049/// so the JSON serializer never falls back to writing literal `null`.
2050fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2051    let schema = batch.schema();
2052    let to_cast: Vec<usize> = schema
2053        .fields()
2054        .iter()
2055        .enumerate()
2056        .filter_map(|(i, f)| {
2057            if writable_inline(f.data_type()) {
2058                None
2059            } else {
2060                Some(i)
2061            }
2062        })
2063        .collect();
2064    if to_cast.is_empty() {
2065        return Ok(batch.clone());
2066    }
2067    let new_fields: Vec<Field> = schema
2068        .fields()
2069        .iter()
2070        .enumerate()
2071        .map(|(i, f)| {
2072            if to_cast.contains(&i) {
2073                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2074            } else {
2075                f.as_ref().clone()
2076            }
2077        })
2078        .collect();
2079    let new_schema = Arc::new(Schema::new(new_fields));
2080    let cols: Vec<ArrayRef> = batch
2081        .columns()
2082        .iter()
2083        .enumerate()
2084        .map(|(i, c)| {
2085            if to_cast.contains(&i) {
2086                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2087            } else {
2088                Ok(c.clone())
2089            }
2090        })
2091        .collect::<Result<_, _>>()?;
2092    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2093}
2094
2095// ---------------------------------------------------------------------------
2096// Compute helpers — retained for symmetry; reserved for future inline scan
2097// path. Currently the engine fallback handles all non-index queries.
2098// ---------------------------------------------------------------------------
2099
/// Comparison operator accepted by `cmp_scalar`. Currently unused (hence
/// `allow(dead_code)`): kept for a possible future inline scan path.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// `=`, evaluated with the `eq` kernel.
    Eq,
    /// `!=`, evaluated with the `neq` kernel.
    Neq,
    /// `>`, evaluated with the `gt` kernel.
    Gt,
    /// `>=`, evaluated with the `gt_eq` kernel.
    Gte,
    /// `<`, evaluated with the `lt` kernel.
    Lt,
    /// `<=`, evaluated with the `lt_eq` kernel.
    Lte,
    /// Case-sensitive pattern match; `cmp_scalar` accepts it only for
    /// `Utf8` columns.
    Like,
    /// Case-insensitive pattern match; `cmp_scalar` accepts it only for
    /// `Utf8` columns.
    ILike,
}
2112
2113#[allow(dead_code)]
2114fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2115    let arr = col
2116        .as_any()
2117        .downcast_ref::<StringArray>()
2118        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2119    let s = Scalar::new(StringArray::from(vec![val]));
2120    Ok(eq(arr, &s)?)
2121}
2122
/// Compare every element of `col` against the JSON scalar `val` using `op`
/// and return the resulting boolean mask. Currently unused (hence
/// `allow(dead_code)`).
///
/// Supported column types are `Utf8` (all operators, `val` must be a JSON
/// string) and `Int8`..`Int64` / `Float32` / `Float64` (ordering and
/// equality only, `val` must be a JSON number).
///
/// # Errors
/// `AppError::InvalidValue` when `val` has the wrong JSON type for the
/// column, when `Like` / `ILike` is used on a numeric column, or when the
/// column type is not supported. Arrow kernel errors are converted via `?`.
#[allow(dead_code)]
fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
    // Numeric comparison for one concrete array type: read `val` as f64,
    // convert it to the column's native type, and dispatch on `op`.
    macro_rules! num_cmp {
        ($arr_type:ty, $cast:ty) => {{
            // NOTE(review): the value always goes through `f64` and an `as`
            // cast. For integer targets `as` drops the fraction and
            // saturates at the type's bounds, and integers beyond 2^53 lose
            // precision in `f64` — so e.g. `1.5` compares as `1`. Revisit
            // before this helper is put to use.
            let n = val
                .as_f64()
                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
                as $cast;
            // The downcast cannot fail: the macro is only invoked from the
            // `data_type()` arm matching `$arr_type`.
            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
            let s = Scalar::new(<$arr_type>::from(vec![n]));
            Ok(match op {
                CmpOp::Eq => eq(arr, &s)?,
                CmpOp::Neq => neq(arr, &s)?,
                CmpOp::Gt => gt(arr, &s)?,
                CmpOp::Gte => gt_eq(arr, &s)?,
                CmpOp::Lt => lt(arr, &s)?,
                CmpOp::Lte => lt_eq(arr, &s)?,
                CmpOp::Like | CmpOp::ILike => {
                    return Err(AppError::InvalidValue(
                        "LIKE requires a string column".into(),
                    ));
                }
            })
        }};
    }
    match col.data_type() {
        // Strings support the full operator set, including pattern matching.
        DataType::Utf8 => {
            let s = val
                .as_str()
                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
            let sc = Scalar::new(StringArray::from(vec![s]));
            Ok(match op {
                CmpOp::Eq => eq(arr, &sc)?,
                CmpOp::Neq => neq(arr, &sc)?,
                CmpOp::Gt => gt(arr, &sc)?,
                CmpOp::Gte => gt_eq(arr, &sc)?,
                CmpOp::Lt => lt(arr, &sc)?,
                CmpOp::Lte => lt_eq(arr, &sc)?,
                CmpOp::Like => compute::like(arr, &sc)?,
                CmpOp::ILike => compute::ilike(arr, &sc)?,
            })
        }
        DataType::Int8 => num_cmp!(Int8Array, i8),
        DataType::Int16 => num_cmp!(Int16Array, i16),
        DataType::Int32 => num_cmp!(Int32Array, i32),
        DataType::Int64 => num_cmp!(Int64Array, i64),
        DataType::Float32 => num_cmp!(Float32Array, f32),
        DataType::Float64 => num_cmp!(Float64Array, f64),
        dt => Err(AppError::InvalidValue(format!(
            "unsupported type for comparison: {dt:?}"
        ))),
    }
}
2177
2178// ---------------------------------------------------------------------------
2179// Serialisation
2180// ---------------------------------------------------------------------------
2181
2182pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2183    // Temporal columns are kept native in resident memory (compact). Cast
2184    // them — plus any other dtype `write_value` can't render directly — to
2185    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2186    // without paying the load-time RAM cost.
2187    let batch = cast_for_serialize(batch)?;
2188    let schema = batch.schema();
2189    let n_rows = batch.num_rows();
2190
2191    let keys: Vec<Vec<u8>> = schema
2192        .fields()
2193        .iter()
2194        .map(|f| {
2195            let mut k = Vec::with_capacity(f.name().len() + 3);
2196            k.push(b'"');
2197            k.extend_from_slice(f.name().as_bytes());
2198            k.extend_from_slice(b"\":");
2199            k
2200        })
2201        .collect();
2202
2203    // Resolve every column's concrete Arrow array type exactly ONCE here,
2204    // instead of paying a `data_type()` match + `downcast_ref` for every
2205    // single cell. The inner row loop then dispatches over a small enum,
2206    // which the compiler turns into a cheap jump table.
2207    let encoders: Vec<ColEnc> = batch
2208        .columns()
2209        .iter()
2210        .map(|c| ColEnc::new(c.as_ref()))
2211        .collect();
2212
2213    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2214    let mut itoa_buf = itoa::Buffer::new();
2215    let mut ryu_buf = ryu::Buffer::new();
2216    buf.push(b'[');
2217
2218    for row in 0..n_rows {
2219        if row > 0 {
2220            buf.push(b',');
2221        }
2222        buf.push(b'{');
2223        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2224            if i > 0 {
2225                buf.push(b',');
2226            }
2227            buf.extend_from_slice(key);
2228            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2229        }
2230        buf.push(b'}');
2231    }
2232
2233    buf.push(b']');
2234    Ok(unsafe { String::from_utf8_unchecked(buf) })
2235}
2236
/// Per-column JSON encoder. The concrete Arrow array type is resolved once
/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
/// instead of repeating `data_type()` matching + `downcast_ref` for every
/// cell. Each variant borrows the already-downcast typed array.
///
/// In every variant a null cell is written as `null`.
enum ColEnc<'a> {
    /// `Utf8` strings, written through `write_str`.
    Utf8(&'a StringArray),
    /// `LargeUtf8` strings, written through `write_str`.
    LargeUtf8(&'a LargeStringArray),
    /// `Utf8View` strings, written through `write_str`.
    Utf8View(&'a StringViewArray),
    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
    /// pre-downcast string values so neither is re-resolved per row.
    DictI32Utf8(
        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
        &'a StringArray,
    ),
    /// Booleans, written as `true` / `false`.
    Bool(&'a BooleanArray),
    // Signed and unsigned integers, formatted with the shared `itoa` buffer.
    I8(&'a Int8Array),
    I16(&'a Int16Array),
    I32(&'a Int32Array),
    I64(&'a Int64Array),
    U8(&'a UInt8Array),
    U16(&'a UInt16Array),
    U32(&'a UInt32Array),
    U64(&'a UInt64Array),
    // Decimals are rendered with `value_as_string` and written through
    // `write_str`, i.e. the same path as string cells.
    Dec128(&'a Decimal128Array),
    Dec256(&'a Decimal256Array),
    // Floats are formatted with the shared `ryu` buffer; non-finite values
    // (NaN, ±infinity) are written as `null`.
    F32(&'a Float32Array),
    F64(&'a Float64Array),
    /// Anything else: fall back to the generic `write_value` dispatch.
    Other(&'a dyn Array),
}
2267
2268impl<'a> ColEnc<'a> {
2269    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2270        macro_rules! dc {
2271            ($t:ty) => {
2272                col.as_any().downcast_ref::<$t>().unwrap()
2273            };
2274        }
2275        match col.data_type() {
2276            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2277            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2278            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2279            DataType::Dictionary(key, value)
2280                if matches!(key.as_ref(), DataType::Int32)
2281                    && matches!(value.as_ref(), DataType::Utf8) =>
2282            {
2283                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2284                let values = dict
2285                    .values()
2286                    .as_any()
2287                    .downcast_ref::<StringArray>()
2288                    .unwrap();
2289                ColEnc::DictI32Utf8(dict, values)
2290            }
2291            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2292            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2293            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2294            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2295            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2296            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2297            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2298            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2299            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2300            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2301            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2302            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2303            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2304            _ => ColEnc::Other(col),
2305        }
2306    }
2307
2308    #[inline]
2309    fn write(
2310        &self,
2311        buf: &mut Vec<u8>,
2312        row: usize,
2313        itoa_buf: &mut itoa::Buffer,
2314        ryu_buf: &mut ryu::Buffer,
2315    ) {
2316        macro_rules! int {
2317            ($arr:expr) => {{
2318                if $arr.is_null(row) {
2319                    buf.extend_from_slice(b"null");
2320                } else {
2321                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2322                }
2323            }};
2324        }
2325        match self {
2326            ColEnc::Utf8(a) => {
2327                if a.is_null(row) {
2328                    buf.extend_from_slice(b"null");
2329                } else {
2330                    write_str(buf, a.value(row));
2331                }
2332            }
2333            ColEnc::LargeUtf8(a) => {
2334                if a.is_null(row) {
2335                    buf.extend_from_slice(b"null");
2336                } else {
2337                    write_str(buf, a.value(row));
2338                }
2339            }
2340            ColEnc::Utf8View(a) => {
2341                if a.is_null(row) {
2342                    buf.extend_from_slice(b"null");
2343                } else {
2344                    write_str(buf, a.value(row));
2345                }
2346            }
2347            ColEnc::DictI32Utf8(keys, values) => {
2348                if keys.is_null(row) {
2349                    buf.extend_from_slice(b"null");
2350                } else {
2351                    let k = keys.keys().value(row) as usize;
2352                    write_str(buf, values.value(k));
2353                }
2354            }
2355            ColEnc::Bool(a) => {
2356                if a.is_null(row) {
2357                    buf.extend_from_slice(b"null");
2358                } else {
2359                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2360                }
2361            }
2362            ColEnc::I8(a) => int!(a),
2363            ColEnc::I16(a) => int!(a),
2364            ColEnc::I32(a) => int!(a),
2365            ColEnc::I64(a) => int!(a),
2366            ColEnc::U8(a) => int!(a),
2367            ColEnc::U16(a) => int!(a),
2368            ColEnc::U32(a) => int!(a),
2369            ColEnc::U64(a) => int!(a),
2370            ColEnc::Dec128(a) => {
2371                if a.is_null(row) {
2372                    buf.extend_from_slice(b"null");
2373                } else {
2374                    write_str(buf, &a.value_as_string(row));
2375                }
2376            }
2377            ColEnc::Dec256(a) => {
2378                if a.is_null(row) {
2379                    buf.extend_from_slice(b"null");
2380                } else {
2381                    write_str(buf, &a.value_as_string(row));
2382                }
2383            }
2384            ColEnc::F32(a) => {
2385                if a.is_null(row) {
2386                    buf.extend_from_slice(b"null");
2387                } else {
2388                    let v = a.value(row);
2389                    if v.is_finite() {
2390                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2391                    } else {
2392                        buf.extend_from_slice(b"null");
2393                    }
2394                }
2395            }
2396            ColEnc::F64(a) => {
2397                if a.is_null(row) {
2398                    buf.extend_from_slice(b"null");
2399                } else {
2400                    let v = a.value(row);
2401                    if v.is_finite() {
2402                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2403                    } else {
2404                        buf.extend_from_slice(b"null");
2405                    }
2406                }
2407            }
2408            ColEnc::Other(col) => {
2409                if col.is_null(row) {
2410                    buf.extend_from_slice(b"null");
2411                } else {
2412                    write_value(buf, *col, row);
2413                }
2414            }
2415        }
2416    }
2417}
2418
2419#[inline]
2420fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2421    match col.data_type() {
2422        DataType::Utf8 => write_str(
2423            buf,
2424            col.as_any()
2425                .downcast_ref::<StringArray>()
2426                .unwrap()
2427                .value(row),
2428        ),
2429        DataType::LargeUtf8 => write_str(
2430            buf,
2431            col.as_any()
2432                .downcast_ref::<LargeStringArray>()
2433                .unwrap()
2434                .value(row),
2435        ),
2436        DataType::Utf8View => write_str(
2437            buf,
2438            col.as_any()
2439                .downcast_ref::<StringViewArray>()
2440                .unwrap()
2441                .value(row),
2442        ),
2443        DataType::Dictionary(key, value)
2444            if matches!(key.as_ref(), DataType::Int32)
2445                && matches!(value.as_ref(), DataType::Utf8) =>
2446        {
2447            let dict = col
2448                .as_any()
2449                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2450                .unwrap();
2451            let keys = dict.keys();
2452            let values = dict
2453                .values()
2454                .as_any()
2455                .downcast_ref::<StringArray>()
2456                .unwrap();
2457            let k = keys.value(row) as usize;
2458            write_str(buf, values.value(k));
2459        }
2460        DataType::Boolean => {
2461            let v = col
2462                .as_any()
2463                .downcast_ref::<BooleanArray>()
2464                .unwrap()
2465                .value(row);
2466            buf.extend_from_slice(if v { b"true" } else { b"false" });
2467        }
2468        DataType::Int8 => {
2469            let mut b = itoa::Buffer::new();
2470            buf.extend_from_slice(
2471                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2472                    .as_bytes(),
2473            );
2474        }
2475        DataType::Int16 => {
2476            let mut b = itoa::Buffer::new();
2477            buf.extend_from_slice(
2478                b.format(
2479                    col.as_any()
2480                        .downcast_ref::<Int16Array>()
2481                        .unwrap()
2482                        .value(row),
2483                )
2484                .as_bytes(),
2485            );
2486        }
2487        DataType::Int32 => {
2488            let mut b = itoa::Buffer::new();
2489            buf.extend_from_slice(
2490                b.format(
2491                    col.as_any()
2492                        .downcast_ref::<Int32Array>()
2493                        .unwrap()
2494                        .value(row),
2495                )
2496                .as_bytes(),
2497            );
2498        }
2499        DataType::Int64 => {
2500            let mut b = itoa::Buffer::new();
2501            buf.extend_from_slice(
2502                b.format(
2503                    col.as_any()
2504                        .downcast_ref::<Int64Array>()
2505                        .unwrap()
2506                        .value(row),
2507                )
2508                .as_bytes(),
2509            );
2510        }
2511        DataType::UInt8 => {
2512            let mut b = itoa::Buffer::new();
2513            buf.extend_from_slice(
2514                b.format(
2515                    col.as_any()
2516                        .downcast_ref::<UInt8Array>()
2517                        .unwrap()
2518                        .value(row),
2519                )
2520                .as_bytes(),
2521            );
2522        }
2523        DataType::UInt16 => {
2524            let mut b = itoa::Buffer::new();
2525            buf.extend_from_slice(
2526                b.format(
2527                    col.as_any()
2528                        .downcast_ref::<UInt16Array>()
2529                        .unwrap()
2530                        .value(row),
2531                )
2532                .as_bytes(),
2533            );
2534        }
2535        DataType::UInt32 => {
2536            let mut b = itoa::Buffer::new();
2537            buf.extend_from_slice(
2538                b.format(
2539                    col.as_any()
2540                        .downcast_ref::<UInt32Array>()
2541                        .unwrap()
2542                        .value(row),
2543                )
2544                .as_bytes(),
2545            );
2546        }
2547        DataType::UInt64 => {
2548            let mut b = itoa::Buffer::new();
2549            buf.extend_from_slice(
2550                b.format(
2551                    col.as_any()
2552                        .downcast_ref::<UInt64Array>()
2553                        .unwrap()
2554                        .value(row),
2555                )
2556                .as_bytes(),
2557            );
2558        }
2559        DataType::Decimal128(_, _) => {
2560            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2561            write_str(buf, &arr.value_as_string(row));
2562        }
2563        DataType::Decimal256(_, _) => {
2564            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2565            write_str(buf, &arr.value_as_string(row));
2566        }
2567        DataType::Float32 => {
2568            let v = col
2569                .as_any()
2570                .downcast_ref::<Float32Array>()
2571                .unwrap()
2572                .value(row);
2573            if v.is_finite() {
2574                let mut b = ryu::Buffer::new();
2575                buf.extend_from_slice(b.format_finite(v).as_bytes());
2576            } else {
2577                buf.extend_from_slice(b"null");
2578            }
2579        }
2580        DataType::Float64 => {
2581            let v = col
2582                .as_any()
2583                .downcast_ref::<Float64Array>()
2584                .unwrap()
2585                .value(row);
2586            if v.is_finite() {
2587                let mut b = ryu::Buffer::new();
2588                buf.extend_from_slice(b.format_finite(v).as_bytes());
2589            } else {
2590                buf.extend_from_slice(b"null");
2591            }
2592        }
2593        // Any dtype not handled above must have been pre-cast to Utf8 by
2594        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
2595        // visible JSON string rather than a silent null so it can't be
2596        // mistaken for a real NULL value.
2597        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2598    }
2599}
2600
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
///
/// * `"` and `\` are backslash-escaped.
/// * Newline, carriage return and tab use the short forms `\n`, `\r`, `\t`.
/// * Every other byte below `0x20` is written as `\u00XX` with lowercase
///   hex digits.
/// * All remaining bytes, including multi-byte UTF-8 sequences, are copied
///   through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8] = b"0123456789abcdef";

    let raw = s.as_bytes();
    // Index of the first byte that has not been copied into `buf` yet.
    let mut pending = 0;

    buf.push(b'"');
    for (idx, &ch) in raw.iter().enumerate() {
        let needs_escape = ch < 0x20 || ch == b'"' || ch == b'\\';
        if !needs_escape {
            continue;
        }
        // Flush the clean run that precedes this byte in one copy.
        buf.extend_from_slice(&raw[pending..idx]);
        pending = idx + 1;
        match ch {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            // Remaining C0 control characters have no short escape.
            _ => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX[usize::from(ch >> 4)]);
                buf.push(HEX[usize::from(ch & 0xf)]);
            }
        }
    }
    // Whatever follows the last escaped byte (possibly the whole input).
    buf.extend_from_slice(&raw[pending..]);
    buf.push(b'"');
}
2622
2623// ---------------------------------------------------------------------------
2624// Backend trait impl — wires the store into the generic core handlers.
2625// ---------------------------------------------------------------------------
2626
2627#[async_trait]
2628impl Backend for Store {
2629    fn names(&self) -> Vec<String> {
2630        Store::names(self)
2631    }
2632
2633    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2634        let st = self.dataset(name)?;
2635        Ok(DatasetSummary {
2636            name: st.schema.name.clone(),
2637            columns: st.schema.columns.len(),
2638            rows: st.num_rows(),
2639        })
2640    }
2641
2642    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2643        let st = self.dataset(name)?;
2644        Ok(Arc::new(st.schema.clone()))
2645    }
2646
2647    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2648        let st = self.dataset(name)?;
2649        // Report indexed columns in the dataset's declared schema order
2650        // so the `/schema` response is deterministic.
2651        let mut cols: Vec<String> = st
2652            .schema
2653            .columns
2654            .iter()
2655            .map(|c| c.name.clone())
2656            .filter(|n| st.index.contains_key(n))
2657            .collect();
2658        // Any indexed columns not in `schema.columns` (shouldn't happen,
2659        // but be defensive) get appended sorted.
2660        let mut extras: Vec<String> = st
2661            .index
2662            .keys()
2663            .filter(|n| !cols.iter().any(|c| c == *n))
2664            .cloned()
2665            .collect();
2666        extras.sort();
2667        cols.extend(extras);
2668        Ok(cols)
2669    }
2670
2671    async fn sample(&self, name: &str) -> Result<String, AppError> {
2672        Store::sample(self, name).await
2673    }
2674
2675    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2676        Store::query(self, name, req).await
2677    }
2678
2679    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2680        Store::query_arrow(self, name, req).await
2681    }
2682
2683    async fn query_arrow_stream(
2684        &self,
2685        name: &str,
2686        req: &QueryRequest,
2687    ) -> Result<ArrowIpcStream, AppError> {
2688        Store::query_arrow_stream(self, name, req).await
2689    }
2690
2691    async fn query_arrow_stream_all(
2692        &self,
2693        name: &str,
2694        req: &QueryRequest,
2695    ) -> Result<ArrowIpcStream, AppError> {
2696        Store::query_arrow_stream_all(self, name, req).await
2697    }
2698
2699    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2700        Store::count(self, name, req).await
2701    }
2702
2703    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2704        Store::parquet(self, name).await
2705    }
2706
2707    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2708        Store::reload(self, name).await
2709    }
2710}