Skip to main content

datapress_datafusion/
store.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use arc_swap::ArcSwap;
8use arrow::array::{
9    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
10    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
11    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
12};
13use arrow::compute;
14use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
15use arrow::datatypes::{DataType, Field, Schema};
16use async_trait::async_trait;
17use parquet::arrow::ProjectionMask;
18use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
19use serde_json::Value as JsonValue;
20
21use datafusion::datasource::file_format::parquet::ParquetFormat;
22use datafusion::datasource::listing::{
23    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
24};
25use datafusion::datasource::{MemTable, TableProvider};
26use datafusion::execution::cache::DefaultListFilesCache;
27use datafusion::execution::cache::cache_manager::CacheManagerConfig;
28use datafusion::execution::runtime_env::RuntimeEnvBuilder;
29use datafusion::prelude::{SessionConfig, SessionContext};
30use datafusion::scalar::ScalarValue;
31
32use object_store::aws::AmazonS3Builder;
33use url::Url;
34
35use datapress_core::backend::{
36    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
37};
38use datapress_core::config::{
39    AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
40    Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
41};
42use datapress_core::errors::AppError;
43use datapress_core::models::{CountRequest, Predicate, QueryRequest};
44use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
45
46// ---------------------------------------------------------------------------
47// Public types
48// ---------------------------------------------------------------------------
49
50/// Hasher-swapped map alias. The default `std::collections::HashMap` uses
51/// SipHash (DoS-resistant but slow). The equality index keys are our own
52/// column values, not untrusted hash-flood input, so we use `ahash` — a much
53/// faster non-cryptographic hasher — on this per-request hot path.
54type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
55
56/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
57type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
58
59/// Per-dataset state: schema metadata, the resident chunks, and the
60/// equality index built per the dataset's `[dataset.index]` policy.
61///
62/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
63/// produced by the underlying reader, after temporal columns are cast to
64/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
65/// into one batch: on wide schemas (hundreds of columns) that transiently
66/// allocates a second full copy of the decoded Arrow data, pushing peak
67/// RSS to ~2× the resident size and OOM-killing the process at startup.
68///
69/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
70/// `index` is empty, and every query is dispatched to DataFusion SQL
71/// against a registered `ListingTable`. `arrow_schema` still carries the
72/// inferred schema so discovery endpoints work.
73pub struct DatasetState {
74    pub schema: DatasetSchema,
75    pub data: Vec<RecordBatch>,
76    pub arrow_schema: Arc<Schema>,
77    pub index: EqIndex,
78    pub lazy: bool,
79}
80
81impl DatasetState {
82    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
83    pub fn num_rows(&self) -> usize {
84        self.data.iter().map(|b| b.num_rows()).sum()
85    }
86}
87
88/// Multi-dataset registry. Each dataset is registered in the shared
89/// `SessionContext` under its configured name. The per-dataset state is
90/// held behind `ArcSwap` so a reload can atomically replace it without
91/// blocking concurrent queries.
92pub struct Store {
93    ctx: SessionContext,
94    max_page_size: u64,
95    /// Original dataset configs, indexed by name. Reload reads the source
96    /// path from here — clients can't redirect a reload at an arbitrary file.
97    configs: HashMap<String, DatasetConfig>,
98    /// Hot-swappable snapshot of all currently loaded datasets.
99    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
100    /// Per-name reload mutex. Serialises concurrent reloads of the same
101    /// dataset; reloads of different datasets proceed in parallel.
102    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
103}
104
105impl Store {
106    /// Load every dataset declared in `cfg`.
107    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
108        // One-shot init for the deltalake S3 backend. Safe to call more
109        // than once — the handlers are idempotent.
110        if cfg
111            .datasets
112            .iter()
113            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
114        {
115            deltalake::aws::register_handlers(None);
116        }
117
118        // NOTE: identifier handling for the raw-SQL endpoint is done by
119        // rewriting schema column/table references to quoted canonical
120        // names before execution (see `query_sql`). DataFusion's default
121        // identifier normalization is left ON so unquoted *aliases* and
122        // CTE names stay case-insensitive, matching DuckDB.
123        let ctx = build_tuned_context(&cfg.datafusion);
124        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
125        let mut configs = HashMap::with_capacity(cfg.datasets.len());
126
127        for d in &cfg.datasets {
128            log::info!(
129                "Loading dataset '{}' ({} @ {})",
130                d.name,
131                d.source.kind.as_str(),
132                d.source.location
133            );
134            // Force lazy when the source exceeds the server size threshold.
135            // S3-aware: local sources are stat'd, S3 sources are sized by
136            // listing the object store under their prefix.
137            let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
138                .await
139            {
140                Some(bytes) => {
141                    log::info!(
142                        "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
143                        d.name,
144                        bytes as f64 / (1024.0 * 1024.0),
145                        cfg.server.force_lazy_above_mb
146                    );
147                    let mut forced = d.clone();
148                    forced.lazy = true;
149                    std::borrow::Cow::Owned(forced)
150                }
151                None => std::borrow::Cow::Borrowed(d),
152            };
153            let d = d.as_ref();
154            let (state, provider) = match build_dataset(d, &ctx).await {
155                Ok(built) => built,
156                Err(AppError::EmptyDataset(msg)) => {
157                    log::warn!("skipping empty dataset '{}': {msg}", d.name);
158                    continue;
159                }
160                // An S3 source we're not authorized to read (bad creds, no
161                // bucket/prefix policy, expired token) returns a 403. Don't
162                // abort the whole registry for one inaccessible dataset —
163                // log and skip it, exactly like an empty source. The rest of
164                // the datasets still load and serve traffic.
165                Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
166                    log::warn!(
167                        "skipping dataset '{}': S3 access denied — check credentials \
168                         and bucket policy ({e})",
169                        d.name
170                    );
171                    continue;
172                }
173                Err(e) => return Err(e),
174            };
175            ctx.register_table(d.name.as_str(), provider)?;
176            datasets.insert(d.name.clone(), Arc::new(state));
177            configs.insert(d.name.clone(), d.clone());
178        }
179        Ok(Self {
180            ctx,
181            max_page_size: cfg.server.max_page_size.max(1),
182            configs,
183            datasets: ArcSwap::from_pointee(datasets),
184            reload_locks: Mutex::new(HashMap::new()),
185        })
186    }
187
188    /// Sorted list of dataset names.
189    pub fn names(&self) -> Vec<String> {
190        let snap = self.datasets.load();
191        let mut v: Vec<String> = snap.keys().cloned().collect();
192        v.sort();
193        v
194    }
195
196    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
197        self.datasets
198            .load()
199            .get(name)
200            .cloned()
201            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
202    }
203
204    /// JSON for the first row of the dataset, or `null` if empty. Used by
205    /// `GET /api/datasets/{name}/schema` for discoverability.
206    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
207        let st = self.dataset(name)?;
208
209        // Lazy datasets have no resident batch — pull one row via SQL.
210        if st.lazy {
211            let table = DatasetSchema::quote_ident(&st.schema.name);
212            let sql = format!("SELECT * FROM {table} LIMIT 1");
213            let df = self.ctx.sql(&sql).await?;
214            let batches = df.collect().await?;
215            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
216                return Ok("null".into());
217            }
218            let arr = serialize(&batches[0].slice(0, 1))?;
219            let trimmed = arr.trim();
220            let inner = trimmed
221                .strip_prefix('[')
222                .and_then(|s| s.strip_suffix(']'))
223                .unwrap_or(trimmed);
224            return Ok(inner.to_string());
225        }
226
227        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
228            Some(b) => b,
229            None => return Ok("null".into()),
230        };
231        let arr = serialize(&first.slice(0, 1))?;
232        // strip the outer [] to return a single object
233        let trimmed = arr.trim();
234        let inner = trimmed
235            .strip_prefix('[')
236            .and_then(|s| s.strip_suffix(']'))
237            .unwrap_or(trimmed);
238        Ok(inner.to_string())
239    }
240
241    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
242    /// against the same name continue to see the *old* `Arc<DatasetState>`
243    /// until they finish; the old data is dropped once the last reference
244    /// goes away.
245    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
246        // 1. Look up the dataset config. Not finding it = 404.
247        let cfg = self
248            .configs
249            .get(name)
250            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
251            .clone();
252
253        // 2. Per-name lock: only one reload of this dataset at a time.
254        let lock = {
255            let mut locks = self.reload_locks.lock().unwrap();
256            locks
257                .entry(name.to_string())
258                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
259                .clone()
260        };
261        let _guard = lock.lock().await;
262
263        let started = std::time::Instant::now();
264
265        // Invalidate the object-store LIST cache before rebuilding. For an
266        // S3 *prefix* source the files are often rewritten with new UUID
267        // names; without dropping the cached listing the rebuilt
268        // `ListingTable` would replay the stale keys and fail with
269        // "object at location … not found". A plain prefix is therefore
270        // re-listed fresh; an explicit filename/glob source is re-resolved
271        // against the same configured pattern. Clearing the whole cache is
272        // cheap — other datasets simply re-list on their next query.
273        //
274        // NOTE: this must clear the cache that actually lives on the
275        // `RuntimeEnv`, not just the one we opt into via
276        // `[datafusion] list_files_cache`. DataFusion ≥ 53 installs a
277        // *default* `DefaultListFilesCache` (1 MiB, infinite TTL) on every
278        // `SessionContext` even when no cache is configured, so the staleness
279        // bites regardless of our flag.
280        if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
281            cache.clear();
282        }
283
284        // 3. Heavy lifting (source read + index build). Parquet/delta
285        // readers are themselves async, so we don't wrap in `web::block`.
286        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
287        let rows = state.num_rows();
288
289        // 4. Atomic swap.
290        //   a) Replace the MemTable inside the SessionContext.
291        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
292        // In-flight queries already hold the old provider + old Arc; they
293        // run to completion. New queries see the new data.
294        let _ = self.ctx.deregister_table(name)?;
295        self.ctx.register_table(name, provider)?;
296
297        let mut new_map = (**self.datasets.load()).clone();
298        new_map.insert(name.to_string(), Arc::new(state));
299        self.datasets.store(Arc::new(new_map));
300
301        let elapsed_ms = started.elapsed().as_millis();
302        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
303        Ok(ReloadStats { rows, elapsed_ms })
304    }
305
306    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
307    /// slice. Otherwise → DataFusion SQL on the single registered table.
308    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
309    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
310        let batch = self.query_batch(name, req).await?;
311        if batch.num_rows() == 0 {
312            return Ok("[]".to_string());
313        }
314        serialize(&batch)
315    }
316
317    /// Rewrite registered table / column references in a raw-SQL string to
318    /// their canonical, quoted spelling so matching is case-insensitive
319    /// (like DuckDB). Builds the lookup from every currently loaded
320    /// dataset; unknown identifiers (aliases, CTE names) are left for the
321    /// engine's default normalization to handle.
322    fn canonicalize_sql(&self, sql: &str) -> String {
323        let snap = self.datasets.load();
324        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
325        let mut columns: HashMap<String, String> = HashMap::new();
326        for (name, state) in snap.iter() {
327            tables.insert(name.to_lowercase(), name.clone());
328            for col in &state.schema.columns {
329                columns
330                    .entry(col.name.to_lowercase())
331                    .or_insert_with(|| col.name.clone());
332            }
333        }
334        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
335    }
336
337    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
338    /// array. The statement has already passed
339    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
340    /// `LIMIT max_rows` so the result is bounded regardless of the user's
341    /// own clauses, executed through the shared `SessionContext`, and run
342    /// through the same fast JSON encoder as [`Self::query`].
343    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
344        let cap = max_rows.max(1);
345        let sql = self.canonicalize_sql(sql);
346        // DESCRIBE yields a schema listing and cannot be nested in a
347        // subquery on DataFusion, so run it directly. Its row count is
348        // bounded by the column count and the slice below still enforces
349        // `cap`; everything else is wrapped in an outer LIMIT.
350        let wrapped = if datapress_core::sql::is_describe(&sql) {
351            sql
352        } else {
353            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
354        };
355        let df = self.ctx.sql(&wrapped).await?;
356        let batches = df.collect().await?;
357        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
358            return Ok("[]".to_string());
359        }
360        let batch = if batches.len() == 1 {
361            batches.into_iter().next().expect("checked len")
362        } else {
363            compute::concat_batches(&batches[0].schema(), batches.iter())?
364        };
365        // Defence in depth: the outer LIMIT already bounds the row count,
366        // but slice anyway so a planning quirk can never blow the cap.
367        let batch = if batch.num_rows() as u64 > cap {
368            batch.slice(0, cap as usize)
369        } else {
370            batch
371        };
372        serialize(&batch)
373    }
374
375    /// Same plan as [`Self::query_sql`], but encode the bounded result as
376    /// an Arrow IPC stream instead of JSON. Backs the Arrow
377    /// content-negotiated branch of `POST /api/v1/sql`.
378    pub async fn query_sql_arrow_stream(
379        &self,
380        sql: &str,
381        max_rows: u64,
382    ) -> Result<ArrowIpcStream, AppError> {
383        let cap = max_rows.max(1);
384        let sql = self.canonicalize_sql(sql);
385        // DESCRIBE cannot be nested in a subquery on DataFusion (see
386        // `query_sql`); run it directly. Its output is bounded by the
387        // column count, so the missing outer LIMIT is harmless.
388        let wrapped = if datapress_core::sql::is_describe(&sql) {
389            sql
390        } else {
391            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
392        };
393        let df = self.ctx.sql(&wrapped).await?;
394        let batches = df.collect().await?;
395        Ok(stream_arrow_batches(batches))
396    }
397
398    /// Same plan as [`Self::query`], but encode the result page as an
399    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
400    /// results still produce a valid, self-describing zero-batch stream.
401    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
402        let batch = self.query_batch(name, req).await?;
403        let schema = batch.schema();
404        let mut buf = Vec::with_capacity(8 * 1024);
405        {
406            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
407            if batch.num_rows() > 0 {
408                w.write(&batch)?;
409            }
410            w.finish()?;
411        }
412        Ok(buf)
413    }
414
415    pub async fn query_arrow_stream(
416        &self,
417        name: &str,
418        req: &QueryRequest,
419    ) -> Result<ArrowIpcStream, AppError> {
420        let batches = self.query_batches(name, req).await?;
421        Ok(stream_arrow_batches(batches))
422    }    pub async fn query_arrow_stream_all(
423        &self,
424        name: &str,
425        req: &QueryRequest,
426    ) -> Result<ArrowIpcStream, AppError> {
427        let batches = self.query_batches_all(name, req).await?;
428        Ok(stream_arrow_batches(batches))
429    }
430
431    /// Encode the entire dataset as a single self-contained Parquet file.
432    ///
433    /// Collects every row (all columns, no predicates, no paging) and runs
434    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
435    /// carries the row-group + footer metadata a Parquet reader needs to
436    /// answer `count(*)` straight from the footer. Powers the cached
437    /// `GET /datasets/{name}/parquet` HTTP endpoint.
438    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
439        // All rows, all columns, no predicates / ordering / limit.
440        let req = QueryRequest {
441            columns: Vec::new(),
442            predicates: Vec::new(),
443            group_by: Vec::new(),
444            aggregations: Vec::new(),
445            having: Vec::new(),
446            distinct: false,
447            order_by: Vec::new(),
448            limit: None,
449            page: 1,
450            page_size: 1,
451        };
452        let st = self.dataset(name)?;
453        let batches = self.query_batches_all(name, &req).await?;
454        // Use the actual batch schema when we have rows so the writer schema
455        // matches exactly (projection/nullability); fall back to the
456        // dataset schema for an empty dataset.
457        let schema = batches
458            .first()
459            .map(|b| b.schema())
460            .unwrap_or_else(|| st.arrow_schema.clone());
461
462        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
463        {
464            let props = parquet::file::properties::WriterProperties::builder()
465                .set_compression(parquet::basic::Compression::SNAPPY)
466                .build();
467            let mut writer =
468                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
469                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
470            for batch in &batches {
471                if batch.num_rows() > 0 {
472                    writer
473                        .write(batch)
474                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
475                }
476            }
477            writer
478                .close()
479                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
480        }
481        Ok(bytes::Bytes::from(buf))
482    }
483
484    /// Compute the result page as a single `RecordBatch`. Shared between
485    /// the JSON and Arrow IPC encoders.
486    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
487        let batches = self.query_batches(name, req).await?;
488        if batches.is_empty() {
489            return Ok(RecordBatch::new_empty(Arc::new(
490                arrow::datatypes::Schema::empty(),
491            )));
492        }
493        if batches.len() == 1 {
494            return Ok(batches.into_iter().next().expect("checked len"));
495        }
496        if batches.iter().all(|b| b.num_rows() == 0) {
497            return Ok(RecordBatch::new_empty(batches[0].schema()));
498        }
499        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
500        Ok(batch)
501    }
502
503    /// Compute the result page as Arrow batches. Arrow IPC responses can
504    /// write these directly, while JSON callers concatenate via
505    /// [`Self::query_batch`] for the existing row conversion path.
506    async fn query_batches(
507        &self,
508        name: &str,
509        req: &QueryRequest,
510    ) -> Result<Vec<RecordBatch>, AppError> {
511        let st = self.dataset(name)?;
512
513        let page = req.page.max(1);
514        let page_size = req.page_size.clamp(1, self.max_page_size);
515        let offset = ((page - 1) * page_size) as usize;
516        let limit = page_size as usize;
517
518        self.query_batches_inner(st, req, Some((offset, limit)))
519            .await
520    }
521
522    /// Compute all matching rows as Arrow batches for the one-request
523    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
524    /// optional `limit` still caps the total result size.
525    async fn query_batches_all(
526        &self,
527        name: &str,
528        req: &QueryRequest,
529    ) -> Result<Vec<RecordBatch>, AppError> {
530        let st = self.dataset(name)?;
531        self.query_batches_inner(st, req, None).await
532    }
533
534    async fn query_batches_inner(
535        &self,
536        st: Arc<DatasetState>,
537        req: &QueryRequest,
538        page_window: Option<(usize, usize)>,
539    ) -> Result<Vec<RecordBatch>, AppError> {
540        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
541
542        // In-memory hot paths only fire when:
543        //   - the dataset is materialised,
544        //   - the caller did not ask for ordering,
545        //   - and did not ask for a hard `limit` cap on a paged request.
546        // Both of the latter two require sorting / capping that the SQL
547        // engine handles uniformly across all data types.
548        let can_fast_path = !st.lazy
549            && req.order_by.is_empty()
550            && (page_window.is_none() || req.limit.is_none())
551            && req.group_by.is_empty()
552            && !req.distinct;
553
554        if can_fast_path {
555            let total = st.num_rows();
556
557            // No predicates -> O(1) raw Arrow slices over resident batches,
558            // no engine overhead.
559            if req.predicates.is_empty() {
560                if page_window.is_none() && req.limit.is_none() {
561                    return st
562                        .data
563                        .iter()
564                        .cloned()
565                        .map(|batch| project(&st.schema, batch, &req.columns))
566                        .collect();
567                }
568                let start = offset.min(total);
569                let len = limit.min(total - start);
570                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
571                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
572            }
573
574            // Index fast path: if every predicate is eq/in on an indexed column,
575            // resolve via the pre-built equality index.
576            if let Some(rows) = try_index(&st.index, &req.predicates) {
577                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
578                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
579            }
580        }
581
582        // Fallback (and only path for lazy datasets): DataFusion SQL.
583        let (sql, params) = match page_window {
584            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
585            None => build_query_stream_sql(&st.schema, req)?,
586        };
587        let mut df = self.ctx.sql(&sql).await?;
588        if !params.is_empty() {
589            df = df.with_param_values(params)?;
590        }
591        let batches = df.collect().await?;
592        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
593            let schema = batches
594                .first()
595                .map(|b| b.schema())
596                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
597            return Ok(vec![RecordBatch::new_empty(schema)]);
598        }
599        Ok(batches)
600    }
601}
602
603fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
604    let schema = batches
605        .first()
606        .map(|batch| batch.schema())
607        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
608    let (mut writer, stream) = arrow_ipc_stream_channel(8);
609
610    tokio::task::spawn_blocking(move || {
611        let result = (|| -> Result<(), AppError> {
612            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
613            for batch in batches {
614                if batch.num_rows() > 0 {
615                    w.write(&batch)?;
616                }
617            }
618            w.finish()?;
619            Ok(())
620        })();
621        if let Err(err) = result {
622            log::error!("datafusion arrow stream failed: {err}");
623            writer.send_error(err);
624        }
625    });
626
627    stream
628}
629
630impl Store {
631    /// Return the number of rows matching `req.predicates`. With no
632    /// predicates this is a cheap metadata lookup on materialised datasets
633    /// and a `SELECT COUNT(*)` on lazy ones.
634    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
635        let st = self.dataset(name)?;
636
637        if !st.lazy {
638            // No predicates → resident row count, no scan.
639            if req.predicates.is_empty() {
640                return Ok(st.num_rows() as i64);
641            }
642            // Index fast path: same eligibility rules as `query`.
643            if let Some(rows) = try_index(&st.index, &req.predicates) {
644                return Ok(rows.len() as i64);
645            }
646        }
647
648        // Fallback: DataFusion SQL — same predicate translation as `query`,
649        // with predicate values bound as typed parameters.
650        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
651        let mut df = self.ctx.sql(&sql).await?;
652        if !params.is_empty() {
653            df = df.with_param_values(params)?;
654        }
655        let batches = df.collect().await?;
656        let n = batches
657            .first()
658            .and_then(|b| {
659                b.column(0)
660                    .as_any()
661                    .downcast_ref::<arrow::array::Int64Array>()
662            })
663            .filter(|a| !a.is_empty())
664            .map(|a| a.value(0))
665            .unwrap_or(0);
666        Ok(n)
667    }
668}
669
670// ---------------------------------------------------------------------------
671// Dataset loading
672// ---------------------------------------------------------------------------
673
674/// Build a tuned `SessionContext` for the whole `Store`.
675///
676/// Everything here is opt-in via the `[datafusion]` config block; with the
677/// defaults this is equivalent to a plain `SessionContext::new()`. Lazy
678/// datasets (`ListingTable` over parquet, possibly on S3) benefit most:
679///
680/// * `pushdown_filters` — evaluate predicates *during* the parquet decode
681///   so filtered-out rows in a surviving row group are never materialised.
682/// * `reorder_filters` — let the scan reorder pushed-down predicates by
683///   selectivity (only meaningful with `pushdown_filters`).
684/// * `list_files_cache` — a `ListFilesCache` on the `RuntimeEnv` so repeated
685///   lazy queries reuse object-store `LIST` results instead of re-listing the
686///   prefix every time — the dominant per-query cost on S3.
687///
688/// Row-group / page-index / bloom-filter pruning and `metadata_size_hint`
689/// are already on by default in this DataFusion version, so they are left
690/// untouched.
691///
692/// Note: DataFusion ≥ 53 installs a *default* `DefaultListFilesCache` on the
693/// `RuntimeEnv` even with the defaults below, so a reload must explicitly
694/// invalidate it (see [`Store::reload`]) — otherwise a re-listed S3 prefix
695/// would keep serving stale, now-deleted object keys.
696fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
697    let mut config = SessionConfig::new();
698    {
699        let opts = config.options_mut();
700        opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
701        opts.execution.parquet.reorder_filters = cfg.reorder_filters;
702    }
703
704    if !cfg.list_files_cache {
705        return SessionContext::new_with_config(config);
706    }
707
708    // Cache object-store listings so repeated lazy/S3 queries skip the
709    // LIST round-trips. A zero TTL means infinite (never expires).
710    let ttl = (cfg.list_files_cache_ttl_secs > 0)
711        .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
712    let list_cache = Arc::new(DefaultListFilesCache::new(
713        cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
714        ttl,
715    ));
716    let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
717
718    let runtime = RuntimeEnvBuilder::new()
719        .with_cache_manager(cache_manager)
720        .build_arc()
721        .expect("failed to build DataFusion runtime env");
722
723    SessionContext::new_with_config_rt(config, runtime)
724}
725
726async fn build_dataset(
727    d: &DatasetConfig,
728    ctx: &SessionContext,
729) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
730    // Lazy datasets: register a streaming provider straight against the
731    // source and skip the materialise / index / partition pipeline below.
732    // Parquet uses a `ListingTable`; delta uses deltalake's own DataFusion
733    // `TableProvider` (which reads the transaction log once for the file
734    // list, then streams parquet row groups per query with predicate
735    // pushdown + file skipping).
736    if d.lazy {
737        match (d.source.kind, d.source.is_s3()) {
738            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
739            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
740            (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
741        }
742    }
743
744    // Fetch raw RecordBatches from whichever backing store the dataset
745    // is configured to use. All four (parquet, delta) x (local, s3)
746    // combinations converge into one Vec<RecordBatch>; the materialisation
747    // / indexing / partitioning logic below is shared.
748    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
749        (SourceKind::Parquet, false) => read_local_parquet(d)?,
750        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
751        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
752        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
753    };
754    if raw_batches.is_empty() {
755        return Err(AppError::EmptyDataset(format!(
756            "dataset '{}': source produced no batches",
757            d.name
758        )));
759    }
760    // A source can also resolve to one or more *zero-row* batches (e.g. an
761    // empty Delta table, or a parquet file with only a schema). Treat that as
762    // empty too so it's logged and skipped rather than registered as a 0-row
763    // dataset that shows up in discovery / explore.
764    if raw_batches.iter().all(|b| b.num_rows() == 0) {
765        return Err(AppError::EmptyDataset(format!(
766            "dataset '{}': source has a schema but no rows",
767            d.name
768        )));
769    }
770
771    let chunks = raw_batches;
772    let arrow_sch = chunks[0].schema();
773
774    // Build DatasetSchema from the Arrow schema.
775    let columns: Vec<ColumnInfo> = arrow_sch
776        .fields()
777        .iter()
778        .map(|f| {
779            let dt = f.data_type();
780            ColumnInfo {
781                name: f.name().clone(),
782                logical: arrow_to_logical(dt),
783                sql_type: format!("{dt:?}"),
784                nullable: f.is_nullable(),
785            }
786        })
787        .collect();
788    let schema = DatasetSchema::new(&d.name, columns);
789
790    // Build the equality index per the per-dataset policy. Operates on the
791    // chunked representation directly so we never have to materialise a
792    // single concatenated batch (which would double peak RSS on wide
793    // schemas — see `DatasetState` docs).
794    let index = build_eq_index_with_policy(&chunks, &d.index);
795
796    // Partition for parallel scans by the SQL fallback path. We distribute
797    // the existing batches round-robin across `n_parts` partitions instead
798    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
799    // an Arc-clone of the column buffers, not a copy.
800    let n_parts = std::thread::available_parallelism()
801        .map(|n| n.get())
802        .unwrap_or(4);
803    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
804    for (i, b) in chunks.iter().enumerate() {
805        if b.num_rows() == 0 {
806            continue;
807        }
808        parts[i % n_parts].push(b.clone());
809    }
810    parts.retain(|p| !p.is_empty());
811    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
812
813    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
814    let mem_mb: usize = chunks
815        .iter()
816        .flat_map(|b| b.columns().iter())
817        .map(|c| c.get_buffer_memory_size())
818        .sum::<usize>()
819        / 1_048_576;
820    log::info!(
821        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
822        d.name,
823        d.source.kind.as_str(),
824        total_rows,
825        schema.columns.len(),
826        mem_mb,
827        chunks.len(),
828        index.len()
829    );
830
831    Ok((
832        DatasetState {
833            schema,
834            data: chunks,
835            arrow_schema: arrow_sch,
836            index,
837            lazy: false,
838        },
839        provider,
840    ))
841}
842
843/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
844/// The dataset is never read into RAM; DataFusion streams row groups on
845/// each query. The returned `DatasetState.data` is an empty `Vec` —
846/// `arrow_schema` still carries the inferred Arrow schema for discovery.
847async fn build_lazy_local_parquet(
848    d: &DatasetConfig,
849    ctx: &SessionContext,
850) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
851    let (url, part_keys) = lazy_local_listing(d)?;
852
853    let mut opts =
854        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
855    if !part_keys.is_empty() {
856        opts = opts.with_table_partition_cols(
857            part_keys
858                .iter()
859                .map(|k| (k.clone(), DataType::Utf8))
860                .collect(),
861        );
862    }
863
864    let session_state = ctx.state();
865    // `infer_schema` returns the *file* schema (without partition columns);
866    // `ListingTable` appends the declared partition columns on top.
867    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
868        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
869    })?;
870
871    // An empty listing yields an empty schema (DataFusion does not error on a
872    // prefix/glob that matches no files). Treat that as an empty dataset so
873    // the load loop logs and skips it rather than registering 0 columns.
874    if file_schema.fields().is_empty() {
875        return Err(AppError::EmptyDataset(format!(
876            "dataset '{}': no .parquet files at {}",
877            d.name, d.source.location
878        )));
879    }
880
881    let cfg = ListingTableConfig::new(url)
882        .with_listing_options(opts)
883        .with_schema(file_schema.clone());
884    let table = ListingTable::try_new(cfg).map_err(|e| {
885        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
886    })?;
887    let provider: Arc<dyn TableProvider> = Arc::new(table);
888
889    // Discovery schema = file columns + partition columns (Utf8).
890    let mut fields: Vec<Field> = file_schema
891        .fields()
892        .iter()
893        .map(|f| f.as_ref().clone())
894        .collect();
895    for k in &part_keys {
896        if !fields.iter().any(|f| f.name() == k) {
897            fields.push(Field::new(k, DataType::Utf8, false));
898        }
899    }
900    let arrow_sch = Arc::new(Schema::new(fields));
901
902    let columns: Vec<ColumnInfo> = arrow_sch
903        .fields()
904        .iter()
905        .map(|f| {
906            let dt = f.data_type();
907            ColumnInfo {
908                name: f.name().clone(),
909                logical: arrow_to_logical(dt),
910                sql_type: format!("{dt:?}"),
911                nullable: f.is_nullable(),
912            }
913        })
914        .collect();
915    let schema = DatasetSchema::new(&d.name, columns);
916
917    log::info!(
918        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
919        d.name,
920        d.source.kind.as_str(),
921        schema.columns.len(),
922        part_keys.len()
923    );
924
925    Ok((
926        DatasetState {
927            schema,
928            data: Vec::new(),
929            arrow_schema: arrow_sch,
930            index: EqIndex::default(),
931            lazy: true,
932        },
933        provider,
934    ))
935}
936
937/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
938/// rooted at the dataset base plus the ordered hive partition keys (if any).
939/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
940/// (hive root or flat folder of parquets), and a single `*.parquet` file.
941fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
942    let loc = &d.source.location;
943
944    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
945        let parts: Vec<&str> = loc.split('/').collect();
946        let first_wild = parts
947            .iter()
948            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
949            .unwrap_or(parts.len());
950        let base = parts[..first_wild].join("/");
951        let base = if base.is_empty() {
952            "/".to_string()
953        } else {
954            base
955        };
956        // Partition keys: `key=…` components between the base and the file
957        // pattern (the final component).
958        let upper = parts.len().saturating_sub(1);
959        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
960            .iter()
961            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
962            .filter(|k| !k.is_empty())
963            .collect();
964        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
965    }
966
967    let path = std::path::Path::new(loc);
968    if path.is_dir() {
969        let keys = discover_hive_keys(path);
970        return Ok((dir_url(path, d)?, keys));
971    }
972
973    let url = ListingTableUrl::parse(loc)
974        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
975    Ok((url, Vec::new()))
976}
977
978/// Parse a directory path into a `ListingTableUrl` (trailing slash so
979/// DataFusion treats it as a directory root, not a single object).
980fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
981    let s = path.to_str().ok_or_else(|| {
982        AppError::Internal(format!(
983            "dataset '{}': non-utf8 path {}",
984            d.name,
985            path.display()
986        ))
987    })?;
988    let s = if s.ends_with('/') {
989        s.to_string()
990    } else {
991        format!("{s}/")
992    };
993    ListingTableUrl::parse(&s)
994        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
995}
996
/// Discover the ordered hive partition keys below `base`.
///
/// At each level the first sub-directory named `key=value` (both sides
/// non-empty) that the directory listing yields is followed, and its `key`
/// is recorded. The walk ends when a level has no such sub-directory or
/// cannot be read. A flat (non-partitioned) folder therefore yields an empty
/// vec.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut found = Vec::new();
    let mut dir = base.to_path_buf();

    // An unreadable level simply ends the walk.
    while let Ok(entries) = std::fs::read_dir(&dir) {
        let hit = entries
            .flatten() // unreadable entries are ignored
            .map(|entry| entry.path())
            .filter(|p| p.is_dir())
            .find_map(|p| {
                let key = {
                    let name = p.file_name()?.to_str()?;
                    let (k, v) = name.split_once('=')?;
                    if k.is_empty() || v.is_empty() {
                        return None;
                    }
                    k.to_string()
                };
                Some((key, p))
            });

        let Some((key, child)) = hit else {
            break;
        };
        found.push(key);
        dir = child;
    }

    found
}
1034
1035/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
1036/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
1037/// any discovered hive partition columns. DataFusion does the directory
1038/// listing through the registered store and streams row groups on each
1039/// query — no local enumeration needed.
1040async fn build_lazy_s3_parquet(
1041    d: &DatasetConfig,
1042    ctx: &SessionContext,
1043) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1044    register_s3_object_store(d, ctx)?;
1045
1046    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1047
1048    // An empty S3 prefix yields an empty inferred schema (no error). Treat it
1049    // as an empty dataset so the load loop logs and skips it.
1050    if file_schema.fields().is_empty() {
1051        return Err(AppError::EmptyDataset(format!(
1052            "dataset '{}': no .parquet files at {}",
1053            d.name, d.source.location
1054        )));
1055    }
1056
1057    // Discovery schema = file columns + partition columns (Utf8).
1058    let mut fields: Vec<Field> = file_schema
1059        .fields()
1060        .iter()
1061        .map(|f| f.as_ref().clone())
1062        .collect();
1063    for k in &part_keys {
1064        if !fields.iter().any(|f| f.name() == k) {
1065            fields.push(Field::new(k, DataType::Utf8, false));
1066        }
1067    }
1068    let arrow_sch = Arc::new(Schema::new(fields));
1069
1070    let columns: Vec<ColumnInfo> = arrow_sch
1071        .fields()
1072        .iter()
1073        .map(|f| {
1074            let dt = f.data_type();
1075            ColumnInfo {
1076                name: f.name().clone(),
1077                logical: arrow_to_logical(dt),
1078                sql_type: format!("{dt:?}"),
1079                nullable: f.is_nullable(),
1080            }
1081        })
1082        .collect();
1083    let schema = DatasetSchema::new(&d.name, columns);
1084
1085    log::info!(
1086        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1087        d.name,
1088        d.source.kind.as_str(),
1089        schema.columns.len(),
1090        part_keys.len()
1091    );
1092
1093    Ok((
1094        DatasetState {
1095            schema,
1096            data: Vec::new(),
1097            arrow_schema: arrow_sch,
1098            index: EqIndex::default(),
1099            lazy: true,
1100        },
1101        provider,
1102    ))
1103}
1104
1105/// Build a `ListingTable` provider for an S3 parquet source, resolving the
1106/// base prefix and hive partition keys via [`s3_listing`]. The registered
1107/// S3 object store must already be present on `ctx`. Returns the provider,
1108/// the inferred *file* schema (without partition columns), and the ordered
1109/// partition keys.
1110async fn build_s3_listing_table(
1111    d: &DatasetConfig,
1112    ctx: &SessionContext,
1113) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1114    let (url, part_keys) = s3_listing(d, ctx).await?;
1115
1116    let mut opts =
1117        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1118    if !part_keys.is_empty() {
1119        opts = opts.with_table_partition_cols(
1120            part_keys
1121                .iter()
1122                .map(|k| (k.clone(), DataType::Utf8))
1123                .collect(),
1124        );
1125    }
1126
1127    let session_state = ctx.state();
1128    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1129        AppError::Internal(format!(
1130            "dataset '{}': infer parquet schema on s3: {e}",
1131            d.name
1132        ))
1133    })?;
1134
1135    let cfg = ListingTableConfig::new(url)
1136        .with_listing_options(opts)
1137        .with_schema(file_schema.clone());
1138    let table = ListingTable::try_new(cfg).map_err(|e| {
1139        AppError::Internal(format!(
1140            "dataset '{}': ListingTable::try_new (s3): {e}",
1141            d.name
1142        ))
1143    })?;
1144    Ok((Arc::new(table), file_schema, part_keys))
1145}
1146
1147/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
1148/// keys)` pair. Hive keys come from the location glob when present
1149/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
1150/// are discovered by listing the registered object store. Honours the
1151/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
1152async fn s3_listing(
1153    d: &DatasetConfig,
1154    ctx: &SessionContext,
1155) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1156    let s3 = d.s3.clone().unwrap_or_default();
1157    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1158    let loc = &d.source.location;
1159
1160    if d.source.has_glob() {
1161        let (base, keys) = split_glob_base_keys(loc);
1162        let base = format!("{}/", base.trim_end_matches('/'));
1163        let url = ListingTableUrl::parse(&base).map_err(|e| {
1164            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1165        })?;
1166        let keys = if want_partitions { keys } else { Vec::new() };
1167        return Ok((url, keys));
1168    }
1169
1170    let base = if loc.ends_with('/') {
1171        loc.clone()
1172    } else {
1173        format!("{loc}/")
1174    };
1175    let url = ListingTableUrl::parse(&base).map_err(|e| {
1176        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1177    })?;
1178    let keys = if want_partitions {
1179        discover_s3_hive_keys(ctx, &url).await
1180    } else {
1181        Vec::new()
1182    };
1183    Ok((url, keys))
1184}
1185
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys. Works for both local and `s3://` paths.
///
/// The location is split on `/`. The base is everything before the first
/// component containing a glob metacharacter (`*`, `?` or `[`). Partition
/// keys are the `key` halves of `key=…` components between the base and the
/// final component (the file pattern), in path order; components without
/// `=` or with an empty key are ignored.
///
/// Base edge cases:
///
/// * Wildcard in the first component of a *relative* location
///   (`*.parquet`, `city=*/*.parquet`) → base is `.`, the current
///   directory, not the filesystem root.
/// * Only the empty component of a leading `/` precedes the wildcard
///   (`/*.parquet`) → base is `/`.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |component: &str| component.contains(|ch: char| matches!(ch, '*' | '?' | '['));

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|c| is_wild(c))
        .unwrap_or(parts.len());

    let joined = parts[..first_wild].join("/");
    let base = if !joined.is_empty() {
        joined
    } else if first_wild == 0 {
        // Nothing precedes the wildcard at all: the pattern is relative to
        // the current directory.
        ".".to_string()
    } else {
        // Only the empty component before a leading '/' precedes it.
        "/".to_string()
    };

    // The last component is the file pattern and is never a partition key.
    let upper = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts[first_wild.min(upper)..upper]
        .iter()
        .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
        .filter(|k| !k.is_empty())
        .collect();

    (base, keys)
}
1209
1210/// Discover ordered hive partition keys for an S3 prefix by walking the
1211/// object store one `key=value/` level at a time via delimiter listings.
1212/// Best-effort: any listing error stops discovery and returns what was found
1213/// so far (empty = treat as a flat folder).
1214async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1215    let store = match ctx.runtime_env().object_store(url.object_store()) {
1216        Ok(s) => s,
1217        Err(_) => return Vec::new(),
1218    };
1219    let mut keys = Vec::new();
1220    let mut prefix = url.prefix().clone();
1221    loop {
1222        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1223            Ok(l) => l,
1224            Err(_) => break,
1225        };
1226        let mut next: Option<object_store::path::Path> = None;
1227        for cp in &listing.common_prefixes {
1228            if let Some(seg) = cp.parts().next_back() {
1229                let seg = seg.as_ref().to_string();
1230                if let Some((k, v)) = seg.split_once('=')
1231                    && !k.is_empty()
1232                    && !v.is_empty()
1233                {
1234                    keys.push(k.to_string());
1235                    next = Some(cp.clone());
1236                    break;
1237                }
1238            }
1239        }
1240        match next {
1241            Some(p) => prefix = p,
1242            None => break,
1243        }
1244    }
1245    keys
1246}
1247
1248/// Original local-parquet code path — sync file I/O. We set a large reader
1249/// batch size so wide schemas (hundreds of columns) don't pay per-array
1250/// metadata overhead on thousands of small (default 1024-row) batches.
1251///
1252/// Two memory-saving knobs are applied here:
1253///
1254/// * **Column projection** — if `d.columns` is non-empty, only those
1255///   columns are decoded; everything else is skipped at the parquet reader
1256///   level (no Arrow array is ever allocated for the dropped columns).
1257/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1258///   carry a dictionary page are materialised as Arrow
1259///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1260///   string columns (state, country, severity, …) stay represented as
1261///   `n_unique` string slots plus an Int32 index per row instead of
1262///   `n_rows` independent strings — typically 10×–50× smaller for
1263///   real-world data.
1264fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1265    let files = d.resolve_local_parquet_files()?;
1266    let mut all = Vec::new();
1267    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1268        None
1269    } else {
1270        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1271    };
1272
1273    for f in &files {
1274        let file = std::fs::File::open(f)
1275            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1276
1277        // First pass: peek the parquet metadata + default Arrow schema so we
1278        // can (a) decide a column projection and (b) override Utf8 columns
1279        // that are dictionary-encoded in the file so the reader materialises
1280        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1281        let probe = ParquetRecordBatchReaderBuilder::try_new(
1282            file.try_clone()
1283                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1284        )?;
1285        let parquet_schema = probe.parquet_schema().clone();
1286        let arrow_schema = probe.schema().clone();
1287        let metadata = probe.metadata().clone();
1288        drop(probe);
1289
1290        // Column projection (top-level / leaf indices for flat schemas).
1291        let projection = if let Some(w) = &wanted {
1292            let indices: Vec<usize> = arrow_schema
1293                .fields()
1294                .iter()
1295                .enumerate()
1296                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1297                .map(|(i, _)| i)
1298                .collect();
1299            if indices.is_empty() {
1300                return Err(AppError::Internal(format!(
1301                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1302                    d.name,
1303                    d.columns,
1304                    f.display()
1305                )));
1306            }
1307            ProjectionMask::roots(&parquet_schema, indices)
1308        } else {
1309            ProjectionMask::all()
1310        };
1311
1312        // Dictionary override: any Utf8 column whose first row group carries
1313        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1314        // override schema must still describe every column in the parquet
1315        // file (projection is applied separately). Skipped entirely when
1316        // the dataset has `dict_encode = false` — escape hatch for cases
1317        // where the override interacts badly with null propagation in the
1318        // downstream engine.
1319        let mut new_fields: Vec<Field> = arrow_schema
1320            .fields()
1321            .iter()
1322            .map(|f| f.as_ref().clone())
1323            .collect();
1324        if d.dict_encode
1325            && let Some(rg0) = metadata.row_groups().first()
1326        {
1327            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1328                if !matches!(
1329                    fld.data_type(),
1330                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1331                ) {
1332                    continue;
1333                }
1334                if let Some(col) = rg0.columns().get(i)
1335                    && col.dictionary_page_offset().is_some()
1336                {
1337                    new_fields[i] = Field::new(
1338                        fld.name(),
1339                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1340                        fld.is_nullable(),
1341                    );
1342                }
1343            }
1344        }
1345        let forced_schema = Arc::new(Schema::new(new_fields));
1346
1347        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1348        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1349            .with_batch_size(65_536)
1350            .with_projection(projection)
1351            .build()?;
1352        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1353        // the file. Fold them in as constant Utf8 columns so they show up in
1354        // the schema and are queryable — matching the DuckDB backend.
1355        let pairs = hive_pairs(f);
1356        for batch in reader {
1357            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1358            all.push(if pairs.is_empty() {
1359                batch
1360            } else {
1361                append_partition_cols(&batch, &pairs)?
1362            });
1363        }
1364    }
1365    if all.is_empty() {
1366        return Err(AppError::Internal(format!(
1367            "dataset '{}': parquet source is empty",
1368            d.name
1369        )));
1370    }
1371    Ok(all)
1372}
1373
/// Extract the hive-style partition `(key, value)` pairs encoded in a path,
/// in path order (e.g. `year=2024/city=NYC` → `[(year, 2024), (city, NYC)]`).
///
/// A component counts only when it is valid UTF-8, splits at its first `=`
/// into a non-empty key and a non-empty value, and the value contains no
/// further `=`. Every component of `path` is inspected.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let well_formed = !key.is_empty() && !value.is_empty() && !value.contains('=');
        if well_formed {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }
    pairs
}
1388
1389/// Append constant Utf8 columns for each hive partition pair. A partition
1390/// key that collides with a real file column is skipped (the file wins).
1391fn append_partition_cols(
1392    batch: &RecordBatch,
1393    pairs: &[(String, String)],
1394) -> Result<RecordBatch, AppError> {
1395    let n = batch.num_rows();
1396    let mut fields: Vec<Field> = batch
1397        .schema()
1398        .fields()
1399        .iter()
1400        .map(|f| f.as_ref().clone())
1401        .collect();
1402    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1403    for (k, v) in pairs {
1404        if fields.iter().any(|f| f.name() == k) {
1405            continue;
1406        }
1407        fields.push(Field::new(k, DataType::Utf8, false));
1408        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1409    }
1410    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1411        .map_err(|e| AppError::Internal(e.to_string()))
1412}
1413
1414/// Register an `AmazonS3` object store on the SessionContext, build a
1415/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1416/// and stream the whole dataset back through `DataFrame::collect`. Using a
1417/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1418/// get the same hive-partition handling as local parquet.
1419async fn read_s3_parquet(
1420    d: &DatasetConfig,
1421    ctx: &SessionContext,
1422) -> Result<Vec<RecordBatch>, AppError> {
1423    register_s3_object_store(d, ctx)?;
1424    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1425    let df = ctx
1426        .read_table(provider)
1427        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1428    Ok(df.collect().await?)
1429}
1430
1431/// Open a Delta table (local or S3) and return the deltalake `DeltaTable`
1432/// handle. The transaction log is read here to resolve the current file
1433/// list + schema; the table's object store is registered on the session
1434/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1435/// store themselves. A location that doesn't exist, is empty, or was never
1436/// committed maps to [`AppError::EmptyDataset`] so startup can log-and-skip.
1437async fn open_delta_table(
1438    d: &DatasetConfig,
1439    opts: HashMap<String, String>,
1440) -> Result<deltalake::DeltaTable, AppError> {
1441    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1442        AppError::Internal(format!(
1443            "dataset '{}': bad delta location '{}': {e}",
1444            d.name, d.source.location
1445        ))
1446    })?;
1447    deltalake::open_table_with_storage_options(url, opts)
1448        .await
1449        .map_err(|e| {
1450            // A Delta location that doesn't exist, is empty, or was never
1451            // committed has no files in its log segment. deltalake (kernel
1452            // 0.32) surfaces every one of these as:
1453            //   "Not a Delta table: Generic delta kernel error: No files in
1454            //    log segment"
1455            // — covering a missing local dir AND a non-existent S3 prefix
1456            // alike. Match case-insensitively (the kernel capitalises the
1457            // phrases) and treat it like any other empty dataset so startup
1458            // logs and skips instead of aborting the whole process.
1459            let msg = e.to_string();
1460            let low = msg.to_lowercase();
1461            if low.contains("no files in log segment") || low.contains("not a delta table") {
1462                AppError::EmptyDataset(format!(
1463                    "delta location '{}' has no committed files ({msg})",
1464                    d.source.location
1465                ))
1466            } else {
1467                AppError::Internal(format!(
1468                    "dataset '{}': delta open '{}': {msg}",
1469                    d.name, d.source.location
1470                ))
1471            }
1472        })
1473}
1474
1475/// Open a Delta table (local or S3) and return its DataFusion
1476/// `TableProvider`. The provider reads the transaction log to resolve the
1477/// current file list and registers the table's object store on the session
1478/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1479/// store themselves.
1480async fn open_delta_provider(
1481    d: &DatasetConfig,
1482    opts: HashMap<String, String>,
1483) -> Result<Arc<dyn TableProvider>, AppError> {
1484    let table = open_delta_table(d, opts).await?;
1485    table.table_provider().await.map_err(|e| {
1486        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1487    })
1488}
1489
1490/// Resolve the deltalake storage-options for a dataset: empty for local
1491/// tables, S3 credentials/endpoint for S3-backed ones.
1492fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1493    if d.source.is_s3() {
1494        delta_s3_options(d)
1495    } else {
1496        Ok(HashMap::new())
1497    }
1498}
1499
1500/// Open a Delta table (local or S3) and stream every row back as a Vec of
1501/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1502/// treat all datasets uniformly (single in-memory batch + eq-index).
1503async fn read_delta(
1504    d: &DatasetConfig,
1505    opts: HashMap<String, String>,
1506) -> Result<Vec<RecordBatch>, AppError> {
1507    let provider = open_delta_provider(d, opts).await?;
1508    // Drive a full scan via a throwaway SessionContext so we end up with
1509    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1510    //
1511    // A Delta table can open cleanly (valid log + schema) yet still fail to
1512    // scan: its add actions may reference data files that no longer exist in
1513    // storage (e.g. vacuumed away), or are otherwise unreadable. Rather than
1514    // abort the whole registry for one broken table, map scan failures to
1515    // `EmptyDataset` so `Store::load` logs and skips it — mirroring the
1516    // bounded-probe behaviour on the lazy path.
1517    let scan_ctx = SessionContext::new();
1518    let df = scan_ctx.read_table(provider).map_err(|e| {
1519        AppError::EmptyDataset(format!(
1520            "delta location '{}' could not be scanned, skipping ({e})",
1521            d.source.location
1522        ))
1523    })?;
1524    df.collect().await.map_err(|e| {
1525        AppError::EmptyDataset(format!(
1526            "delta location '{}' could not be scanned, skipping ({e})",
1527            d.source.location
1528        ))
1529    })
1530}
1531
1532/// Build a lazy state + deltalake `TableProvider` for a Delta dataset
1533/// (local or S3). The table is never read into RAM: deltalake reads the
1534/// transaction log once here to resolve the file list and discovery schema,
1535/// then streams parquet row groups on each query (with predicate pushdown
1536/// and Delta file skipping). The returned `DatasetState.data` is empty.
1537async fn build_lazy_delta(
1538    d: &DatasetConfig,
1539    _ctx: &SessionContext,
1540) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1541    let table = open_delta_table(d, delta_storage_options(d)?).await?;
1542
1543    // An empty Delta table carries a valid schema in its log but resolves to
1544    // zero data files. The eager path catches this naturally (a full scan
1545    // yields no batches), but the lazy path never scans — so check the file
1546    // list explicitly and skip, matching the eager behaviour. Without this an
1547    // empty Delta table would register as a 0-row dataset and show up in
1548    // discovery / explore.
1549    let file_count = table
1550        .get_file_uris()
1551        .map(|it| it.count())
1552        .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1553    if file_count == 0 {
1554        return Err(AppError::EmptyDataset(format!(
1555            "delta location '{}' has a schema but no data files",
1556            d.source.location
1557        )));
1558    }
1559
1560    let provider = table.table_provider().await.map_err(|e| {
1561        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1562    })?;
1563
1564    // The file-list check above only catches a Delta table with *no* data
1565    // files. A table can still be effectively empty in ways the log doesn't
1566    // make obvious: its add actions may reference files that contain zero
1567    // rows, or files that no longer exist in storage (e.g. vacuumed). Those
1568    // register fine here but then return nothing — or hard-error — the moment
1569    // they're queried, leaving a broken dataset visible in discovery/explore.
1570    //
1571    // The eager path catches all of this for free (a full scan yields no rows
1572    // or surfaces the read error up front), but the lazy path never scans. So
1573    // probe with a bounded one-row scan: it reads at most a single row group
1574    // from a single file, costing almost nothing on a healthy table, but lets
1575    // us log-and-skip an empty or unreadable table instead of registering it.
1576    {
1577        let probe_ctx = SessionContext::new();
1578        let probe = probe_ctx
1579            .read_table(provider.clone())
1580            .and_then(|df| df.limit(0, Some(1)));
1581        match probe {
1582            Ok(df) => match df.collect().await {
1583                Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1584                    return Err(AppError::EmptyDataset(format!(
1585                        "delta location '{}' resolves to no rows",
1586                        d.source.location
1587                    )));
1588                }
1589                Ok(_) => {}
1590                Err(e) => {
1591                    return Err(AppError::EmptyDataset(format!(
1592                        "delta location '{}' could not be scanned, skipping ({e})",
1593                        d.source.location
1594                    )));
1595                }
1596            },
1597            Err(e) => {
1598                return Err(AppError::EmptyDataset(format!(
1599                    "delta location '{}' could not be scanned, skipping ({e})",
1600                    d.source.location
1601                )));
1602            }
1603        }
1604    }
1605
1606    // The provider's schema is the full logical schema (data + partition
1607    // columns), which is exactly the discovery schema we want.
1608    let arrow_sch = provider.schema();
1609    let columns: Vec<ColumnInfo> = arrow_sch
1610        .fields()
1611        .iter()
1612        .map(|f| {
1613            let dt = f.data_type();
1614            ColumnInfo {
1615                name: f.name().clone(),
1616                logical: arrow_to_logical(dt),
1617                sql_type: format!("{dt:?}"),
1618                nullable: f.is_nullable(),
1619            }
1620        })
1621        .collect();
1622    let schema = DatasetSchema::new(&d.name, columns);
1623
1624    log::info!(
1625        "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1626        d.name,
1627        d.source.kind.as_str(),
1628        schema.columns.len()
1629    );
1630
1631    Ok((
1632        DatasetState {
1633            schema,
1634            data: Vec::new(),
1635            arrow_schema: arrow_sch,
1636            index: EqIndex::default(),
1637            lazy: true,
1638        },
1639        provider,
1640    ))
1641}
1642
1643/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1644/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1645/// passes them through to object_store internally.
1646fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1647    let creds = d.resolved_creds();
1648    let region = d.resolved_region();
1649    let s3 = d.s3.clone().unwrap_or_default();
1650    let (bucket, _) = d.source.s3_bucket()?;
1651
1652    let mut opts = HashMap::new();
1653    opts.insert("AWS_REGION".into(), region);
1654    if let Some(ep) = s3.effective_endpoint(bucket) {
1655        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1656    }
1657    if s3.allow_http {
1658        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1659    }
1660    opts.insert(
1661        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1662        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1663    );
1664    if let Some(k) = creds.access_key_id {
1665        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1666    }
1667    if let Some(s) = creds.secret_access_key {
1668        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1669    }
1670    if let Some(t) = creds.session_token {
1671        opts.insert("AWS_SESSION_TOKEN".into(), t);
1672    }
1673    // Read-only paths don't need the S3 lock-provider plumbing.
1674    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1675    Ok(opts)
1676}
1677
1678/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1679/// block + resolved credentials and register it on `ctx` under
1680/// `s3://bucket/`.
1681fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1682    let (bucket, _key) = d.source.s3_bucket()?;
1683    let creds = d.resolved_creds();
1684    let region = d.resolved_region();
1685    let s3 = d.s3.clone().unwrap_or_default();
1686
1687    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1688        AppError::Internal(format!(
1689            "dataset '{}': build S3 store for '{bucket}': {e}",
1690            d.name
1691        ))
1692    })?;
1693
1694    let url = Url::parse(&format!("s3://{bucket}"))
1695        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1696    ctx.register_object_store(&url, Arc::new(store));
1697    Ok(())
1698}
1699
/// Heuristically decide whether an error message describes an S3
/// authorization failure (HTTP 403).
///
/// object_store, the AWS SDK, deltalake, and MinIO all render a denied read
/// as some mix of "Access Denied" / "AccessDenied" (the S3 error code),
/// "Forbidden", or a bare "403" somewhere in the error chain, so the message
/// is lower-cased and searched for each of those markers. Only meant to be
/// consulted for sources already known to be S3, so a stray "forbidden" in
/// unrelated text can't misfire.
fn is_s3_access_denied(msg: &str) -> bool {
    const MARKERS: [&str; 4] = ["access denied", "accessdenied", "forbidden", "403"];

    let low = msg.to_lowercase();
    MARKERS.iter().any(|&marker| low.contains(marker))
}
1713
1714
1715///
1716/// Local sources are sized with a cheap filesystem stat (`estimate_local_bytes`);
1717/// S3 sources are sized by listing the object store under their prefix. S3
1718/// sizing is best-effort: a listing failure logs a warning and returns `None`
1719/// (don't force) so a transient S3 error never blocks startup.
1720async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1721    if d.lazy || server.force_lazy_above_mb == 0 {
1722        return None;
1723    }
1724    let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1725
1726    let bytes = if d.source.is_s3() {
1727        match estimate_s3_bytes(d).await {
1728            Ok(b) => b,
1729            Err(e) => {
1730                log::warn!(
1731                    "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1732                    d.name
1733                );
1734                return None;
1735            }
1736        }
1737    } else {
1738        d.estimate_local_bytes()?
1739    };
1740
1741    (bytes > threshold).then_some(bytes)
1742}
1743
1744/// Sum the byte size of a dataset's S3 backing data by listing the object
1745/// store under the source prefix and adding up every `.parquet` object.
1746///
1747/// Works for both parquet and delta sources: delta data files are parquet
1748/// objects under the table root, so listing the root and counting parquet
1749/// gives a coarse (possibly over-counting un-vacuumed files) but cheap size
1750/// suitable for the force-lazy gate. Any glob wildcard tail is stripped so the
1751/// listing starts at the non-wildcard base prefix.
1752async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1753    use futures_util::StreamExt;
1754    use object_store::ObjectStore;
1755
1756    let (bucket, _key) = d.source.s3_bucket()?;
1757    let creds = d.resolved_creds();
1758    let region = d.resolved_region();
1759    let s3 = d.s3.clone().unwrap_or_default();
1760    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1761        AppError::Internal(format!(
1762            "dataset '{}': build S3 store for '{bucket}': {e}",
1763            d.name
1764        ))
1765    })?;
1766
1767    // Strip any glob wildcard tail, then reduce the base to a bucket-relative
1768    // key for object_store's `list` (which takes a key prefix, not a URL).
1769    let (base, _keys) = split_glob_base_keys(&d.source.location);
1770    let prefix_key = base
1771        .strip_prefix("s3://")
1772        .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1773        .unwrap_or("")
1774        .trim_end_matches('/');
1775    let prefix =
1776        (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1777
1778    let mut total: u64 = 0;
1779    let mut stream = store.list(prefix.as_ref());
1780    while let Some(meta) = stream.next().await {
1781        let meta = meta.map_err(|e| {
1782            AppError::Internal(format!(
1783                "dataset '{}': s3 list under '{prefix_key}': {e}",
1784                d.name
1785            ))
1786        })?;
1787        if meta.location.as_ref().ends_with(".parquet") {
1788            total = total.saturating_add(meta.size);
1789        }
1790    }
1791    Ok(total)
1792}
1793
1794fn build_s3(
1795    bucket: &str,
1796    region: &str,
1797    s3: &S3Config,
1798    creds: &ResolvedCreds,
1799) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1800    let mut b = AmazonS3Builder::new()
1801        .with_bucket_name(bucket)
1802        .with_region(region)
1803        .with_allow_http(s3.allow_http)
1804        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1805    if let Some(ep) = s3.effective_endpoint(bucket) {
1806        b = b.with_endpoint(ep);
1807    }
1808    if let Some(k) = creds.access_key_id.as_deref() {
1809        b = b.with_access_key_id(k);
1810    }
1811    if let Some(s) = creds.secret_access_key.as_deref() {
1812        b = b.with_secret_access_key(s);
1813    }
1814    if let Some(t) = creds.session_token.as_deref() {
1815        b = b.with_token(t);
1816    }
1817    b.build()
1818}
1819
1820fn arrow_to_logical(dt: &DataType) -> LogicalType {
1821    match dt {
1822        DataType::Boolean => LogicalType::Bool,
1823        DataType::Int8
1824        | DataType::Int16
1825        | DataType::Int32
1826        | DataType::Int64
1827        | DataType::UInt8
1828        | DataType::UInt16
1829        | DataType::UInt32
1830        | DataType::UInt64 => LogicalType::Int,
1831        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1832        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1833        // Dictionary-encoded strings are reported as plain strings — clients
1834        // (and the rest of the backend) shouldn't have to care that we keep
1835        // a compressed representation in memory.
1836        DataType::Dictionary(_, v)
1837            if matches!(
1838                v.as_ref(),
1839                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1840            ) =>
1841        {
1842            LogicalType::Utf8
1843        }
1844        DataType::Date32
1845        | DataType::Date64
1846        | DataType::Time32(_)
1847        | DataType::Time64(_)
1848        | DataType::Timestamp(_, _)
1849        | DataType::Duration(_)
1850        | DataType::Interval(_) => LogicalType::Temporal,
1851        _ => LogicalType::Other,
1852    }
1853}
1854
1855// ---------------------------------------------------------------------------
1856// Per-batch projection
1857// ---------------------------------------------------------------------------
1858
1859fn project(
1860    schema: &DatasetSchema,
1861    batch: RecordBatch,
1862    columns: &[String],
1863) -> Result<RecordBatch, AppError> {
1864    if columns.is_empty() {
1865        return Ok(batch);
1866    }
1867    let indices: Vec<usize> = columns
1868        .iter()
1869        .map(|c| {
1870            schema
1871                .find(c)
1872                .map(|info| schema.by_name[&info.name.to_lowercase()])
1873        })
1874        .collect::<Result<_, _>>()?;
1875    let fields: Vec<Field> = indices
1876        .iter()
1877        .map(|&i| batch.schema().field(i).clone())
1878        .collect();
1879    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1880    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1881}
1882
1883// ---------------------------------------------------------------------------
1884// SQL builder
1885// ---------------------------------------------------------------------------
1886
1887/// Accumulates the typed literal values for a parameterised query.
1888///
1889/// Predicate values are never interpolated into the SQL text. Instead each
1890/// value is pushed here and the builder emits a positional placeholder
1891/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1892/// to the logical plan via [`DataFrame::with_param_values`], so user input
1893/// reaches the engine as typed scalars and can never alter the query
1894/// structure (no SQL injection surface, no escaping to get wrong).
1895#[derive(Default)]
1896struct Params {
1897    values: Vec<ScalarValue>,
1898}
1899
1900impl Params {
1901    fn new() -> Self {
1902        Self::default()
1903    }
1904
1905    /// Bind `v` and return its `$N` placeholder token.
1906    fn bind(&mut self, v: ScalarValue) -> String {
1907        self.values.push(v);
1908        format!("${}", self.values.len())
1909    }
1910
1911    fn into_values(self) -> Vec<ScalarValue> {
1912        self.values
1913    }
1914}
1915
1916fn build_query_sql(
1917    schema: &DatasetSchema,
1918    req: &QueryRequest,
1919    max_page_size: u64,
1920) -> Result<(String, Vec<ScalarValue>), AppError> {
1921    let (limit, offset) = req.effective_limit_offset(max_page_size);
1922    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1923}
1924
1925fn build_query_stream_sql(
1926    schema: &DatasetSchema,
1927    req: &QueryRequest,
1928) -> Result<(String, Vec<ScalarValue>), AppError> {
1929    let suffix = req
1930        .limit
1931        .map(|limit| format!(" LIMIT {limit}"))
1932        .unwrap_or_default();
1933    build_query_sql_with_suffix(schema, req, &suffix)
1934}
1935
1936fn build_query_sql_with_suffix(
1937    schema: &DatasetSchema,
1938    req: &QueryRequest,
1939    suffix: &str,
1940) -> Result<(String, Vec<ScalarValue>), AppError> {
1941    let agg_plan = req.agg_plan(schema)?;
1942
1943    let cols = if let Some(plan) = &agg_plan {
1944        // Group cols, then aggregations, each aliased to the JSON output key.
1945        let mut parts: Vec<String> = plan
1946            .group_cols
1947            .iter()
1948            .map(|c| DatasetSchema::quote_ident(c))
1949            .collect();
1950        for a in &plan.aggs {
1951            let expr = a.sql_expr()?;
1952            parts.push(format!(
1953                "{expr} AS {}",
1954                DatasetSchema::quote_ident(&a.alias)
1955            ));
1956        }
1957        parts.join(", ")
1958    } else if req.columns.is_empty() {
1959        if req.distinct {
1960            "DISTINCT *".to_string()
1961        } else {
1962            "*".to_string()
1963        }
1964    } else {
1965        let list = req
1966            .columns
1967            .iter()
1968            .map(|c| {
1969                schema
1970                    .find(c)
1971                    .map(|info| DatasetSchema::quote_ident(&info.name))
1972            })
1973            .collect::<Result<Vec<_>, _>>()?
1974            .join(", ");
1975        if req.distinct {
1976            format!("DISTINCT {list}")
1977        } else {
1978            list
1979        }
1980    };
1981
1982    let mut params = Params::new();
1983    let clauses: Vec<String> = req
1984        .predicates
1985        .iter()
1986        .map(|p| pred_to_sql(schema, p, &mut params))
1987        .collect::<Result<_, _>>()?;
1988
1989    let table = DatasetSchema::quote_ident(&schema.name);
1990    let where_clause = if clauses.is_empty() {
1991        String::new()
1992    } else {
1993        format!(" WHERE {}", clauses.join(" AND "))
1994    };
1995    let group_clause = match &agg_plan {
1996        Some(p) => format!(
1997            " GROUP BY {}",
1998            p.group_cols
1999                .iter()
2000                .map(|c| DatasetSchema::quote_ident(c))
2001                .collect::<Vec<_>>()
2002                .join(", "),
2003        ),
2004        None => String::new(),
2005    };
2006    let having_clause = {
2007        let resolved = req.having_plan(agg_plan.as_ref())?;
2008        if resolved.is_empty() {
2009            String::new()
2010        } else {
2011            let clauses: Vec<String> = resolved
2012                .iter()
2013                .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2014                .collect::<Result<_, _>>()?;
2015            format!(" HAVING {}", clauses.join(" AND "))
2016        }
2017    };
2018    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2019        Some(s) => format!(" ORDER BY {s}"),
2020        None => String::new(),
2021    };
2022    let sql =
2023        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2024    Ok((sql, params.into_values()))
2025}
2026
2027fn build_count_sql(
2028    schema: &DatasetSchema,
2029    predicates: &[Predicate],
2030) -> Result<(String, Vec<ScalarValue>), AppError> {
2031    let mut params = Params::new();
2032    let clauses: Vec<String> = predicates
2033        .iter()
2034        .map(|p| pred_to_sql(schema, p, &mut params))
2035        .collect::<Result<_, _>>()?;
2036    let table = DatasetSchema::quote_ident(&schema.name);
2037    let where_clause = if clauses.is_empty() {
2038        String::new()
2039    } else {
2040        format!(" WHERE {}", clauses.join(" AND "))
2041    };
2042    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2043    Ok((sql, params.into_values()))
2044}
2045
2046fn pred_to_sql(
2047    schema: &DatasetSchema,
2048    pred: &Predicate,
2049    params: &mut Params,
2050) -> Result<String, AppError> {
2051    let info = schema.find(&pred.col)?;
2052    let col = DatasetSchema::quote_ident(&info.name);
2053    pred_to_sql_with_lhs(&col, pred, params)
2054}
2055
2056/// Render a predicate against a pre-resolved left-hand-side SQL
2057/// expression. The dataset-`WHERE` path resolves a quoted column name as
2058/// the LHS (see [`pred_to_sql`]); the `HAVING` path passes an aggregate
2059/// expression such as `COUNT(*)` instead. Both share the operator and
2060/// value-binding logic here.
2061fn pred_to_sql_with_lhs(
2062    col: &str,
2063    pred: &Predicate,
2064    params: &mut Params,
2065) -> Result<String, AppError> {
2066    match pred.op.as_str() {
2067        "is_null" => return Ok(format!("{col} IS NULL")),
2068        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2069        _ => {}
2070    }
2071
2072    let val = pred
2073        .val
2074        .as_ref()
2075        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2076
2077    if pred.op == "in" {
2078        let items = val
2079            .as_array()
2080            .filter(|a| !a.is_empty())
2081            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2082        let placeholders: Vec<String> = items
2083            .iter()
2084            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2085            .collect::<Result<_, AppError>>()?;
2086        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2087    }
2088
2089    let sql_op = match pred.op.as_str() {
2090        "eq" => "=",
2091        "neq" => "!=",
2092        "gt" => ">",
2093        "gte" => ">=",
2094        "lt" => "<",
2095        "lte" => "<=",
2096        "like" => "LIKE",
2097        "ilike" => "ILIKE",
2098        other => return Err(AppError::UnknownOperator(other.into())),
2099    };
2100    let placeholder = params.bind(json_to_scalar(val)?);
2101    Ok(format!("{col} {sql_op} {placeholder}"))
2102}
2103
2104/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
2105/// binding as a query parameter. The engine applies the usual numeric
2106/// widening / comparison coercion against the target column type.
2107fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2108    match val {
2109        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2110        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2111        JsonValue::Null => Ok(ScalarValue::Null),
2112        JsonValue::Number(n) => {
2113            if let Some(i) = n.as_i64() {
2114                Ok(ScalarValue::Int64(Some(i)))
2115            } else if let Some(u) = n.as_u64() {
2116                Ok(ScalarValue::UInt64(Some(u)))
2117            } else if let Some(f) = n.as_f64() {
2118                Ok(ScalarValue::Float64(Some(f)))
2119            } else {
2120                Err(AppError::InvalidValue(
2121                    "unsupported numeric literal in predicate".into(),
2122                ))
2123            }
2124        }
2125        _ => Err(AppError::InvalidValue(
2126            "unsupported literal type in predicate".into(),
2127        )),
2128    }
2129}
2130
2131// ---------------------------------------------------------------------------
2132// Equality index — built once at startup, queried on every predicate request
2133// ---------------------------------------------------------------------------
2134
2135fn json_index_key(val: &JsonValue) -> Option<String> {
2136    match val {
2137        JsonValue::String(s) => Some(s.clone()),
2138        JsonValue::Number(n) => Some(n.to_string()),
2139        JsonValue::Bool(b) => Some(b.to_string()),
2140        _ => None,
2141    }
2142}
2143
/// Intersect two ascending row-id slices with a single merge pass.
///
/// Both cursors advance on a match; otherwise only the cursor sitting on the
/// smaller value moves. The walk stops as soon as either slice is exhausted.
/// Inputs are expected to be sorted ascending.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut ia, mut ib) = (0usize, 0usize);
    while let (Some(&left), Some(&right)) = (a.get(ia), b.get(ib)) {
        match left.cmp(&right) {
            Ordering::Less => ia += 1,
            Ordering::Greater => ib += 1,
            Ordering::Equal => {
                common.push(left);
                ia += 1;
                ib += 1;
            }
        }
    }
    common
}
2160
/// Merge two ascending row-id slices into one ascending vector.
///
/// A value present at both cursors is emitted once. Whatever remains of
/// either slice after the other runs out is appended unchanged. Inputs are
/// expected to be sorted ascending.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut ia, mut ib) = (0usize, 0usize);
    while let (Some(&left), Some(&right)) = (a.get(ia), b.get(ib)) {
        match left.cmp(&right) {
            Ordering::Equal => {
                merged.push(left);
                ia += 1;
                ib += 1;
            }
            Ordering::Less => {
                merged.push(left);
                ia += 1;
            }
            Ordering::Greater => {
                merged.push(right);
                ib += 1;
            }
        }
    }
    // At most one of these tails is non-empty.
    merged.extend_from_slice(&a[ia..]);
    merged.extend_from_slice(&b[ib..]);
    merged
}
2185
2186fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2187    if predicates.is_empty() || index.is_empty() {
2188        return None;
2189    }
2190
2191    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
2192    // so borrow it directly instead of cloning the row-id vector.
2193    if let [pred] = predicates
2194        && pred.op.as_str() == "eq"
2195    {
2196        let col_lower = pred.col.to_lowercase();
2197        let col_map = index.get(&col_lower)?;
2198        let key = json_index_key(pred.val.as_ref()?)?;
2199        return Some(match col_map.get(&key) {
2200            Some(rows) => Cow::Borrowed(rows.as_slice()),
2201            None => Cow::Owned(Vec::new()),
2202        });
2203    }
2204
2205    let mut result: Option<Vec<u32>> = None;
2206    for pred in predicates {
2207        let col_lower = pred.col.to_lowercase();
2208        let col_map = index.get(&col_lower)?;
2209
2210        let rows: Vec<u32> = match pred.op.as_str() {
2211            "eq" => {
2212                let key = json_index_key(pred.val.as_ref()?)?;
2213                col_map.get(&key).cloned().unwrap_or_default()
2214            }
2215            "in" => {
2216                let items = pred.val.as_ref()?.as_array()?;
2217                let mut merged: Vec<u32> = Vec::new();
2218                for item in items {
2219                    if let Some(r) = col_map.get(&json_index_key(item)?) {
2220                        merged = union_sorted(&merged, r);
2221                    }
2222                }
2223                merged
2224            }
2225            _ => return None,
2226        };
2227
2228        result = Some(match result {
2229            None => rows,
2230            Some(r) => intersect_sorted(&r, &rows),
2231        });
2232    }
2233    result.map(Cow::Owned)
2234}
2235
2236/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
2237/// not part of the stable API — exposed solely so `benches/index.rs` can build
2238/// an `EqIndex` and exercise `try_index` directly.
2239#[doc(hidden)]
2240pub mod bench {
2241    use super::{EqIndex, FastMap, json_index_key, try_index};
2242    use datapress_core::models::Predicate;
2243    use serde_json::Value as JsonValue;
2244    use std::borrow::Cow;
2245
2246    /// Opaque equality index wrapper for benches (the inner alias is private).
2247    pub struct BenchIndex(EqIndex);
2248
2249    /// Build an index with a single `col` whose `val` bucket holds `rows`.
2250    /// The bucket key is derived with the same `json_index_key` the query path
2251    /// uses, so a matching `eq` predicate is guaranteed to hit.
2252    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2253        let key = json_index_key(val).expect("benchable index key");
2254        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2255        col_map.insert(key, rows);
2256        let mut index: EqIndex = EqIndex::default();
2257        index.insert(col.to_string(), col_map);
2258        BenchIndex(index)
2259    }
2260
2261    /// Resolve `predicates` against the index — the timed operation.
2262    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2263        try_index(&idx.0, predicates)
2264    }
2265
2266    /// Reference implementation of the pre-`Cow` behaviour: always clones the
2267    /// matched bucket into an owned `Vec` (what `try_index` did before the
2268    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
2269    /// the borrow win is measured against the old allocation cost in-process.
2270    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2271        let [pred] = predicates else { return None };
2272        if pred.op.as_str() != "eq" {
2273            return None;
2274        }
2275        let col_lower = pred.col.to_lowercase();
2276        let col_map = idx.0.get(&col_lower)?;
2277        let key = json_index_key(pred.val.as_ref()?)?;
2278        Some(col_map.get(&key).cloned().unwrap_or_default())
2279    }
2280}
2281
2282/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
2283/// the underlying batches (zero-copy) and concatenating the (small) page.
2284fn slice_global(
2285    chunks: &[RecordBatch],
2286    schema: &Arc<Schema>,
2287    offset: usize,
2288    limit: usize,
2289) -> Result<RecordBatch, AppError> {
2290    if limit == 0 || chunks.is_empty() {
2291        return Ok(RecordBatch::new_empty(schema.clone()));
2292    }
2293    let mut out = Vec::new();
2294    let mut to_skip = offset;
2295    let mut remaining = limit;
2296    for b in chunks {
2297        if remaining == 0 {
2298            break;
2299        }
2300        let n = b.num_rows();
2301        if to_skip >= n {
2302            to_skip -= n;
2303            continue;
2304        }
2305        let take = remaining.min(n - to_skip);
2306        out.push(b.slice(to_skip, take));
2307        to_skip = 0;
2308        remaining -= take;
2309    }
2310    if out.is_empty() {
2311        return Ok(RecordBatch::new_empty(schema.clone()));
2312    }
2313    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2314}
2315
2316/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
2317/// Row ids are global (across the concatenation of all chunks). We map each
2318/// requested row to its (chunk, local-index), `take` per chunk, then stitch
2319/// the per-chunk results back together preserving the original row order.
2320fn take_page(
2321    chunks: &[RecordBatch],
2322    schema: &Arc<Schema>,
2323    rows: &[u32],
2324    offset: usize,
2325    limit: usize,
2326) -> Result<RecordBatch, AppError> {
2327    let start = offset.min(rows.len());
2328    let len = limit.min(rows.len() - start);
2329    if len == 0 || chunks.is_empty() {
2330        return Ok(RecordBatch::new_empty(schema.clone()));
2331    }
2332
2333    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
2334    // and `offsets.last()` is the total row count.
2335    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2336    let mut acc: u32 = 0;
2337    offsets.push(0);
2338    for b in chunks {
2339        acc = acc
2340            .checked_add(b.num_rows() as u32)
2341            .expect("row count exceeds u32::MAX");
2342        offsets.push(acc);
2343    }
2344
2345    // Bucket each global row id into the chunk that contains it, remembering
2346    // the original output position so we can restore page order at the end.
2347    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2348    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2349        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2350        let local = gid - offsets[bi];
2351        buckets[bi].push((out_pos as u32, local));
2352    }
2353
2354    // Per-chunk take, recording the destination index for each emitted row.
2355    let mut takens: Vec<RecordBatch> = Vec::new();
2356    let mut dest: Vec<u32> = Vec::with_capacity(len);
2357    for (bi, bucket) in buckets.iter().enumerate() {
2358        if bucket.is_empty() {
2359            continue;
2360        }
2361        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2362        let cols: Vec<ArrayRef> = chunks[bi]
2363            .columns()
2364            .iter()
2365            .map(|c| {
2366                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2367                    .map_err(AppError::from)
2368            })
2369            .collect::<Result<_, _>>()?;
2370        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2371        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2372    }
2373
2374    // Stitch per-chunk results then permute to restore the requested order.
2375    let stitched = compute::concat_batches(schema, takens.iter())?;
2376    let mut inv = vec![0u32; len];
2377    for (i, &d) in dest.iter().enumerate() {
2378        inv[d as usize] = i as u32;
2379    }
2380    let perm = UInt32Array::from(inv);
2381    let cols: Vec<ArrayRef> = stitched
2382        .columns()
2383        .iter()
2384        .map(|c| {
2385            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2386                .map_err(AppError::from)
2387        })
2388        .collect::<Result<_, _>>()?;
2389    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2390}
2391
2392/// Build the equality index per the dataset's policy, against the chunked
2393/// representation. Row ids are global across the concatenation of all
2394/// chunks (so they remain compatible with `take_page` / `slice_global`).
2395fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2396    use rayon::prelude::*;
2397
2398    if cfg.mode == IndexMode::None || chunks.is_empty() {
2399        return EqIndex::default();
2400    }
2401
2402    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2403        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2404    } else {
2405        None
2406    };
2407
2408    let max_card = if cfg.mode == IndexMode::Auto {
2409        Some(cfg.max_cardinality)
2410    } else {
2411        None
2412    };
2413
2414    // Per-chunk starting global row id.
2415    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2416    let mut acc: u32 = 0;
2417    for b in chunks {
2418        batch_offsets.push(acc);
2419        acc = acc
2420            .checked_add(b.num_rows() as u32)
2421            .expect("row count exceeds u32::MAX");
2422    }
2423
2424    let schema = chunks[0].schema();
2425
2426    schema
2427        .fields()
2428        .par_iter()
2429        .enumerate()
2430        .filter_map(|(ci, field)| {
2431            let col_lower = field.name().to_lowercase();
2432            if let Some(a) = &allow
2433                && !a.contains_key(&col_lower)
2434            {
2435                return None;
2436            }
2437
2438            // Only build for index-friendly types; skip everything else
2439            // up-front so we don't pay the per-chunk dispatch cost.
2440            let dtype = field.data_type();
2441            let dict_utf8 = matches!(dtype,
2442                DataType::Dictionary(k, v)
2443                    if matches!(k.as_ref(), DataType::Int32)
2444                    && matches!(v.as_ref(), DataType::Utf8));
2445            match dtype {
2446                DataType::Utf8
2447                | DataType::Utf8View
2448                | DataType::Boolean
2449                | DataType::Int8
2450                | DataType::Int16
2451                | DataType::Int32
2452                | DataType::Int64 => {}
2453                _ if dict_utf8 => {}
2454                _ => return None,
2455            }
2456
2457            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2458
2459            for (bi, batch) in chunks.iter().enumerate() {
2460                let base = batch_offsets[bi];
2461                let col = batch.column(ci);
2462
2463                macro_rules! index_col {
2464                    ($arr_ty:ty) => {{
2465                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2466                        for row in 0..arr.len() {
2467                            if arr.is_null(row) {
2468                                continue;
2469                            }
2470                            let key = arr.value(row).to_string();
2471                            let gid = base + row as u32;
2472                            if let Some(v) = map.get_mut(&key) {
2473                                v.push(gid);
2474                            } else {
2475                                if let Some(mc) = max_card {
2476                                    if map.len() >= mc {
2477                                        return None;
2478                                    }
2479                                }
2480                                map.insert(key, vec![gid]);
2481                            }
2482                        }
2483                    }};
2484                }
2485
2486                if dict_utf8 {
2487                    // Dictionary(Int32, Utf8): iterate keys + look up the
2488                    // string value from the (small) dictionary. We allocate
2489                    // the key string only when the value is new — repeated
2490                    // values reuse the existing HashMap entry by hash, but
2491                    // `HashMap::get_mut` still needs the key, so we use a
2492                    // borrowed lookup via `get` first to avoid the alloc.
2493                    let arr = col
2494                        .as_any()
2495                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2496                        )?;
2497                    let keys = arr.keys();
2498                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2499                    for row in 0..arr.len() {
2500                        if arr.is_null(row) {
2501                            continue;
2502                        }
2503                        let k = keys.value(row) as usize;
2504                        let s = values.value(k);
2505                        let gid = base + row as u32;
2506                        if let Some(v) = map.get_mut(s) {
2507                            v.push(gid);
2508                        } else {
2509                            if let Some(mc) = max_card
2510                                && map.len() >= mc
2511                            {
2512                                return None;
2513                            }
2514                            map.insert(s.to_string(), vec![gid]);
2515                        }
2516                    }
2517                } else {
2518                    match dtype {
2519                        DataType::Utf8 => index_col!(StringArray),
2520                        DataType::Utf8View => index_col!(StringViewArray),
2521                        DataType::Boolean => index_col!(BooleanArray),
2522                        DataType::Int8 => index_col!(Int8Array),
2523                        DataType::Int16 => index_col!(Int16Array),
2524                        DataType::Int32 => index_col!(Int32Array),
2525                        DataType::Int64 => index_col!(Int64Array),
2526                        _ => unreachable!(),
2527                    }
2528                }
2529            }
2530
2531            Some((col_lower, map))
2532        })
2533        .collect()
2534}
2535
2536// ---------------------------------------------------------------------------
2537// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2538// on the page batch right before JSON encoding. We deliberately do **not**
2539// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2540// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2541// columns would balloon resident RAM. The cast is applied per returned page
2542// after pagination, so the cost is paid only for rows the caller requested.
2543// ---------------------------------------------------------------------------
2544
2545/// Returns true for Arrow types that `write_value` can render directly. Any
2546/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2547/// JSON output is faithful rather than silently `null`.
2548fn writable_inline(dt: &DataType) -> bool {
2549    match dt {
2550        DataType::Utf8
2551        | DataType::LargeUtf8
2552        | DataType::Utf8View
2553        | DataType::Boolean
2554        | DataType::Int8
2555        | DataType::Int16
2556        | DataType::Int32
2557        | DataType::Int64
2558        | DataType::UInt8
2559        | DataType::UInt16
2560        | DataType::UInt32
2561        | DataType::UInt64
2562        | DataType::Float32
2563        | DataType::Float64
2564        | DataType::Decimal128(_, _)
2565        | DataType::Decimal256(_, _) => true,
2566        DataType::Dictionary(k, v)
2567            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2568        {
2569            true
2570        }
2571        _ => false,
2572    }
2573}
2574
2575/// Cast any column whose dtype isn't directly writable by `write_value` to
2576/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2577/// — kept native in resident memory to save RAM — and also any exotic dtype
2578/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2579/// so the JSON serializer never falls back to writing literal `null`.
2580fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2581    let schema = batch.schema();
2582    let to_cast: Vec<usize> = schema
2583        .fields()
2584        .iter()
2585        .enumerate()
2586        .filter_map(|(i, f)| {
2587            if writable_inline(f.data_type()) {
2588                None
2589            } else {
2590                Some(i)
2591            }
2592        })
2593        .collect();
2594    if to_cast.is_empty() {
2595        return Ok(batch.clone());
2596    }
2597    let new_fields: Vec<Field> = schema
2598        .fields()
2599        .iter()
2600        .enumerate()
2601        .map(|(i, f)| {
2602            if to_cast.contains(&i) {
2603                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2604            } else {
2605                f.as_ref().clone()
2606            }
2607        })
2608        .collect();
2609    let new_schema = Arc::new(Schema::new(new_fields));
2610    let cols: Vec<ArrayRef> = batch
2611        .columns()
2612        .iter()
2613        .enumerate()
2614        .map(|(i, c)| {
2615            if to_cast.contains(&i) {
2616                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2617            } else {
2618                Ok(c.clone())
2619            }
2620        })
2621        .collect::<Result<_, _>>()?;
2622    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2623}
2624
2625// ---------------------------------------------------------------------------
2626// Compute helpers — retained for symmetry; reserved for future inline scan
2627// path. Currently the engine fallback handles all non-index queries.
2628// ---------------------------------------------------------------------------
2629
/// Comparison operator understood by the inline compute helpers
/// (see `cmp_scalar`).
// Kept for the reserved inline scan path; nothing constructs it yet.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CmpOp {
    /// Equal to the literal.
    Eq,
    /// Not equal to the literal.
    Neq,
    /// Strictly greater than the literal.
    Gt,
    /// Greater than or equal to the literal.
    Gte,
    /// Strictly less than the literal.
    Lt,
    /// Less than or equal to the literal.
    Lte,
    /// Pattern match; only valid on string columns.
    Like,
    /// Case-insensitive pattern match; only valid on string columns.
    ILike,
}
2642
2643#[allow(dead_code)]
2644fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2645    let arr = col
2646        .as_any()
2647        .downcast_ref::<StringArray>()
2648        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2649    let s = Scalar::new(StringArray::from(vec![val]));
2650    Ok(eq(arr, &s)?)
2651}
2652
2653#[allow(dead_code)]
2654fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2655    macro_rules! num_cmp {
2656        ($arr_type:ty, $cast:ty) => {{
2657            let n = val
2658                .as_f64()
2659                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2660                as $cast;
2661            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2662            let s = Scalar::new(<$arr_type>::from(vec![n]));
2663            Ok(match op {
2664                CmpOp::Eq => eq(arr, &s)?,
2665                CmpOp::Neq => neq(arr, &s)?,
2666                CmpOp::Gt => gt(arr, &s)?,
2667                CmpOp::Gte => gt_eq(arr, &s)?,
2668                CmpOp::Lt => lt(arr, &s)?,
2669                CmpOp::Lte => lt_eq(arr, &s)?,
2670                CmpOp::Like | CmpOp::ILike => {
2671                    return Err(AppError::InvalidValue(
2672                        "LIKE requires a string column".into(),
2673                    ));
2674                }
2675            })
2676        }};
2677    }
2678    match col.data_type() {
2679        DataType::Utf8 => {
2680            let s = val
2681                .as_str()
2682                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2683            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2684            let sc = Scalar::new(StringArray::from(vec![s]));
2685            Ok(match op {
2686                CmpOp::Eq => eq(arr, &sc)?,
2687                CmpOp::Neq => neq(arr, &sc)?,
2688                CmpOp::Gt => gt(arr, &sc)?,
2689                CmpOp::Gte => gt_eq(arr, &sc)?,
2690                CmpOp::Lt => lt(arr, &sc)?,
2691                CmpOp::Lte => lt_eq(arr, &sc)?,
2692                CmpOp::Like => compute::like(arr, &sc)?,
2693                CmpOp::ILike => compute::ilike(arr, &sc)?,
2694            })
2695        }
2696        DataType::Int8 => num_cmp!(Int8Array, i8),
2697        DataType::Int16 => num_cmp!(Int16Array, i16),
2698        DataType::Int32 => num_cmp!(Int32Array, i32),
2699        DataType::Int64 => num_cmp!(Int64Array, i64),
2700        DataType::Float32 => num_cmp!(Float32Array, f32),
2701        DataType::Float64 => num_cmp!(Float64Array, f64),
2702        dt => Err(AppError::InvalidValue(format!(
2703            "unsupported type for comparison: {dt:?}"
2704        ))),
2705    }
2706}
2707
2708// ---------------------------------------------------------------------------
2709// Serialisation
2710// ---------------------------------------------------------------------------
2711
2712pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2713    // Temporal columns are kept native in resident memory (compact). Cast
2714    // them — plus any other dtype `write_value` can't render directly — to
2715    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2716    // without paying the load-time RAM cost.
2717    let batch = cast_for_serialize(batch)?;
2718    let schema = batch.schema();
2719    let n_rows = batch.num_rows();
2720
2721    let keys: Vec<Vec<u8>> = schema
2722        .fields()
2723        .iter()
2724        .map(|f| {
2725            let mut k = Vec::with_capacity(f.name().len() + 3);
2726            k.push(b'"');
2727            k.extend_from_slice(f.name().as_bytes());
2728            k.extend_from_slice(b"\":");
2729            k
2730        })
2731        .collect();
2732
2733    // Resolve every column's concrete Arrow array type exactly ONCE here,
2734    // instead of paying a `data_type()` match + `downcast_ref` for every
2735    // single cell. The inner row loop then dispatches over a small enum,
2736    // which the compiler turns into a cheap jump table.
2737    let encoders: Vec<ColEnc> = batch
2738        .columns()
2739        .iter()
2740        .map(|c| ColEnc::new(c.as_ref()))
2741        .collect();
2742
2743    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2744    let mut itoa_buf = itoa::Buffer::new();
2745    let mut ryu_buf = ryu::Buffer::new();
2746    buf.push(b'[');
2747
2748    for row in 0..n_rows {
2749        if row > 0 {
2750            buf.push(b',');
2751        }
2752        buf.push(b'{');
2753        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2754            if i > 0 {
2755                buf.push(b',');
2756            }
2757            buf.extend_from_slice(key);
2758            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2759        }
2760        buf.push(b'}');
2761    }
2762
2763    buf.push(b']');
2764    Ok(unsafe { String::from_utf8_unchecked(buf) })
2765}
2766
2767/// Per-column JSON encoder. The concrete Arrow array type is resolved once
2768/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
2769/// instead of repeating `data_type()` matching + `downcast_ref` for every
2770/// cell. Each variant borrows the already-downcast typed array.
2771enum ColEnc<'a> {
2772    Utf8(&'a StringArray),
2773    LargeUtf8(&'a LargeStringArray),
2774    Utf8View(&'a StringViewArray),
2775    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
2776    /// pre-downcast string values so neither is re-resolved per row.
2777    DictI32Utf8(
2778        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2779        &'a StringArray,
2780    ),
2781    Bool(&'a BooleanArray),
2782    I8(&'a Int8Array),
2783    I16(&'a Int16Array),
2784    I32(&'a Int32Array),
2785    I64(&'a Int64Array),
2786    U8(&'a UInt8Array),
2787    U16(&'a UInt16Array),
2788    U32(&'a UInt32Array),
2789    U64(&'a UInt64Array),
2790    Dec128(&'a Decimal128Array),
2791    Dec256(&'a Decimal256Array),
2792    F32(&'a Float32Array),
2793    F64(&'a Float64Array),
2794    /// Anything else: fall back to the generic `write_value` dispatch.
2795    Other(&'a dyn Array),
2796}
2797
2798impl<'a> ColEnc<'a> {
2799    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2800        macro_rules! dc {
2801            ($t:ty) => {
2802                col.as_any().downcast_ref::<$t>().unwrap()
2803            };
2804        }
2805        match col.data_type() {
2806            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2807            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2808            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2809            DataType::Dictionary(key, value)
2810                if matches!(key.as_ref(), DataType::Int32)
2811                    && matches!(value.as_ref(), DataType::Utf8) =>
2812            {
2813                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2814                let values = dict
2815                    .values()
2816                    .as_any()
2817                    .downcast_ref::<StringArray>()
2818                    .unwrap();
2819                ColEnc::DictI32Utf8(dict, values)
2820            }
2821            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2822            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2823            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2824            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2825            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2826            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2827            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2828            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2829            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2830            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2831            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2832            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2833            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2834            _ => ColEnc::Other(col),
2835        }
2836    }
2837
2838    #[inline]
2839    fn write(
2840        &self,
2841        buf: &mut Vec<u8>,
2842        row: usize,
2843        itoa_buf: &mut itoa::Buffer,
2844        ryu_buf: &mut ryu::Buffer,
2845    ) {
2846        macro_rules! int {
2847            ($arr:expr) => {{
2848                if $arr.is_null(row) {
2849                    buf.extend_from_slice(b"null");
2850                } else {
2851                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2852                }
2853            }};
2854        }
2855        match self {
2856            ColEnc::Utf8(a) => {
2857                if a.is_null(row) {
2858                    buf.extend_from_slice(b"null");
2859                } else {
2860                    write_str(buf, a.value(row));
2861                }
2862            }
2863            ColEnc::LargeUtf8(a) => {
2864                if a.is_null(row) {
2865                    buf.extend_from_slice(b"null");
2866                } else {
2867                    write_str(buf, a.value(row));
2868                }
2869            }
2870            ColEnc::Utf8View(a) => {
2871                if a.is_null(row) {
2872                    buf.extend_from_slice(b"null");
2873                } else {
2874                    write_str(buf, a.value(row));
2875                }
2876            }
2877            ColEnc::DictI32Utf8(keys, values) => {
2878                if keys.is_null(row) {
2879                    buf.extend_from_slice(b"null");
2880                } else {
2881                    let k = keys.keys().value(row) as usize;
2882                    write_str(buf, values.value(k));
2883                }
2884            }
2885            ColEnc::Bool(a) => {
2886                if a.is_null(row) {
2887                    buf.extend_from_slice(b"null");
2888                } else {
2889                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2890                }
2891            }
2892            ColEnc::I8(a) => int!(a),
2893            ColEnc::I16(a) => int!(a),
2894            ColEnc::I32(a) => int!(a),
2895            ColEnc::I64(a) => int!(a),
2896            ColEnc::U8(a) => int!(a),
2897            ColEnc::U16(a) => int!(a),
2898            ColEnc::U32(a) => int!(a),
2899            ColEnc::U64(a) => int!(a),
2900            ColEnc::Dec128(a) => {
2901                if a.is_null(row) {
2902                    buf.extend_from_slice(b"null");
2903                } else {
2904                    write_str(buf, &a.value_as_string(row));
2905                }
2906            }
2907            ColEnc::Dec256(a) => {
2908                if a.is_null(row) {
2909                    buf.extend_from_slice(b"null");
2910                } else {
2911                    write_str(buf, &a.value_as_string(row));
2912                }
2913            }
2914            ColEnc::F32(a) => {
2915                if a.is_null(row) {
2916                    buf.extend_from_slice(b"null");
2917                } else {
2918                    let v = a.value(row);
2919                    if v.is_finite() {
2920                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2921                    } else {
2922                        buf.extend_from_slice(b"null");
2923                    }
2924                }
2925            }
2926            ColEnc::F64(a) => {
2927                if a.is_null(row) {
2928                    buf.extend_from_slice(b"null");
2929                } else {
2930                    let v = a.value(row);
2931                    if v.is_finite() {
2932                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2933                    } else {
2934                        buf.extend_from_slice(b"null");
2935                    }
2936                }
2937            }
2938            ColEnc::Other(col) => {
2939                if col.is_null(row) {
2940                    buf.extend_from_slice(b"null");
2941                } else {
2942                    write_value(buf, *col, row);
2943                }
2944            }
2945        }
2946    }
2947}
2948
2949#[inline]
2950fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2951    match col.data_type() {
2952        DataType::Utf8 => write_str(
2953            buf,
2954            col.as_any()
2955                .downcast_ref::<StringArray>()
2956                .unwrap()
2957                .value(row),
2958        ),
2959        DataType::LargeUtf8 => write_str(
2960            buf,
2961            col.as_any()
2962                .downcast_ref::<LargeStringArray>()
2963                .unwrap()
2964                .value(row),
2965        ),
2966        DataType::Utf8View => write_str(
2967            buf,
2968            col.as_any()
2969                .downcast_ref::<StringViewArray>()
2970                .unwrap()
2971                .value(row),
2972        ),
2973        DataType::Dictionary(key, value)
2974            if matches!(key.as_ref(), DataType::Int32)
2975                && matches!(value.as_ref(), DataType::Utf8) =>
2976        {
2977            let dict = col
2978                .as_any()
2979                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2980                .unwrap();
2981            let keys = dict.keys();
2982            let values = dict
2983                .values()
2984                .as_any()
2985                .downcast_ref::<StringArray>()
2986                .unwrap();
2987            let k = keys.value(row) as usize;
2988            write_str(buf, values.value(k));
2989        }
2990        DataType::Boolean => {
2991            let v = col
2992                .as_any()
2993                .downcast_ref::<BooleanArray>()
2994                .unwrap()
2995                .value(row);
2996            buf.extend_from_slice(if v { b"true" } else { b"false" });
2997        }
2998        DataType::Int8 => {
2999            let mut b = itoa::Buffer::new();
3000            buf.extend_from_slice(
3001                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3002                    .as_bytes(),
3003            );
3004        }
3005        DataType::Int16 => {
3006            let mut b = itoa::Buffer::new();
3007            buf.extend_from_slice(
3008                b.format(
3009                    col.as_any()
3010                        .downcast_ref::<Int16Array>()
3011                        .unwrap()
3012                        .value(row),
3013                )
3014                .as_bytes(),
3015            );
3016        }
3017        DataType::Int32 => {
3018            let mut b = itoa::Buffer::new();
3019            buf.extend_from_slice(
3020                b.format(
3021                    col.as_any()
3022                        .downcast_ref::<Int32Array>()
3023                        .unwrap()
3024                        .value(row),
3025                )
3026                .as_bytes(),
3027            );
3028        }
3029        DataType::Int64 => {
3030            let mut b = itoa::Buffer::new();
3031            buf.extend_from_slice(
3032                b.format(
3033                    col.as_any()
3034                        .downcast_ref::<Int64Array>()
3035                        .unwrap()
3036                        .value(row),
3037                )
3038                .as_bytes(),
3039            );
3040        }
3041        DataType::UInt8 => {
3042            let mut b = itoa::Buffer::new();
3043            buf.extend_from_slice(
3044                b.format(
3045                    col.as_any()
3046                        .downcast_ref::<UInt8Array>()
3047                        .unwrap()
3048                        .value(row),
3049                )
3050                .as_bytes(),
3051            );
3052        }
3053        DataType::UInt16 => {
3054            let mut b = itoa::Buffer::new();
3055            buf.extend_from_slice(
3056                b.format(
3057                    col.as_any()
3058                        .downcast_ref::<UInt16Array>()
3059                        .unwrap()
3060                        .value(row),
3061                )
3062                .as_bytes(),
3063            );
3064        }
3065        DataType::UInt32 => {
3066            let mut b = itoa::Buffer::new();
3067            buf.extend_from_slice(
3068                b.format(
3069                    col.as_any()
3070                        .downcast_ref::<UInt32Array>()
3071                        .unwrap()
3072                        .value(row),
3073                )
3074                .as_bytes(),
3075            );
3076        }
3077        DataType::UInt64 => {
3078            let mut b = itoa::Buffer::new();
3079            buf.extend_from_slice(
3080                b.format(
3081                    col.as_any()
3082                        .downcast_ref::<UInt64Array>()
3083                        .unwrap()
3084                        .value(row),
3085                )
3086                .as_bytes(),
3087            );
3088        }
3089        DataType::Decimal128(_, _) => {
3090            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3091            write_str(buf, &arr.value_as_string(row));
3092        }
3093        DataType::Decimal256(_, _) => {
3094            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3095            write_str(buf, &arr.value_as_string(row));
3096        }
3097        DataType::Float32 => {
3098            let v = col
3099                .as_any()
3100                .downcast_ref::<Float32Array>()
3101                .unwrap()
3102                .value(row);
3103            if v.is_finite() {
3104                let mut b = ryu::Buffer::new();
3105                buf.extend_from_slice(b.format_finite(v).as_bytes());
3106            } else {
3107                buf.extend_from_slice(b"null");
3108            }
3109        }
3110        DataType::Float64 => {
3111            let v = col
3112                .as_any()
3113                .downcast_ref::<Float64Array>()
3114                .unwrap()
3115                .value(row);
3116            if v.is_finite() {
3117                let mut b = ryu::Buffer::new();
3118                buf.extend_from_slice(b.format_finite(v).as_bytes());
3119            } else {
3120                buf.extend_from_slice(b"null");
3121            }
3122        }
3123        // Any dtype not handled above must have been pre-cast to Utf8 by
3124        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
3125        // visible JSON string rather than a silent null so it can't be
3126        // mistaken for a real NULL value.
3127        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3128    }
3129}
3130
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// * `"` and `\` are prefixed with a backslash;
/// * newline, carriage return and tab use their short forms (`\n`, `\r`, `\t`);
/// * every other byte below `0x20` is written as `\u00XX` with lowercase hex;
/// * all remaining bytes (including multi-byte UTF-8 sequences) are copied
///   through unchanged.
///
/// Existing contents of `buf` are left untouched; output is only appended.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: [u8; 16] = *b"0123456789abcdef";

    buf.push(b'"');
    for ch in s.bytes() {
        match ch {
            // Both characters escape to a backslash followed by themselves.
            b'"' | b'\\' => buf.extend_from_slice(&[b'\\', ch]),
            b'\n' => buf.extend_from_slice(br"\n"),
            b'\r' => buf.extend_from_slice(br"\r"),
            b'\t' => buf.extend_from_slice(br"\t"),
            // Remaining control bytes have no short form; the value fits in
            // two hex digits, so the leading "00" is fixed.
            ctrl if ctrl < 0x20 => {
                let hi = HEX_DIGITS[usize::from(ctrl >> 4)];
                let lo = HEX_DIGITS[usize::from(ctrl & 0x0f)];
                buf.extend_from_slice(&[b'\\', b'u', b'0', b'0', hi, lo]);
            }
            plain => buf.push(plain),
        }
    }
    buf.push(b'"');
}
3152
3153// ---------------------------------------------------------------------------
3154// Backend trait impl — wires the store into the generic core handlers.
3155// ---------------------------------------------------------------------------
3156
3157#[async_trait]
3158impl Backend for Store {
3159    fn names(&self) -> Vec<String> {
3160        Store::names(self)
3161    }
3162
3163    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3164        let st = self.dataset(name)?;
3165        Ok(DatasetSummary {
3166            name: st.schema.name.clone(),
3167            columns: st.schema.columns.len(),
3168            rows: st.num_rows(),
3169        })
3170    }
3171
3172    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3173        let st = self.dataset(name)?;
3174        Ok(Arc::new(st.schema.clone()))
3175    }
3176
3177    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3178        let st = self.dataset(name)?;
3179        // Report indexed columns in the dataset's declared schema order
3180        // so the `/schema` response is deterministic.
3181        let mut cols: Vec<String> = st
3182            .schema
3183            .columns
3184            .iter()
3185            .map(|c| c.name.clone())
3186            .filter(|n| st.index.contains_key(n))
3187            .collect();
3188        // Any indexed columns not in `schema.columns` (shouldn't happen,
3189        // but be defensive) get appended sorted.
3190        let mut extras: Vec<String> = st
3191            .index
3192            .keys()
3193            .filter(|n| !cols.iter().any(|c| c == *n))
3194            .cloned()
3195            .collect();
3196        extras.sort();
3197        cols.extend(extras);
3198        Ok(cols)
3199    }
3200
3201    async fn sample(&self, name: &str) -> Result<String, AppError> {
3202        Store::sample(self, name).await
3203    }
3204
3205    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3206        Store::query(self, name, req).await
3207    }
3208
3209    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3210        Store::query_arrow(self, name, req).await
3211    }
3212
3213    async fn query_arrow_stream(
3214        &self,
3215        name: &str,
3216        req: &QueryRequest,
3217    ) -> Result<ArrowIpcStream, AppError> {
3218        Store::query_arrow_stream(self, name, req).await
3219    }
3220
3221    async fn query_arrow_stream_all(
3222        &self,
3223        name: &str,
3224        req: &QueryRequest,
3225    ) -> Result<ArrowIpcStream, AppError> {
3226        Store::query_arrow_stream_all(self, name, req).await
3227    }
3228
3229    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3230        Store::count(self, name, req).await
3231    }
3232
3233    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3234        Store::query_sql(self, sql, max_rows).await
3235    }
3236
3237    async fn query_sql_arrow_stream(
3238        &self,
3239        sql: &str,
3240        max_rows: u64,
3241    ) -> Result<ArrowIpcStream, AppError> {
3242        Store::query_sql_arrow_stream(self, sql, max_rows).await
3243    }
3244
3245    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3246        Store::parquet(self, name).await
3247    }
3248
3249    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3250        Store::reload(self, name).await
3251    }
3252}
3253
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Every listed message must be classified as an S3 access-denied error.
    #[test]
    fn detects_s3_access_denied_variants() {
        // Representative messages from object_store / AWS / deltalake / MinIO.
        const DENIED: [&str; 4] = [
            "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
            "Client error with status 403 Forbidden",
            "S3 error: Access Denied",
            "request failed: 403 Forbidden",
        ];

        for msg in DENIED {
            let flagged = is_s3_access_denied(msg);
            assert!(flagged, "should flag: {msg}");
        }
    }

    /// Errors that have nothing to do with permissions must not be flagged.
    #[test]
    fn ignores_unrelated_errors() {
        const UNRELATED: [&str; 3] = [
            "Not a Delta table: Generic delta kernel error: No files in log segment",
            "object at location data/part.parquet not found",
            "failed to infer parquet schema: invalid magic bytes",
        ];

        for msg in UNRELATED {
            let flagged = is_s3_access_denied(msg);
            assert!(!flagged, "should not flag: {msg}");
        }
    }
}
3282