Skip to main content

datapress_datafusion/
store.rs

1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4
5use arc_swap::ArcSwap;
6use arrow::array::{
7    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
8    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
9    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
10};
11use arrow::compute;
12use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
13use arrow::datatypes::{DataType, Field, Schema};
14use async_trait::async_trait;
15use parquet::arrow::ProjectionMask;
16use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
17use serde_json::Value as JsonValue;
18
19use datafusion::datasource::file_format::parquet::ParquetFormat;
20use datafusion::datasource::listing::{
21    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
22};
23use datafusion::datasource::{MemTable, TableProvider};
24use datafusion::prelude::{ParquetReadOptions, SessionContext};
25use datafusion::scalar::ScalarValue;
26
27use object_store::aws::AmazonS3Builder;
28use url::Url;
29
30use datapress_core::backend::{
31    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
32};
33use datapress_core::config::{
34    AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, ResolvedCreds, S3Config,
35    SourceKind,
36};
37use datapress_core::errors::AppError;
38use datapress_core::models::{CountRequest, Predicate, QueryRequest};
39use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
40
41// ---------------------------------------------------------------------------
42// Public types
43// ---------------------------------------------------------------------------
44
/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
///
/// Row ids are stored as `u32`.
/// NOTE(review): presumably these are global row positions across all resident
/// chunks of a dataset — confirm against the index builder.
type EqIndex = HashMap<String, HashMap<String, Vec<u32>>>;
47
/// Per-dataset state: schema metadata, the resident chunks, and the
/// equality index built per the dataset's `[dataset.index]` policy.
///
/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
/// produced by the underlying reader, after temporal columns are cast to
/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
/// into one batch: on wide schemas (hundreds of columns) that transiently
/// allocates a second full copy of the decoded Arrow data, pushing peak
/// RSS to ~2× the resident size and OOM-killing the process at startup.
///
/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
/// `index` is empty, and every query is dispatched to DataFusion SQL
/// against a registered `ListingTable`. `arrow_schema` still carries the
/// inferred schema so discovery endpoints work.
pub struct DatasetState {
    /// Column metadata (name, logical type, SQL type, nullability) derived
    /// from the Arrow schema at load time.
    pub schema: DatasetSchema,
    /// Resident chunks; empty for lazy datasets.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset (inferred, for lazy datasets).
    pub arrow_schema: Arc<Schema>,
    /// Equality index over the resident chunks; empty for lazy datasets.
    pub index: EqIndex,
    /// `true` when the dataset is served straight from its source via SQL
    /// instead of being held in memory.
    pub lazy: bool,
}
69
70impl DatasetState {
71    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
72    pub fn num_rows(&self) -> usize {
73        self.data.iter().map(|b| b.num_rows()).sum()
74    }
75}
76
/// Multi-dataset registry. Each dataset is registered in the shared
/// `SessionContext` under its configured name. The per-dataset state is
/// held behind `ArcSwap` so a reload can atomically replace it without
/// blocking concurrent queries.
pub struct Store {
    /// Shared DataFusion session; every dataset is registered here as a
    /// table and the SQL fallback paths run against it.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`. `load` clamps the
    /// configured value to at least 1.
    max_page_size: u64,
    /// Original dataset configs, indexed by name. Reload reads the source
    /// path from here — clients can't redirect a reload at an arbitrary file.
    configs: HashMap<String, DatasetConfig>,
    /// Hot-swappable snapshot of all currently loaded datasets.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-name reload mutex. Serialises concurrent reloads of the same
    /// dataset; reloads of different datasets proceed in parallel.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
93
94impl Store {
95    /// Load every dataset declared in `cfg`.
96    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
97        // One-shot init for the deltalake S3 backend. Safe to call more
98        // than once — the handlers are idempotent.
99        if cfg
100            .datasets
101            .iter()
102            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
103        {
104            deltalake::aws::register_handlers(None);
105        }
106
107        let ctx = SessionContext::new();
108        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
109        let mut configs = HashMap::with_capacity(cfg.datasets.len());
110
111        for d in &cfg.datasets {
112            let (state, provider) = build_dataset(d, &ctx).await?;
113            ctx.register_table(d.name.as_str(), provider)?;
114            datasets.insert(d.name.clone(), Arc::new(state));
115            configs.insert(d.name.clone(), d.clone());
116        }
117        Ok(Self {
118            ctx,
119            max_page_size: cfg.server.max_page_size.max(1),
120            configs,
121            datasets: ArcSwap::from_pointee(datasets),
122            reload_locks: Mutex::new(HashMap::new()),
123        })
124    }
125
126    /// Sorted list of dataset names.
127    pub fn names(&self) -> Vec<String> {
128        let snap = self.datasets.load();
129        let mut v: Vec<String> = snap.keys().cloned().collect();
130        v.sort();
131        v
132    }
133
134    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
135        self.datasets
136            .load()
137            .get(name)
138            .cloned()
139            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
140    }
141
142    /// JSON for the first row of the dataset, or `null` if empty. Used by
143    /// `GET /api/datasets/{name}/schema` for discoverability.
144    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
145        let st = self.dataset(name)?;
146
147        // Lazy datasets have no resident batch — pull one row via SQL.
148        if st.lazy {
149            let table = DatasetSchema::quote_ident(&st.schema.name);
150            let sql = format!("SELECT * FROM {table} LIMIT 1");
151            let df = self.ctx.sql(&sql).await?;
152            let batches = df.collect().await?;
153            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
154                return Ok("null".into());
155            }
156            let arr = serialize(&batches[0].slice(0, 1))?;
157            let trimmed = arr.trim();
158            let inner = trimmed
159                .strip_prefix('[')
160                .and_then(|s| s.strip_suffix(']'))
161                .unwrap_or(trimmed);
162            return Ok(inner.to_string());
163        }
164
165        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
166            Some(b) => b,
167            None => return Ok("null".into()),
168        };
169        let arr = serialize(&first.slice(0, 1))?;
170        // strip the outer [] to return a single object
171        let trimmed = arr.trim();
172        let inner = trimmed
173            .strip_prefix('[')
174            .and_then(|s| s.strip_suffix(']'))
175            .unwrap_or(trimmed);
176        Ok(inner.to_string())
177    }
178
179    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
180    /// against the same name continue to see the *old* `Arc<DatasetState>`
181    /// until they finish; the old data is dropped once the last reference
182    /// goes away.
183    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
184        // 1. Look up the dataset config. Not finding it = 404.
185        let cfg = self
186            .configs
187            .get(name)
188            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
189            .clone();
190
191        // 2. Per-name lock: only one reload of this dataset at a time.
192        let lock = {
193            let mut locks = self.reload_locks.lock().unwrap();
194            locks
195                .entry(name.to_string())
196                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
197                .clone()
198        };
199        let _guard = lock.lock().await;
200
201        let started = std::time::Instant::now();
202
203        // 3. Heavy lifting (source read + index build). Parquet/delta
204        // readers are themselves async, so we don't wrap in `web::block`.
205        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
206        let rows = state.num_rows();
207
208        // 4. Atomic swap.
209        //   a) Replace the MemTable inside the SessionContext.
210        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
211        // In-flight queries already hold the old provider + old Arc; they
212        // run to completion. New queries see the new data.
213        let _ = self.ctx.deregister_table(name)?;
214        self.ctx.register_table(name, provider)?;
215
216        let mut new_map = (**self.datasets.load()).clone();
217        new_map.insert(name.to_string(), Arc::new(state));
218        self.datasets.store(Arc::new(new_map));
219
220        let elapsed_ms = started.elapsed().as_millis();
221        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
222        Ok(ReloadStats { rows, elapsed_ms })
223    }
224
225    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
226    /// slice. Otherwise → DataFusion SQL on the single registered table.
227    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
228    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
229        let batch = self.query_batch(name, req).await?;
230        if batch.num_rows() == 0 {
231            return Ok("[]".to_string());
232        }
233        serialize(&batch)
234    }
235
236    /// Same plan as [`Self::query`], but encode the result page as an
237    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
238    /// results still produce a valid, self-describing zero-batch stream.
239    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
240        let batch = self.query_batch(name, req).await?;
241        let schema = batch.schema();
242        let mut buf = Vec::with_capacity(8 * 1024);
243        {
244            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
245            if batch.num_rows() > 0 {
246                w.write(&batch)?;
247            }
248            w.finish()?;
249        }
250        Ok(buf)
251    }
252
253    pub async fn query_arrow_stream(
254        &self,
255        name: &str,
256        req: &QueryRequest,
257    ) -> Result<ArrowIpcStream, AppError> {
258        let batches = self.query_batches(name, req).await?;
259        Ok(stream_arrow_batches(batches))
260    }
261
262    pub async fn query_arrow_stream_all(
263        &self,
264        name: &str,
265        req: &QueryRequest,
266    ) -> Result<ArrowIpcStream, AppError> {
267        let batches = self.query_batches_all(name, req).await?;
268        Ok(stream_arrow_batches(batches))
269    }
270
271    /// Compute the result page as a single `RecordBatch`. Shared between
272    /// the JSON and Arrow IPC encoders.
273    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
274        let batches = self.query_batches(name, req).await?;
275        if batches.is_empty() {
276            return Ok(RecordBatch::new_empty(Arc::new(
277                arrow::datatypes::Schema::empty(),
278            )));
279        }
280        if batches.len() == 1 {
281            return Ok(batches.into_iter().next().expect("checked len"));
282        }
283        if batches.iter().all(|b| b.num_rows() == 0) {
284            return Ok(RecordBatch::new_empty(batches[0].schema()));
285        }
286        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
287        Ok(batch)
288    }
289
290    /// Compute the result page as Arrow batches. Arrow IPC responses can
291    /// write these directly, while JSON callers concatenate via
292    /// [`Self::query_batch`] for the existing row conversion path.
293    async fn query_batches(
294        &self,
295        name: &str,
296        req: &QueryRequest,
297    ) -> Result<Vec<RecordBatch>, AppError> {
298        let st = self.dataset(name)?;
299
300        let page = req.page.max(1);
301        let page_size = req.page_size.clamp(1, self.max_page_size);
302        let offset = ((page - 1) * page_size) as usize;
303        let limit = page_size as usize;
304
305        self.query_batches_inner(st, req, Some((offset, limit)))
306            .await
307    }
308
309    /// Compute all matching rows as Arrow batches for the one-request
310    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
311    /// optional `limit` still caps the total result size.
312    async fn query_batches_all(
313        &self,
314        name: &str,
315        req: &QueryRequest,
316    ) -> Result<Vec<RecordBatch>, AppError> {
317        let st = self.dataset(name)?;
318        self.query_batches_inner(st, req, None).await
319    }
320
321    async fn query_batches_inner(
322        &self,
323        st: Arc<DatasetState>,
324        req: &QueryRequest,
325        page_window: Option<(usize, usize)>,
326    ) -> Result<Vec<RecordBatch>, AppError> {
327        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
328
329        // In-memory hot paths only fire when:
330        //   - the dataset is materialised,
331        //   - the caller did not ask for ordering,
332        //   - and did not ask for a hard `limit` cap on a paged request.
333        // Both of the latter two require sorting / capping that the SQL
334        // engine handles uniformly across all data types.
335        let can_fast_path = !st.lazy
336            && req.order_by.is_empty()
337            && (page_window.is_none() || req.limit.is_none())
338            && req.group_by.is_empty()
339            && !req.distinct;
340
341        if can_fast_path {
342            let total = st.num_rows();
343
344            // No predicates -> O(1) raw Arrow slices over resident batches,
345            // no engine overhead.
346            if req.predicates.is_empty() {
347                if page_window.is_none() && req.limit.is_none() {
348                    return st
349                        .data
350                        .iter()
351                        .cloned()
352                        .map(|batch| project(&st.schema, batch, &req.columns))
353                        .collect();
354                }
355                let start = offset.min(total);
356                let len = limit.min(total - start);
357                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
358                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
359            }
360
361            // Index fast path: if every predicate is eq/in on an indexed column,
362            // resolve via the pre-built equality index.
363            if let Some(rows) = try_index(&st.index, &req.predicates) {
364                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
365                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
366            }
367        }
368
369        // Fallback (and only path for lazy datasets): DataFusion SQL.
370        let (sql, params) = match page_window {
371            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
372            None => build_query_stream_sql(&st.schema, req)?,
373        };
374        let mut df = self.ctx.sql(&sql).await?;
375        if !params.is_empty() {
376            df = df.with_param_values(params)?;
377        }
378        let batches = df.collect().await?;
379        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
380            let schema = batches
381                .first()
382                .map(|b| b.schema())
383                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
384            return Ok(vec![RecordBatch::new_empty(schema)]);
385        }
386        Ok(batches)
387    }
388}
389
390fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
391    let schema = batches
392        .first()
393        .map(|batch| batch.schema())
394        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
395    let (mut writer, stream) = arrow_ipc_stream_channel(8);
396
397    tokio::task::spawn_blocking(move || {
398        let result = (|| -> Result<(), AppError> {
399            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
400            for batch in batches {
401                if batch.num_rows() > 0 {
402                    w.write(&batch)?;
403                }
404            }
405            w.finish()?;
406            Ok(())
407        })();
408        if let Err(err) = result {
409            log::error!("datafusion arrow stream failed: {err}");
410            writer.send_error(err);
411        }
412    });
413
414    stream
415}
416
417impl Store {
418    /// Return the number of rows matching `req.predicates`. With no
419    /// predicates this is a cheap metadata lookup on materialised datasets
420    /// and a `SELECT COUNT(*)` on lazy ones.
421    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
422        let st = self.dataset(name)?;
423
424        if !st.lazy {
425            // No predicates → resident row count, no scan.
426            if req.predicates.is_empty() {
427                return Ok(st.num_rows() as i64);
428            }
429            // Index fast path: same eligibility rules as `query`.
430            if let Some(rows) = try_index(&st.index, &req.predicates) {
431                return Ok(rows.len() as i64);
432            }
433        }
434
435        // Fallback: DataFusion SQL — same predicate translation as `query`,
436        // with predicate values bound as typed parameters.
437        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
438        let mut df = self.ctx.sql(&sql).await?;
439        if !params.is_empty() {
440            df = df.with_param_values(params)?;
441        }
442        let batches = df.collect().await?;
443        let n = batches
444            .first()
445            .and_then(|b| {
446                b.column(0)
447                    .as_any()
448                    .downcast_ref::<arrow::array::Int64Array>()
449            })
450            .filter(|a| !a.is_empty())
451            .map(|a| a.value(0))
452            .unwrap_or(0);
453        Ok(n)
454    }
455}
456
457// ---------------------------------------------------------------------------
458// Dataset loading
459// ---------------------------------------------------------------------------
460
461async fn build_dataset(
462    d: &DatasetConfig,
463    ctx: &SessionContext,
464) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
465    // Lazy datasets: register a ListingTable straight against the source
466    // and skip the materialise / index / partition pipeline below. Delta
467    // is rejected — deltalake reads the transaction log eagerly to know
468    // which parquet files are current, so "lazy delta" doesn't map onto
469    // ListingTable cleanly.
470    if d.lazy {
471        match (d.source.kind, d.source.is_s3()) {
472            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
473            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
474            (SourceKind::Delta, _) => {
475                return Err(AppError::Internal(format!(
476                    "dataset '{}': lazy mode is not supported for delta sources",
477                    d.name
478                )));
479            }
480        }
481    }
482
483    // Fetch raw RecordBatches from whichever backing store the dataset
484    // is configured to use. All four (parquet, delta) x (local, s3)
485    // combinations converge into one Vec<RecordBatch>; the materialisation
486    // / indexing / partitioning logic below is shared.
487    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
488        (SourceKind::Parquet, false) => read_local_parquet(d)?,
489        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
490        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
491        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
492    };
493    if raw_batches.is_empty() {
494        return Err(AppError::Internal(format!(
495            "dataset '{}': source produced no batches",
496            d.name
497        )));
498    }
499
500    let chunks = raw_batches;
501    let arrow_sch = chunks[0].schema();
502
503    // Build DatasetSchema from the Arrow schema.
504    let columns: Vec<ColumnInfo> = arrow_sch
505        .fields()
506        .iter()
507        .map(|f| {
508            let dt = f.data_type();
509            ColumnInfo {
510                name: f.name().clone(),
511                logical: arrow_to_logical(dt),
512                sql_type: format!("{dt:?}"),
513                nullable: f.is_nullable(),
514            }
515        })
516        .collect();
517    let schema = DatasetSchema::new(&d.name, columns);
518
519    // Build the equality index per the per-dataset policy. Operates on the
520    // chunked representation directly so we never have to materialise a
521    // single concatenated batch (which would double peak RSS on wide
522    // schemas — see `DatasetState` docs).
523    let index = build_eq_index_with_policy(&chunks, &d.index);
524
525    // Partition for parallel scans by the SQL fallback path. We distribute
526    // the existing batches round-robin across `n_parts` partitions instead
527    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
528    // an Arc-clone of the column buffers, not a copy.
529    let n_parts = std::thread::available_parallelism()
530        .map(|n| n.get())
531        .unwrap_or(4);
532    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
533    for (i, b) in chunks.iter().enumerate() {
534        if b.num_rows() == 0 {
535            continue;
536        }
537        parts[i % n_parts].push(b.clone());
538    }
539    parts.retain(|p| !p.is_empty());
540    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
541
542    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
543    let mem_mb: usize = chunks
544        .iter()
545        .flat_map(|b| b.columns().iter())
546        .map(|c| c.get_buffer_memory_size())
547        .sum::<usize>()
548        / 1_048_576;
549    log::info!(
550        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
551        d.name,
552        d.source.kind.as_str(),
553        total_rows,
554        schema.columns.len(),
555        mem_mb,
556        chunks.len(),
557        index.len()
558    );
559
560    Ok((
561        DatasetState {
562            schema,
563            data: chunks,
564            arrow_schema: arrow_sch,
565            index,
566            lazy: false,
567        },
568        provider,
569    ))
570}
571
572/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
573/// The dataset is never read into RAM; DataFusion streams row groups on
574/// each query. The returned `DatasetState.data` is an empty `Vec` —
575/// `arrow_schema` still carries the inferred Arrow schema for discovery.
576async fn build_lazy_local_parquet(
577    d: &DatasetConfig,
578    ctx: &SessionContext,
579) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
580    let (url, part_keys) = lazy_local_listing(d)?;
581
582    let mut opts =
583        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
584    if !part_keys.is_empty() {
585        opts = opts.with_table_partition_cols(
586            part_keys
587                .iter()
588                .map(|k| (k.clone(), DataType::Utf8))
589                .collect(),
590        );
591    }
592
593    let session_state = ctx.state();
594    // `infer_schema` returns the *file* schema (without partition columns);
595    // `ListingTable` appends the declared partition columns on top.
596    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
597        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
598    })?;
599
600    let cfg = ListingTableConfig::new(url)
601        .with_listing_options(opts)
602        .with_schema(file_schema.clone());
603    let table = ListingTable::try_new(cfg).map_err(|e| {
604        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
605    })?;
606    let provider: Arc<dyn TableProvider> = Arc::new(table);
607
608    // Discovery schema = file columns + partition columns (Utf8).
609    let mut fields: Vec<Field> = file_schema
610        .fields()
611        .iter()
612        .map(|f| f.as_ref().clone())
613        .collect();
614    for k in &part_keys {
615        if !fields.iter().any(|f| f.name() == k) {
616            fields.push(Field::new(k, DataType::Utf8, false));
617        }
618    }
619    let arrow_sch = Arc::new(Schema::new(fields));
620
621    let columns: Vec<ColumnInfo> = arrow_sch
622        .fields()
623        .iter()
624        .map(|f| {
625            let dt = f.data_type();
626            ColumnInfo {
627                name: f.name().clone(),
628                logical: arrow_to_logical(dt),
629                sql_type: format!("{dt:?}"),
630                nullable: f.is_nullable(),
631            }
632        })
633        .collect();
634    let schema = DatasetSchema::new(&d.name, columns);
635
636    log::info!(
637        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
638        d.name,
639        d.source.kind.as_str(),
640        schema.columns.len(),
641        part_keys.len()
642    );
643
644    Ok((
645        DatasetState {
646            schema,
647            data: Vec::new(),
648            arrow_schema: arrow_sch,
649            index: EqIndex::new(),
650            lazy: true,
651        },
652        provider,
653    ))
654}
655
656/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
657/// rooted at the dataset base plus the ordered hive partition keys (if any).
658/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
659/// (hive root or flat folder of parquets), and a single `*.parquet` file.
660fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
661    let loc = &d.source.location;
662
663    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
664        let parts: Vec<&str> = loc.split('/').collect();
665        let first_wild = parts
666            .iter()
667            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
668            .unwrap_or(parts.len());
669        let base = parts[..first_wild].join("/");
670        let base = if base.is_empty() {
671            "/".to_string()
672        } else {
673            base
674        };
675        // Partition keys: `key=…` components between the base and the file
676        // pattern (the final component).
677        let upper = parts.len().saturating_sub(1);
678        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
679            .iter()
680            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
681            .filter(|k| !k.is_empty())
682            .collect();
683        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
684    }
685
686    let path = std::path::Path::new(loc);
687    if path.is_dir() {
688        let keys = discover_hive_keys(path);
689        return Ok((dir_url(path, d)?, keys));
690    }
691
692    let url = ListingTableUrl::parse(loc)
693        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
694    Ok((url, Vec::new()))
695}
696
697/// Parse a directory path into a `ListingTableUrl` (trailing slash so
698/// DataFusion treats it as a directory root, not a single object).
699fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
700    let s = path.to_str().ok_or_else(|| {
701        AppError::Internal(format!(
702            "dataset '{}': non-utf8 path {}",
703            d.name,
704            path.display()
705        ))
706    })?;
707    let s = if s.ends_with('/') {
708        s.to_string()
709    } else {
710        format!("{s}/")
711    };
712    ListingTableUrl::parse(&s)
713        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
714}
715
/// Walk down a directory following one `key=value` subdirectory at each
/// level to discover the ordered hive partition keys. Returns an empty
/// vec for a flat (non-partitioned) folder, or when `base` cannot be read.
///
/// When a level holds several `key=value` directories, the one with the
/// lexicographically smallest name is followed. `read_dir` yields entries in
/// an unspecified order, so always picking the same entry keeps the
/// discovered keys stable from run to run.
///
/// Directories whose name has an empty key or an empty value (`=x`, `k=`)
/// are not treated as partitions. Unreadable entries are skipped.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut cur = base.to_path_buf();
    // Descend until a level cannot be read or has no partition directory.
    while let Ok(rd) = std::fs::read_dir(&cur) {
        let next = rd
            .flatten()
            .filter_map(|entry| {
                let path = entry.path();
                if !path.is_dir() {
                    return None;
                }
                let dir_name = path.file_name()?.to_str()?.to_string();
                let (key, value) = dir_name.split_once('=')?;
                if key.is_empty() || value.is_empty() {
                    return None;
                }
                let key = key.to_string();
                Some((dir_name, key, path))
            })
            .min_by(|a, b| a.0.cmp(&b.0));
        let Some((_, key, path)) = next else {
            break;
        };
        keys.push(key);
        cur = path;
    }
    keys
}
753
754/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
755/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location.
756/// DataFusion does the directory listing through the registered store and
757/// streams row groups on each query — no local enumeration needed.
758async fn build_lazy_s3_parquet(
759    d: &DatasetConfig,
760    ctx: &SessionContext,
761) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
762    register_s3_object_store(d, ctx)?;
763
764    let url = ListingTableUrl::parse(&d.source.location).map_err(|e| {
765        AppError::Internal(format!(
766            "dataset '{}': bad s3 url '{}': {e}",
767            d.name, d.source.location
768        ))
769    })?;
770
771    let opts =
772        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
773
774    let session_state = ctx.state();
775    let resolved_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
776        AppError::Internal(format!(
777            "dataset '{}': infer parquet schema on s3: {e}",
778            d.name
779        ))
780    })?;
781
782    let cfg = ListingTableConfig::new(url)
783        .with_listing_options(opts)
784        .with_schema(resolved_schema.clone());
785    let table = ListingTable::try_new(cfg).map_err(|e| {
786        AppError::Internal(format!(
787            "dataset '{}': ListingTable::try_new (s3): {e}",
788            d.name
789        ))
790    })?;
791    let provider: Arc<dyn TableProvider> = Arc::new(table);
792
793    let arrow_sch = resolved_schema;
794    let columns: Vec<ColumnInfo> = arrow_sch
795        .fields()
796        .iter()
797        .map(|f| {
798            let dt = f.data_type();
799            ColumnInfo {
800                name: f.name().clone(),
801                logical: arrow_to_logical(dt),
802                sql_type: format!("{dt:?}"),
803                nullable: f.is_nullable(),
804            }
805        })
806        .collect();
807    let schema = DatasetSchema::new(&d.name, columns);
808
809    log::info!(
810        "dataset '{}' [{}, lazy, s3]: {} cols (no materialise, no index)",
811        d.name,
812        d.source.kind.as_str(),
813        schema.columns.len()
814    );
815
816    Ok((
817        DatasetState {
818            schema,
819            data: Vec::new(),
820            arrow_schema: arrow_sch,
821            index: EqIndex::new(),
822            lazy: true,
823        },
824        provider,
825    ))
826}
827
828/// Original local-parquet code path — sync file I/O. We set a large reader
829/// batch size so wide schemas (hundreds of columns) don't pay per-array
830/// metadata overhead on thousands of small (default 1024-row) batches.
831///
832/// Two memory-saving knobs are applied here:
833///
834/// * **Column projection** — if `d.columns` is non-empty, only those
835///   columns are decoded; everything else is skipped at the parquet reader
836///   level (no Arrow array is ever allocated for the dropped columns).
837/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
838///   carry a dictionary page are materialised as Arrow
839///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
840///   string columns (state, country, severity, …) stay represented as
841///   `n_unique` string slots plus an Int32 index per row instead of
842///   `n_rows` independent strings — typically 10×–50× smaller for
843///   real-world data.
844fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
845    let files = d.resolve_local_parquet_files()?;
846    let mut all = Vec::new();
847    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
848        None
849    } else {
850        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
851    };
852
853    for f in &files {
854        let file = std::fs::File::open(f)
855            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
856
857        // First pass: peek the parquet metadata + default Arrow schema so we
858        // can (a) decide a column projection and (b) override Utf8 columns
859        // that are dictionary-encoded in the file so the reader materialises
860        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
861        let probe = ParquetRecordBatchReaderBuilder::try_new(
862            file.try_clone()
863                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
864        )?;
865        let parquet_schema = probe.parquet_schema().clone();
866        let arrow_schema = probe.schema().clone();
867        let metadata = probe.metadata().clone();
868        drop(probe);
869
870        // Column projection (top-level / leaf indices for flat schemas).
871        let projection = if let Some(w) = &wanted {
872            let indices: Vec<usize> = arrow_schema
873                .fields()
874                .iter()
875                .enumerate()
876                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
877                .map(|(i, _)| i)
878                .collect();
879            if indices.is_empty() {
880                return Err(AppError::Internal(format!(
881                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
882                    d.name,
883                    d.columns,
884                    f.display()
885                )));
886            }
887            ProjectionMask::roots(&parquet_schema, indices)
888        } else {
889            ProjectionMask::all()
890        };
891
892        // Dictionary override: any Utf8 column whose first row group carries
893        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
894        // override schema must still describe every column in the parquet
895        // file (projection is applied separately). Skipped entirely when
896        // the dataset has `dict_encode = false` — escape hatch for cases
897        // where the override interacts badly with null propagation in the
898        // downstream engine.
899        let mut new_fields: Vec<Field> = arrow_schema
900            .fields()
901            .iter()
902            .map(|f| f.as_ref().clone())
903            .collect();
904        if d.dict_encode
905            && let Some(rg0) = metadata.row_groups().first()
906        {
907            for (i, fld) in arrow_schema.fields().iter().enumerate() {
908                if !matches!(
909                    fld.data_type(),
910                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
911                ) {
912                    continue;
913                }
914                if let Some(col) = rg0.columns().get(i)
915                    && col.dictionary_page_offset().is_some()
916                {
917                    new_fields[i] = Field::new(
918                        fld.name(),
919                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
920                        fld.is_nullable(),
921                    );
922                }
923            }
924        }
925        let forced_schema = Arc::new(Schema::new(new_fields));
926
927        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
928        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
929            .with_batch_size(65_536)
930            .with_projection(projection)
931            .build()?;
932        // Hive-style partition columns (`city=NYC/…`) live in the path, not
933        // the file. Fold them in as constant Utf8 columns so they show up in
934        // the schema and are queryable — matching the DuckDB backend.
935        let pairs = hive_pairs(f);
936        for batch in reader {
937            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
938            all.push(if pairs.is_empty() {
939                batch
940            } else {
941                append_partition_cols(&batch, &pairs)?
942            });
943        }
944    }
945    if all.is_empty() {
946        return Err(AppError::Internal(format!(
947            "dataset '{}': parquet source is empty",
948            d.name
949        )));
950    }
951    Ok(all)
952}
953
/// Collect the hive-style partition `(key, value)` pairs found in `path`,
/// in path order (e.g. `year=2024/city=NYC` yields `year` then `city`).
///
/// A path component counts as a pair when it splits at its first `=` into a
/// non-empty key and a non-empty value that contains no further `=`.
/// Components that are not valid UTF-8 are ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        if !key.is_empty() && !value.is_empty() && !value.contains('=') {
            pairs.push((key.to_string(), value.to_string()));
        }
    }
    pairs
}
968
969/// Append constant Utf8 columns for each hive partition pair. A partition
970/// key that collides with a real file column is skipped (the file wins).
971fn append_partition_cols(
972    batch: &RecordBatch,
973    pairs: &[(String, String)],
974) -> Result<RecordBatch, AppError> {
975    let n = batch.num_rows();
976    let mut fields: Vec<Field> = batch
977        .schema()
978        .fields()
979        .iter()
980        .map(|f| f.as_ref().clone())
981        .collect();
982    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
983    for (k, v) in pairs {
984        if fields.iter().any(|f| f.name() == k) {
985            continue;
986        }
987        fields.push(Field::new(k, DataType::Utf8, false));
988        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
989    }
990    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
991        .map_err(|e| AppError::Internal(e.to_string()))
992}
993
994/// Register an `AmazonS3` object store on the SessionContext (so DataFusion's
995/// `read_parquet("s3://…")` can resolve the URL) and stream the whole
996/// dataset back through `DataFrame::collect`.
997async fn read_s3_parquet(
998    d: &DatasetConfig,
999    ctx: &SessionContext,
1000) -> Result<Vec<RecordBatch>, AppError> {
1001    register_s3_object_store(d, ctx)?;
1002    let df = ctx
1003        .read_parquet(d.source.location.clone(), ParquetReadOptions::default())
1004        .await?;
1005    Ok(df.collect().await?)
1006}
1007
1008/// Open a Delta table (local or S3) and stream every row back as a Vec of
1009/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1010/// treat all datasets uniformly (single in-memory batch + eq-index).
1011async fn read_delta(
1012    d: &DatasetConfig,
1013    opts: HashMap<String, String>,
1014) -> Result<Vec<RecordBatch>, AppError> {
1015    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1016        AppError::Internal(format!(
1017            "dataset '{}': bad delta location '{}': {e}",
1018            d.name, d.source.location
1019        ))
1020    })?;
1021    let table = deltalake::open_table_with_storage_options(url, opts)
1022        .await
1023        .map_err(|e| {
1024            AppError::Internal(format!(
1025                "dataset '{}': delta open '{}': {e}",
1026                d.name, d.source.location
1027            ))
1028        })?;
1029    let provider = table.table_provider().await.map_err(|e| {
1030        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1031    })?;
1032    // Drive a full scan via a throwaway SessionContext so we end up with
1033    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1034    let scan_ctx = SessionContext::new();
1035    let df = scan_ctx
1036        .read_table(provider)
1037        .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1038    Ok(df.collect().await?)
1039}
1040
1041/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1042/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1043/// passes them through to object_store internally.
1044fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1045    let creds = d.resolved_creds();
1046    let region = d.resolved_region();
1047    let s3 = d.s3.clone().unwrap_or_default();
1048
1049    let mut opts = HashMap::new();
1050    opts.insert("AWS_REGION".into(), region);
1051    if let Some(ep) = s3.endpoint.as_deref().filter(|s| !s.is_empty()) {
1052        opts.insert("AWS_ENDPOINT_URL".into(), ep.to_string());
1053    }
1054    if s3.allow_http {
1055        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1056    }
1057    opts.insert(
1058        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1059        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1060    );
1061    if let Some(k) = creds.access_key_id {
1062        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1063    }
1064    if let Some(s) = creds.secret_access_key {
1065        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1066    }
1067    if let Some(t) = creds.session_token {
1068        opts.insert("AWS_SESSION_TOKEN".into(), t);
1069    }
1070    // Read-only paths don't need the S3 lock-provider plumbing.
1071    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1072    Ok(opts)
1073}
1074
1075/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1076/// block + resolved credentials and register it on `ctx` under
1077/// `s3://bucket/`.
1078fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1079    let (bucket, _key) = d.source.s3_bucket()?;
1080    let creds = d.resolved_creds();
1081    let region = d.resolved_region();
1082    let s3 = d.s3.clone().unwrap_or_default();
1083
1084    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1085        AppError::Internal(format!(
1086            "dataset '{}': build S3 store for '{bucket}': {e}",
1087            d.name
1088        ))
1089    })?;
1090
1091    let url = Url::parse(&format!("s3://{bucket}"))
1092        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1093    ctx.register_object_store(&url, Arc::new(store));
1094    Ok(())
1095}
1096
1097fn build_s3(
1098    bucket: &str,
1099    region: &str,
1100    s3: &S3Config,
1101    creds: &ResolvedCreds,
1102) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1103    let mut b = AmazonS3Builder::new()
1104        .with_bucket_name(bucket)
1105        .with_region(region)
1106        .with_allow_http(s3.allow_http)
1107        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1108    if let Some(ep) = s3.endpoint.as_deref().filter(|s| !s.is_empty()) {
1109        b = b.with_endpoint(ep);
1110    }
1111    if let Some(k) = creds.access_key_id.as_deref() {
1112        b = b.with_access_key_id(k);
1113    }
1114    if let Some(s) = creds.secret_access_key.as_deref() {
1115        b = b.with_secret_access_key(s);
1116    }
1117    if let Some(t) = creds.session_token.as_deref() {
1118        b = b.with_token(t);
1119    }
1120    b.build()
1121}
1122
1123fn arrow_to_logical(dt: &DataType) -> LogicalType {
1124    match dt {
1125        DataType::Boolean => LogicalType::Bool,
1126        DataType::Int8
1127        | DataType::Int16
1128        | DataType::Int32
1129        | DataType::Int64
1130        | DataType::UInt8
1131        | DataType::UInt16
1132        | DataType::UInt32
1133        | DataType::UInt64 => LogicalType::Int,
1134        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1135        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1136        // Dictionary-encoded strings are reported as plain strings — clients
1137        // (and the rest of the backend) shouldn't have to care that we keep
1138        // a compressed representation in memory.
1139        DataType::Dictionary(_, v)
1140            if matches!(
1141                v.as_ref(),
1142                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1143            ) =>
1144        {
1145            LogicalType::Utf8
1146        }
1147        DataType::Date32
1148        | DataType::Date64
1149        | DataType::Time32(_)
1150        | DataType::Time64(_)
1151        | DataType::Timestamp(_, _)
1152        | DataType::Duration(_)
1153        | DataType::Interval(_) => LogicalType::Temporal,
1154        _ => LogicalType::Other,
1155    }
1156}
1157
1158// ---------------------------------------------------------------------------
1159// Per-batch projection
1160// ---------------------------------------------------------------------------
1161
1162fn project(
1163    schema: &DatasetSchema,
1164    batch: RecordBatch,
1165    columns: &[String],
1166) -> Result<RecordBatch, AppError> {
1167    if columns.is_empty() {
1168        return Ok(batch);
1169    }
1170    let indices: Vec<usize> = columns
1171        .iter()
1172        .map(|c| {
1173            schema
1174                .find(c)
1175                .map(|info| schema.by_name[&info.name.to_lowercase()])
1176        })
1177        .collect::<Result<_, _>>()?;
1178    let fields: Vec<Field> = indices
1179        .iter()
1180        .map(|&i| batch.schema().field(i).clone())
1181        .collect();
1182    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1183    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1184}
1185
1186// ---------------------------------------------------------------------------
1187// SQL builder
1188// ---------------------------------------------------------------------------
1189
1190/// Accumulates the typed literal values for a parameterised query.
1191///
1192/// Predicate values are never interpolated into the SQL text. Instead each
1193/// value is pushed here and the builder emits a positional placeholder
1194/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1195/// to the logical plan via [`DataFrame::with_param_values`], so user input
1196/// reaches the engine as typed scalars and can never alter the query
1197/// structure (no SQL injection surface, no escaping to get wrong).
1198#[derive(Default)]
1199struct Params {
1200    values: Vec<ScalarValue>,
1201}
1202
1203impl Params {
1204    fn new() -> Self {
1205        Self::default()
1206    }
1207
1208    /// Bind `v` and return its `$N` placeholder token.
1209    fn bind(&mut self, v: ScalarValue) -> String {
1210        self.values.push(v);
1211        format!("${}", self.values.len())
1212    }
1213
1214    fn into_values(self) -> Vec<ScalarValue> {
1215        self.values
1216    }
1217}
1218
1219fn build_query_sql(
1220    schema: &DatasetSchema,
1221    req: &QueryRequest,
1222    max_page_size: u64,
1223) -> Result<(String, Vec<ScalarValue>), AppError> {
1224    let (limit, offset) = req.effective_limit_offset(max_page_size);
1225    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1226}
1227
1228fn build_query_stream_sql(
1229    schema: &DatasetSchema,
1230    req: &QueryRequest,
1231) -> Result<(String, Vec<ScalarValue>), AppError> {
1232    let suffix = req
1233        .limit
1234        .map(|limit| format!(" LIMIT {limit}"))
1235        .unwrap_or_default();
1236    build_query_sql_with_suffix(schema, req, &suffix)
1237}
1238
1239fn build_query_sql_with_suffix(
1240    schema: &DatasetSchema,
1241    req: &QueryRequest,
1242    suffix: &str,
1243) -> Result<(String, Vec<ScalarValue>), AppError> {
1244    let agg_plan = req.agg_plan(schema)?;
1245
1246    let cols = if let Some(plan) = &agg_plan {
1247        // Group cols, then aggregations, each aliased to the JSON output key.
1248        let mut parts: Vec<String> = plan
1249            .group_cols
1250            .iter()
1251            .map(|c| DatasetSchema::quote_ident(c))
1252            .collect();
1253        for a in &plan.aggs {
1254            let expr = a.sql_expr()?;
1255            parts.push(format!(
1256                "{expr} AS {}",
1257                DatasetSchema::quote_ident(&a.alias)
1258            ));
1259        }
1260        parts.join(", ")
1261    } else if req.columns.is_empty() {
1262        if req.distinct {
1263            "DISTINCT *".to_string()
1264        } else {
1265            "*".to_string()
1266        }
1267    } else {
1268        let list = req
1269            .columns
1270            .iter()
1271            .map(|c| {
1272                schema
1273                    .find(c)
1274                    .map(|info| DatasetSchema::quote_ident(&info.name))
1275            })
1276            .collect::<Result<Vec<_>, _>>()?
1277            .join(", ");
1278        if req.distinct {
1279            format!("DISTINCT {list}")
1280        } else {
1281            list
1282        }
1283    };
1284
1285    let mut params = Params::new();
1286    let clauses: Vec<String> = req
1287        .predicates
1288        .iter()
1289        .map(|p| pred_to_sql(schema, p, &mut params))
1290        .collect::<Result<_, _>>()?;
1291
1292    let table = DatasetSchema::quote_ident(&schema.name);
1293    let where_clause = if clauses.is_empty() {
1294        String::new()
1295    } else {
1296        format!(" WHERE {}", clauses.join(" AND "))
1297    };
1298    let group_clause = match &agg_plan {
1299        Some(p) => format!(
1300            " GROUP BY {}",
1301            p.group_cols
1302                .iter()
1303                .map(|c| DatasetSchema::quote_ident(c))
1304                .collect::<Vec<_>>()
1305                .join(", "),
1306        ),
1307        None => String::new(),
1308    };
1309    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1310        Some(s) => format!(" ORDER BY {s}"),
1311        None => String::new(),
1312    };
1313    let sql =
1314        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1315    Ok((sql, params.into_values()))
1316}
1317
1318fn build_count_sql(
1319    schema: &DatasetSchema,
1320    predicates: &[Predicate],
1321) -> Result<(String, Vec<ScalarValue>), AppError> {
1322    let mut params = Params::new();
1323    let clauses: Vec<String> = predicates
1324        .iter()
1325        .map(|p| pred_to_sql(schema, p, &mut params))
1326        .collect::<Result<_, _>>()?;
1327    let table = DatasetSchema::quote_ident(&schema.name);
1328    let where_clause = if clauses.is_empty() {
1329        String::new()
1330    } else {
1331        format!(" WHERE {}", clauses.join(" AND "))
1332    };
1333    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1334    Ok((sql, params.into_values()))
1335}
1336
1337fn pred_to_sql(
1338    schema: &DatasetSchema,
1339    pred: &Predicate,
1340    params: &mut Params,
1341) -> Result<String, AppError> {
1342    let info = schema.find(&pred.col)?;
1343    let col = DatasetSchema::quote_ident(&info.name);
1344
1345    match pred.op.as_str() {
1346        "is_null" => return Ok(format!("{col} IS NULL")),
1347        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1348        _ => {}
1349    }
1350
1351    let val = pred
1352        .val
1353        .as_ref()
1354        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1355
1356    if pred.op == "in" {
1357        let items = val
1358            .as_array()
1359            .filter(|a| !a.is_empty())
1360            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1361        let placeholders: Vec<String> = items
1362            .iter()
1363            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1364            .collect::<Result<_, AppError>>()?;
1365        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1366    }
1367
1368    let sql_op = match pred.op.as_str() {
1369        "eq" => "=",
1370        "neq" => "!=",
1371        "gt" => ">",
1372        "gte" => ">=",
1373        "lt" => "<",
1374        "lte" => "<=",
1375        "like" => "LIKE",
1376        "ilike" => "ILIKE",
1377        other => return Err(AppError::UnknownOperator(other.into())),
1378    };
1379    let placeholder = params.bind(json_to_scalar(val)?);
1380    Ok(format!("{col} {sql_op} {placeholder}"))
1381}
1382
1383/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
1384/// binding as a query parameter. The engine applies the usual numeric
1385/// widening / comparison coercion against the target column type.
1386fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1387    match val {
1388        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1389        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1390        JsonValue::Null => Ok(ScalarValue::Null),
1391        JsonValue::Number(n) => {
1392            if let Some(i) = n.as_i64() {
1393                Ok(ScalarValue::Int64(Some(i)))
1394            } else if let Some(u) = n.as_u64() {
1395                Ok(ScalarValue::UInt64(Some(u)))
1396            } else if let Some(f) = n.as_f64() {
1397                Ok(ScalarValue::Float64(Some(f)))
1398            } else {
1399                Err(AppError::InvalidValue(
1400                    "unsupported numeric literal in predicate".into(),
1401                ))
1402            }
1403        }
1404        _ => Err(AppError::InvalidValue(
1405            "unsupported literal type in predicate".into(),
1406        )),
1407    }
1408}
1409
1410// ---------------------------------------------------------------------------
1411// Equality index — built once at startup, queried on every predicate request
1412// ---------------------------------------------------------------------------
1413
1414fn json_index_key(val: &JsonValue) -> Option<String> {
1415    match val {
1416        JsonValue::String(s) => Some(s.clone()),
1417        JsonValue::Number(n) => Some(n.to_string()),
1418        JsonValue::Bool(b) => Some(b.to_string()),
1419        _ => None,
1420    }
1421}
1422
/// Intersection of two ascending `u32` slices, produced by a single merge
/// pass. The output is ascending as well.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    // Peel the smaller head off its slice; keep a value only when both heads agree.
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => left = left_rest,
            Ordering::Greater => right = right_rest,
            Ordering::Equal => {
                common.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    common
}
1439
/// Union of two ascending `u32` slices, produced by a single merge pass.
/// A value present at the head of both inputs is emitted once. The output
/// is ascending.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = left_rest;
            }
            Ordering::Greater => {
                merged.push(y);
                right = right_rest;
            }
            Ordering::Equal => {
                merged.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    // At most one of the two tails is non-empty here.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
1464
1465fn try_index(index: &EqIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1466    if predicates.is_empty() || index.is_empty() {
1467        return None;
1468    }
1469
1470    let mut result: Option<Vec<u32>> = None;
1471    for pred in predicates {
1472        let col_lower = pred.col.to_lowercase();
1473        let col_map = index.get(&col_lower)?;
1474
1475        let rows: Vec<u32> = match pred.op.as_str() {
1476            "eq" => {
1477                let key = json_index_key(pred.val.as_ref()?)?;
1478                col_map.get(&key).cloned().unwrap_or_default()
1479            }
1480            "in" => {
1481                let items = pred.val.as_ref()?.as_array()?;
1482                let mut merged: Vec<u32> = Vec::new();
1483                for item in items {
1484                    if let Some(r) = col_map.get(&json_index_key(item)?) {
1485                        merged = union_sorted(&merged, r);
1486                    }
1487                }
1488                merged
1489            }
1490            _ => return None,
1491        };
1492
1493        result = Some(match result {
1494            None => rows,
1495            Some(r) => intersect_sorted(&r, &rows),
1496        });
1497    }
1498    result
1499}
1500
1501/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
1502/// the underlying batches (zero-copy) and concatenating the (small) page.
1503fn slice_global(
1504    chunks: &[RecordBatch],
1505    schema: &Arc<Schema>,
1506    offset: usize,
1507    limit: usize,
1508) -> Result<RecordBatch, AppError> {
1509    if limit == 0 || chunks.is_empty() {
1510        return Ok(RecordBatch::new_empty(schema.clone()));
1511    }
1512    let mut out = Vec::new();
1513    let mut to_skip = offset;
1514    let mut remaining = limit;
1515    for b in chunks {
1516        if remaining == 0 {
1517            break;
1518        }
1519        let n = b.num_rows();
1520        if to_skip >= n {
1521            to_skip -= n;
1522            continue;
1523        }
1524        let take = remaining.min(n - to_skip);
1525        out.push(b.slice(to_skip, take));
1526        to_skip = 0;
1527        remaining -= take;
1528    }
1529    if out.is_empty() {
1530        return Ok(RecordBatch::new_empty(schema.clone()));
1531    }
1532    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1533}
1534
1535/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
1536/// Row ids are global (across the concatenation of all chunks). We map each
1537/// requested row to its (chunk, local-index), `take` per chunk, then stitch
1538/// the per-chunk results back together preserving the original row order.
1539fn take_page(
1540    chunks: &[RecordBatch],
1541    schema: &Arc<Schema>,
1542    rows: &[u32],
1543    offset: usize,
1544    limit: usize,
1545) -> Result<RecordBatch, AppError> {
1546    let start = offset.min(rows.len());
1547    let len = limit.min(rows.len() - start);
1548    if len == 0 || chunks.is_empty() {
1549        return Ok(RecordBatch::new_empty(schema.clone()));
1550    }
1551
1552    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
1553    // and `offsets.last()` is the total row count.
1554    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1555    let mut acc: u32 = 0;
1556    offsets.push(0);
1557    for b in chunks {
1558        acc = acc
1559            .checked_add(b.num_rows() as u32)
1560            .expect("row count exceeds u32::MAX");
1561        offsets.push(acc);
1562    }
1563
1564    // Bucket each global row id into the chunk that contains it, remembering
1565    // the original output position so we can restore page order at the end.
1566    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1567    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1568        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1569        let local = gid - offsets[bi];
1570        buckets[bi].push((out_pos as u32, local));
1571    }
1572
1573    // Per-chunk take, recording the destination index for each emitted row.
1574    let mut takens: Vec<RecordBatch> = Vec::new();
1575    let mut dest: Vec<u32> = Vec::with_capacity(len);
1576    for (bi, bucket) in buckets.iter().enumerate() {
1577        if bucket.is_empty() {
1578            continue;
1579        }
1580        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1581        let cols: Vec<ArrayRef> = chunks[bi]
1582            .columns()
1583            .iter()
1584            .map(|c| {
1585                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1586                    .map_err(AppError::from)
1587            })
1588            .collect::<Result<_, _>>()?;
1589        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1590        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1591    }
1592
1593    // Stitch per-chunk results then permute to restore the requested order.
1594    let stitched = compute::concat_batches(schema, takens.iter())?;
1595    let mut inv = vec![0u32; len];
1596    for (i, &d) in dest.iter().enumerate() {
1597        inv[d as usize] = i as u32;
1598    }
1599    let perm = UInt32Array::from(inv);
1600    let cols: Vec<ArrayRef> = stitched
1601        .columns()
1602        .iter()
1603        .map(|c| {
1604            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1605                .map_err(AppError::from)
1606        })
1607        .collect::<Result<_, _>>()?;
1608    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1609}
1610
1611/// Build the equality index per the dataset's policy, against the chunked
1612/// representation. Row ids are global across the concatenation of all
1613/// chunks (so they remain compatible with `take_page` / `slice_global`).
1614fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1615    use rayon::prelude::*;
1616
1617    if cfg.mode == IndexMode::None || chunks.is_empty() {
1618        return EqIndex::new();
1619    }
1620
1621    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1622        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1623    } else {
1624        None
1625    };
1626
1627    let max_card = if cfg.mode == IndexMode::Auto {
1628        Some(cfg.max_cardinality)
1629    } else {
1630        None
1631    };
1632
1633    // Per-chunk starting global row id.
1634    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1635    let mut acc: u32 = 0;
1636    for b in chunks {
1637        batch_offsets.push(acc);
1638        acc = acc
1639            .checked_add(b.num_rows() as u32)
1640            .expect("row count exceeds u32::MAX");
1641    }
1642
1643    let schema = chunks[0].schema();
1644
1645    schema
1646        .fields()
1647        .par_iter()
1648        .enumerate()
1649        .filter_map(|(ci, field)| {
1650            let col_lower = field.name().to_lowercase();
1651            if let Some(a) = &allow
1652                && !a.contains_key(&col_lower)
1653            {
1654                return None;
1655            }
1656
1657            // Only build for index-friendly types; skip everything else
1658            // up-front so we don't pay the per-chunk dispatch cost.
1659            let dtype = field.data_type();
1660            let dict_utf8 = matches!(dtype,
1661                DataType::Dictionary(k, v)
1662                    if matches!(k.as_ref(), DataType::Int32)
1663                    && matches!(v.as_ref(), DataType::Utf8));
1664            match dtype {
1665                DataType::Utf8
1666                | DataType::Utf8View
1667                | DataType::Boolean
1668                | DataType::Int8
1669                | DataType::Int16
1670                | DataType::Int32
1671                | DataType::Int64 => {}
1672                _ if dict_utf8 => {}
1673                _ => return None,
1674            }
1675
1676            let mut map: HashMap<String, Vec<u32>> = HashMap::new();
1677
1678            for (bi, batch) in chunks.iter().enumerate() {
1679                let base = batch_offsets[bi];
1680                let col = batch.column(ci);
1681
1682                macro_rules! index_col {
1683                    ($arr_ty:ty) => {{
1684                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
1685                        for row in 0..arr.len() {
1686                            if arr.is_null(row) {
1687                                continue;
1688                            }
1689                            let key = arr.value(row).to_string();
1690                            let gid = base + row as u32;
1691                            if let Some(v) = map.get_mut(&key) {
1692                                v.push(gid);
1693                            } else {
1694                                if let Some(mc) = max_card {
1695                                    if map.len() >= mc {
1696                                        return None;
1697                                    }
1698                                }
1699                                map.insert(key, vec![gid]);
1700                            }
1701                        }
1702                    }};
1703                }
1704
1705                if dict_utf8 {
1706                    // Dictionary(Int32, Utf8): iterate keys + look up the
1707                    // string value from the (small) dictionary. We allocate
1708                    // the key string only when the value is new — repeated
1709                    // values reuse the existing HashMap entry by hash, but
1710                    // `HashMap::get_mut` still needs the key, so we use a
1711                    // borrowed lookup via `get` first to avoid the alloc.
1712                    let arr = col
1713                        .as_any()
1714                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
1715                        )?;
1716                    let keys = arr.keys();
1717                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
1718                    for row in 0..arr.len() {
1719                        if arr.is_null(row) {
1720                            continue;
1721                        }
1722                        let k = keys.value(row) as usize;
1723                        let s = values.value(k);
1724                        let gid = base + row as u32;
1725                        if let Some(v) = map.get_mut(s) {
1726                            v.push(gid);
1727                        } else {
1728                            if let Some(mc) = max_card
1729                                && map.len() >= mc
1730                            {
1731                                return None;
1732                            }
1733                            map.insert(s.to_string(), vec![gid]);
1734                        }
1735                    }
1736                } else {
1737                    match dtype {
1738                        DataType::Utf8 => index_col!(StringArray),
1739                        DataType::Utf8View => index_col!(StringViewArray),
1740                        DataType::Boolean => index_col!(BooleanArray),
1741                        DataType::Int8 => index_col!(Int8Array),
1742                        DataType::Int16 => index_col!(Int16Array),
1743                        DataType::Int32 => index_col!(Int32Array),
1744                        DataType::Int64 => index_col!(Int64Array),
1745                        _ => unreachable!(),
1746                    }
1747                }
1748            }
1749
1750            Some((col_lower, map))
1751        })
1752        .collect()
1753}
1754
1755// ---------------------------------------------------------------------------
1756// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
1757// on the page batch right before JSON encoding. We deliberately do **not**
1758// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
1759// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
1760// columns would balloon resident RAM. The cast is applied per returned page
1761// after pagination, so the cost is paid only for rows the caller requested.
1762// ---------------------------------------------------------------------------
1763
1764/// Returns true for Arrow types that `write_value` can render directly. Any
1765/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
1766/// JSON output is faithful rather than silently `null`.
1767fn writable_inline(dt: &DataType) -> bool {
1768    match dt {
1769        DataType::Utf8
1770        | DataType::LargeUtf8
1771        | DataType::Utf8View
1772        | DataType::Boolean
1773        | DataType::Int8
1774        | DataType::Int16
1775        | DataType::Int32
1776        | DataType::Int64
1777        | DataType::UInt8
1778        | DataType::UInt16
1779        | DataType::UInt32
1780        | DataType::UInt64
1781        | DataType::Float32
1782        | DataType::Float64
1783        | DataType::Decimal128(_, _)
1784        | DataType::Decimal256(_, _) => true,
1785        DataType::Dictionary(k, v)
1786            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
1787        {
1788            true
1789        }
1790        _ => false,
1791    }
1792}
1793
1794/// Cast any column whose dtype isn't directly writable by `write_value` to
1795/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
1796/// — kept native in resident memory to save RAM — and also any exotic dtype
1797/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
1798/// so the JSON serializer never falls back to writing literal `null`.
1799fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
1800    let schema = batch.schema();
1801    let to_cast: Vec<usize> = schema
1802        .fields()
1803        .iter()
1804        .enumerate()
1805        .filter_map(|(i, f)| {
1806            if writable_inline(f.data_type()) {
1807                None
1808            } else {
1809                Some(i)
1810            }
1811        })
1812        .collect();
1813    if to_cast.is_empty() {
1814        return Ok(batch.clone());
1815    }
1816    let new_fields: Vec<Field> = schema
1817        .fields()
1818        .iter()
1819        .enumerate()
1820        .map(|(i, f)| {
1821            if to_cast.contains(&i) {
1822                Field::new(f.name(), DataType::Utf8, f.is_nullable())
1823            } else {
1824                f.as_ref().clone()
1825            }
1826        })
1827        .collect();
1828    let new_schema = Arc::new(Schema::new(new_fields));
1829    let cols: Vec<ArrayRef> = batch
1830        .columns()
1831        .iter()
1832        .enumerate()
1833        .map(|(i, c)| {
1834            if to_cast.contains(&i) {
1835                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
1836            } else {
1837                Ok(c.clone())
1838            }
1839        })
1840        .collect::<Result<_, _>>()?;
1841    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
1842}
1843
1844// ---------------------------------------------------------------------------
1845// Compute helpers — retained for symmetry; reserved for future inline scan
1846// path. Currently the engine fallback handles all non-index queries.
1847// ---------------------------------------------------------------------------
1848
/// Comparison operator accepted by `cmp_scalar`.
///
/// Currently unused outside the retained compute helpers, hence the
/// `dead_code` allowance.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equal to the scalar.
    Eq,
    /// Not equal to the scalar.
    Neq,
    /// Strictly greater than the scalar.
    Gt,
    /// Greater than or equal to the scalar.
    Gte,
    /// Strictly less than the scalar.
    Lt,
    /// Less than or equal to the scalar.
    Lte,
    /// Pattern match via `compute::like`; `cmp_scalar` rejects it on numeric columns.
    Like,
    /// Pattern match via `compute::ilike`; `cmp_scalar` rejects it on numeric columns.
    ILike,
}
1861
1862#[allow(dead_code)]
1863fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
1864    let arr = col
1865        .as_any()
1866        .downcast_ref::<StringArray>()
1867        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
1868    let s = Scalar::new(StringArray::from(vec![val]));
1869    Ok(eq(arr, &s)?)
1870}
1871
1872#[allow(dead_code)]
1873fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
1874    macro_rules! num_cmp {
1875        ($arr_type:ty, $cast:ty) => {{
1876            let n = val
1877                .as_f64()
1878                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
1879                as $cast;
1880            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
1881            let s = Scalar::new(<$arr_type>::from(vec![n]));
1882            Ok(match op {
1883                CmpOp::Eq => eq(arr, &s)?,
1884                CmpOp::Neq => neq(arr, &s)?,
1885                CmpOp::Gt => gt(arr, &s)?,
1886                CmpOp::Gte => gt_eq(arr, &s)?,
1887                CmpOp::Lt => lt(arr, &s)?,
1888                CmpOp::Lte => lt_eq(arr, &s)?,
1889                CmpOp::Like | CmpOp::ILike => {
1890                    return Err(AppError::InvalidValue(
1891                        "LIKE requires a string column".into(),
1892                    ));
1893                }
1894            })
1895        }};
1896    }
1897    match col.data_type() {
1898        DataType::Utf8 => {
1899            let s = val
1900                .as_str()
1901                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
1902            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1903            let sc = Scalar::new(StringArray::from(vec![s]));
1904            Ok(match op {
1905                CmpOp::Eq => eq(arr, &sc)?,
1906                CmpOp::Neq => neq(arr, &sc)?,
1907                CmpOp::Gt => gt(arr, &sc)?,
1908                CmpOp::Gte => gt_eq(arr, &sc)?,
1909                CmpOp::Lt => lt(arr, &sc)?,
1910                CmpOp::Lte => lt_eq(arr, &sc)?,
1911                CmpOp::Like => compute::like(arr, &sc)?,
1912                CmpOp::ILike => compute::ilike(arr, &sc)?,
1913            })
1914        }
1915        DataType::Int8 => num_cmp!(Int8Array, i8),
1916        DataType::Int16 => num_cmp!(Int16Array, i16),
1917        DataType::Int32 => num_cmp!(Int32Array, i32),
1918        DataType::Int64 => num_cmp!(Int64Array, i64),
1919        DataType::Float32 => num_cmp!(Float32Array, f32),
1920        DataType::Float64 => num_cmp!(Float64Array, f64),
1921        dt => Err(AppError::InvalidValue(format!(
1922            "unsupported type for comparison: {dt:?}"
1923        ))),
1924    }
1925}
1926
1927// ---------------------------------------------------------------------------
1928// Serialisation
1929// ---------------------------------------------------------------------------
1930
1931pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
1932    // Temporal columns are kept native in resident memory (compact). Cast
1933    // them — plus any other dtype `write_value` can't render directly — to
1934    // Utf8 here, on the bounded page batch, so the JSON output is faithful
1935    // without paying the load-time RAM cost.
1936    let batch = cast_for_serialize(batch)?;
1937    let schema = batch.schema();
1938    let n_rows = batch.num_rows();
1939
1940    let keys: Vec<Vec<u8>> = schema
1941        .fields()
1942        .iter()
1943        .map(|f| {
1944            let mut k = Vec::with_capacity(f.name().len() + 3);
1945            k.push(b'"');
1946            k.extend_from_slice(f.name().as_bytes());
1947            k.extend_from_slice(b"\":");
1948            k
1949        })
1950        .collect();
1951
1952    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
1953    buf.push(b'[');
1954
1955    for row in 0..n_rows {
1956        if row > 0 {
1957            buf.push(b',');
1958        }
1959        buf.push(b'{');
1960        for (i, key) in keys.iter().enumerate() {
1961            if i > 0 {
1962                buf.push(b',');
1963            }
1964            buf.extend_from_slice(key);
1965            let col = batch.column(i);
1966            if col.is_null(row) {
1967                buf.extend_from_slice(b"null");
1968            } else {
1969                write_value(&mut buf, col.as_ref(), row);
1970            }
1971        }
1972        buf.push(b'}');
1973    }
1974
1975    buf.push(b']');
1976    Ok(unsafe { String::from_utf8_unchecked(buf) })
1977}
1978
1979#[inline]
1980fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
1981    match col.data_type() {
1982        DataType::Utf8 => write_str(
1983            buf,
1984            col.as_any()
1985                .downcast_ref::<StringArray>()
1986                .unwrap()
1987                .value(row),
1988        ),
1989        DataType::LargeUtf8 => write_str(
1990            buf,
1991            col.as_any()
1992                .downcast_ref::<LargeStringArray>()
1993                .unwrap()
1994                .value(row),
1995        ),
1996        DataType::Utf8View => write_str(
1997            buf,
1998            col.as_any()
1999                .downcast_ref::<StringViewArray>()
2000                .unwrap()
2001                .value(row),
2002        ),
2003        DataType::Dictionary(key, value)
2004            if matches!(key.as_ref(), DataType::Int32)
2005                && matches!(value.as_ref(), DataType::Utf8) =>
2006        {
2007            let dict = col
2008                .as_any()
2009                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2010                .unwrap();
2011            let keys = dict.keys();
2012            let values = dict
2013                .values()
2014                .as_any()
2015                .downcast_ref::<StringArray>()
2016                .unwrap();
2017            let k = keys.value(row) as usize;
2018            write_str(buf, values.value(k));
2019        }
2020        DataType::Boolean => {
2021            let v = col
2022                .as_any()
2023                .downcast_ref::<BooleanArray>()
2024                .unwrap()
2025                .value(row);
2026            buf.extend_from_slice(if v { b"true" } else { b"false" });
2027        }
2028        DataType::Int8 => {
2029            let mut b = itoa::Buffer::new();
2030            buf.extend_from_slice(
2031                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2032                    .as_bytes(),
2033            );
2034        }
2035        DataType::Int16 => {
2036            let mut b = itoa::Buffer::new();
2037            buf.extend_from_slice(
2038                b.format(
2039                    col.as_any()
2040                        .downcast_ref::<Int16Array>()
2041                        .unwrap()
2042                        .value(row),
2043                )
2044                .as_bytes(),
2045            );
2046        }
2047        DataType::Int32 => {
2048            let mut b = itoa::Buffer::new();
2049            buf.extend_from_slice(
2050                b.format(
2051                    col.as_any()
2052                        .downcast_ref::<Int32Array>()
2053                        .unwrap()
2054                        .value(row),
2055                )
2056                .as_bytes(),
2057            );
2058        }
2059        DataType::Int64 => {
2060            let mut b = itoa::Buffer::new();
2061            buf.extend_from_slice(
2062                b.format(
2063                    col.as_any()
2064                        .downcast_ref::<Int64Array>()
2065                        .unwrap()
2066                        .value(row),
2067                )
2068                .as_bytes(),
2069            );
2070        }
2071        DataType::UInt8 => {
2072            let mut b = itoa::Buffer::new();
2073            buf.extend_from_slice(
2074                b.format(
2075                    col.as_any()
2076                        .downcast_ref::<UInt8Array>()
2077                        .unwrap()
2078                        .value(row),
2079                )
2080                .as_bytes(),
2081            );
2082        }
2083        DataType::UInt16 => {
2084            let mut b = itoa::Buffer::new();
2085            buf.extend_from_slice(
2086                b.format(
2087                    col.as_any()
2088                        .downcast_ref::<UInt16Array>()
2089                        .unwrap()
2090                        .value(row),
2091                )
2092                .as_bytes(),
2093            );
2094        }
2095        DataType::UInt32 => {
2096            let mut b = itoa::Buffer::new();
2097            buf.extend_from_slice(
2098                b.format(
2099                    col.as_any()
2100                        .downcast_ref::<UInt32Array>()
2101                        .unwrap()
2102                        .value(row),
2103                )
2104                .as_bytes(),
2105            );
2106        }
2107        DataType::UInt64 => {
2108            let mut b = itoa::Buffer::new();
2109            buf.extend_from_slice(
2110                b.format(
2111                    col.as_any()
2112                        .downcast_ref::<UInt64Array>()
2113                        .unwrap()
2114                        .value(row),
2115                )
2116                .as_bytes(),
2117            );
2118        }
2119        DataType::Decimal128(_, _) => {
2120            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2121            write_str(buf, &arr.value_as_string(row));
2122        }
2123        DataType::Decimal256(_, _) => {
2124            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2125            write_str(buf, &arr.value_as_string(row));
2126        }
2127        DataType::Float32 => {
2128            let v = col
2129                .as_any()
2130                .downcast_ref::<Float32Array>()
2131                .unwrap()
2132                .value(row);
2133            if v.is_finite() {
2134                let mut b = ryu::Buffer::new();
2135                buf.extend_from_slice(b.format_finite(v).as_bytes());
2136            } else {
2137                buf.extend_from_slice(b"null");
2138            }
2139        }
2140        DataType::Float64 => {
2141            let v = col
2142                .as_any()
2143                .downcast_ref::<Float64Array>()
2144                .unwrap()
2145                .value(row);
2146            if v.is_finite() {
2147                let mut b = ryu::Buffer::new();
2148                buf.extend_from_slice(b.format_finite(v).as_bytes());
2149            } else {
2150                buf.extend_from_slice(b"null");
2151            }
2152        }
2153        // Any dtype not handled above must have been pre-cast to Utf8 by
2154        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
2155        // visible JSON string rather than a silent null so it can't be
2156        // mistaken for a real NULL value.
2157        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2158    }
2159}
2160
/// Append `s` to `buf` as a double-quoted JSON string.
///
/// `"` and `\` are backslash-escaped, newline / carriage return / tab use
/// their short escapes, and every other byte below 0x20 is written as
/// `\u00XX`. All remaining bytes are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    // Start of the current run of bytes that need no escaping.
    let mut run_start = 0;

    buf.push(b'"');
    for (pos, &byte) in bytes.iter().enumerate() {
        let needs_escape = byte < 0x20 || byte == b'"' || byte == b'\\';
        if !needs_escape {
            continue;
        }
        // Flush the untouched run in one copy, then emit the escape.
        buf.extend_from_slice(&bytes[run_start..pos]);
        run_start = pos + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            _ => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX[usize::from(byte >> 4)]);
                buf.push(HEX[usize::from(byte & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
2182
2183// ---------------------------------------------------------------------------
2184// Backend trait impl — wires the store into the generic core handlers.
2185// ---------------------------------------------------------------------------
2186
2187#[async_trait]
2188impl Backend for Store {
2189    fn names(&self) -> Vec<String> {
2190        Store::names(self)
2191    }
2192
2193    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2194        let st = self.dataset(name)?;
2195        Ok(DatasetSummary {
2196            name: st.schema.name.clone(),
2197            columns: st.schema.columns.len(),
2198            rows: st.num_rows(),
2199        })
2200    }
2201
2202    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2203        let st = self.dataset(name)?;
2204        Ok(Arc::new(st.schema.clone()))
2205    }
2206
2207    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2208        let st = self.dataset(name)?;
2209        // Report indexed columns in the dataset's declared schema order
2210        // so the `/schema` response is deterministic.
2211        let mut cols: Vec<String> = st
2212            .schema
2213            .columns
2214            .iter()
2215            .map(|c| c.name.clone())
2216            .filter(|n| st.index.contains_key(n))
2217            .collect();
2218        // Any indexed columns not in `schema.columns` (shouldn't happen,
2219        // but be defensive) get appended sorted.
2220        let mut extras: Vec<String> = st
2221            .index
2222            .keys()
2223            .filter(|n| !cols.iter().any(|c| c == *n))
2224            .cloned()
2225            .collect();
2226        extras.sort();
2227        cols.extend(extras);
2228        Ok(cols)
2229    }
2230
2231    async fn sample(&self, name: &str) -> Result<String, AppError> {
2232        Store::sample(self, name).await
2233    }
2234
2235    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2236        Store::query(self, name, req).await
2237    }
2238
2239    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2240        Store::query_arrow(self, name, req).await
2241    }
2242
2243    async fn query_arrow_stream(
2244        &self,
2245        name: &str,
2246        req: &QueryRequest,
2247    ) -> Result<ArrowIpcStream, AppError> {
2248        Store::query_arrow_stream(self, name, req).await
2249    }
2250
2251    async fn query_arrow_stream_all(
2252        &self,
2253        name: &str,
2254        req: &QueryRequest,
2255    ) -> Result<ArrowIpcStream, AppError> {
2256        Store::query_arrow_stream_all(self, name, req).await
2257    }
2258
2259    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2260        Store::count(self, name, req).await
2261    }
2262
2263    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2264        Store::reload(self, name).await
2265    }
2266}