Skip to main content

datapress_datafusion/
store.rs

1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::time::Duration;
7
8use arc_swap::ArcSwap;
9use arrow::array::{
10    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
11    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
12    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
13};
14use arrow::compute;
15use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
16use arrow::datatypes::{DataType, Field, Schema};
17use async_trait::async_trait;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
20use serde_json::Value as JsonValue;
21
22use datafusion::catalog::information_schema::InformationSchemaProvider;
23use datafusion::catalog::{CatalogProviderList, SchemaProvider};
24use datafusion::datasource::file_format::parquet::ParquetFormat;
25use datafusion::datasource::listing::{
26    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
27};
28use datafusion::datasource::{MemTable, TableProvider};
29use datafusion::error::Result as DfResult;
30use datafusion::execution::cache::DefaultListFilesCache;
31use datafusion::execution::cache::cache_manager::CacheManagerConfig;
32use datafusion::execution::runtime_env::RuntimeEnvBuilder;
33use datafusion::logical_expr::{
34    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
35};
36use datafusion::prelude::{SessionConfig, SessionContext};
37use datafusion::scalar::ScalarValue;
38
39use object_store::aws::AmazonS3Builder;
40use url::Url;
41
42use datapress_core::backend::{
43    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
44};
45use datapress_core::config::{
46    AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
47    Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
48};
49use datapress_core::errors::AppError;
50use datapress_core::models::{CountRequest, Predicate, QueryRequest};
51use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
52
53// ---------------------------------------------------------------------------
54// Public types
55// ---------------------------------------------------------------------------
56
/// Hasher-swapped map alias. The default `std::collections::HashMap` uses
/// SipHash (DoS-resistant but slow). The equality index keys are our own
/// column values, not untrusted hash-flood input, so we use `ahash` — a much
/// faster non-cryptographic hasher — on this per-request hot path.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
///
/// Row ids are stored as `u32`.
/// NOTE(review): presumably this caps an indexed dataset at `u32::MAX` rows —
/// confirm against the index builder.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
65
/// Per-dataset state: schema metadata, the resident chunks, and the
/// equality index built per the dataset's `[dataset.index]` policy.
///
/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
/// produced by the underlying reader, after temporal columns are cast to
/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
/// into one batch: on wide schemas (hundreds of columns) that transiently
/// allocates a second full copy of the decoded Arrow data, pushing peak
/// RSS to ~2× the resident size and OOM-killing the process at startup.
///
/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
/// `index` is empty, and every query is dispatched to DataFusion SQL
/// against a registered `ListingTable`. `arrow_schema` still carries the
/// inferred schema so discovery endpoints work.
pub struct DatasetState {
    /// Logical schema metadata (dataset name and column list).
    pub schema: DatasetSchema,
    /// Resident chunks of the dataset; empty when `lazy` is true.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset; populated for lazy datasets as well.
    pub arrow_schema: Arc<Schema>,
    /// Equality index used by the query/count fast paths; empty when `lazy`
    /// is true.
    pub index: EqIndex,
    /// `true` when the dataset is not materialised and is served via SQL only.
    pub lazy: bool,
}
87
88impl DatasetState {
89    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
90    pub fn num_rows(&self) -> usize {
91        self.data.iter().map(|b| b.num_rows()).sum()
92    }
93}
94
/// Multi-dataset registry. Each dataset is registered in the shared
/// `SessionContext` under its configured name. The per-dataset state is
/// held behind `ArcSwap` so a reload can atomically replace it without
/// blocking concurrent queries.
pub struct Store {
    /// Shared DataFusion session in which every dataset is registered as a
    /// table.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`. `load` forces it to
    /// be at least 1.
    max_page_size: u64,
    /// Original dataset configs, indexed by name. Reload reads the source
    /// path from here — clients can't redirect a reload at an arbitrary file.
    /// Behind an `RwLock` so datasets registered at runtime can be added
    /// without a restart.
    configs: RwLock<HashMap<String, DatasetConfig>>,
    /// Hot-swappable snapshot of all currently loaded datasets.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-name reload mutex. Serialises concurrent reloads of the same
    /// dataset; reloads of different datasets proceed in parallel.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
113
114impl Store {
    /// Shared `SessionContext` all datasets are registered in. Handed to the
    /// optional pgwire server so it queries the exact same tables as the HTTP
    /// API (never a fresh context). Clone it before moving it into a task.
    ///
    /// Returns a borrow of the store's own context; no new context is
    /// created.
    pub fn session_context(&self) -> &SessionContext {
        &self.ctx
    }
121
122    /// Load every dataset declared in `cfg`.
123    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
124        // One-shot init for the deltalake S3 backend. Safe to call more
125        // than once — the handlers are idempotent.
126        if cfg
127            .datasets
128            .iter()
129            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
130        {
131            deltalake::aws::register_handlers(None);
132        }
133
134        // NOTE: identifier handling for the raw-SQL endpoint is done by
135        // rewriting schema column/table references to quoted canonical
136        // names before execution (see `query_sql`). DataFusion's default
137        // identifier normalization is left ON so unquoted *aliases* and
138        // CTE names stay case-insensitive, matching DuckDB.
139        let ctx = build_tuned_context(&cfg.datafusion);
140        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
141        let mut configs = HashMap::with_capacity(cfg.datasets.len());
142
143        for d in &cfg.datasets {
144            log::info!(
145                "Loading dataset '{}' ({} @ {})",
146                d.name,
147                d.source.kind.as_str(),
148                d.source.location
149            );
150            // Force lazy when the source exceeds the server size threshold.
151            // S3-aware: local sources are stat'd, S3 sources are sized by
152            // listing the object store under their prefix.
153            let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
154                .await
155            {
156                Some(bytes) => {
157                    log::info!(
158                        "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
159                        d.name,
160                        bytes as f64 / (1024.0 * 1024.0),
161                        cfg.server.force_lazy_above_mb
162                    );
163                    let mut forced = d.clone();
164                    forced.lazy = true;
165                    std::borrow::Cow::Owned(forced)
166                }
167                None => std::borrow::Cow::Borrowed(d),
168            };
169            let d = d.as_ref();
170            let (state, provider) = match build_dataset(d, &ctx).await {
171                Ok(built) => built,
172                Err(AppError::EmptyDataset(msg)) => {
173                    log::warn!("skipping empty dataset '{}': {msg}", d.name);
174                    continue;
175                }
176                // An S3 source we're not authorized to read (bad creds, no
177                // bucket/prefix policy, expired token) returns a 403. Don't
178                // abort the whole registry for one inaccessible dataset —
179                // log and skip it, exactly like an empty source. The rest of
180                // the datasets still load and serve traffic.
181                Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
182                    log::warn!(
183                        "skipping dataset '{}': S3 access denied — check credentials \
184                         and bucket policy ({e})",
185                        d.name
186                    );
187                    continue;
188                }
189                Err(e) => return Err(e),
190            };
191            ctx.register_table(d.name.as_str(), provider)?;
192            datasets.insert(d.name.clone(), Arc::new(state));
193            configs.insert(d.name.clone(), d.clone());
194        }
195        Ok(Self {
196            ctx,
197            max_page_size: cfg.server.max_page_size.max(1),
198            configs: RwLock::new(configs),
199            datasets: ArcSwap::from_pointee(datasets),
200            reload_locks: Mutex::new(HashMap::new()),
201        })
202    }
203
204    /// Sorted list of dataset names.
205    pub fn names(&self) -> Vec<String> {
206        let snap = self.datasets.load();
207        let mut v: Vec<String> = snap.keys().cloned().collect();
208        v.sort();
209        v
210    }
211
212    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
213        self.datasets
214            .load()
215            .get(name)
216            .cloned()
217            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
218    }
219
220    /// JSON for the first row of the dataset, or `null` if empty. Used by
221    /// `GET /api/datasets/{name}/schema` for discoverability.
222    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
223        let st = self.dataset(name)?;
224
225        // Lazy datasets have no resident batch — pull one row via SQL.
226        if st.lazy {
227            let table = DatasetSchema::quote_ident(&st.schema.name);
228            let sql = format!("SELECT * FROM {table} LIMIT 1");
229            let df = self.ctx.sql(&sql).await?;
230            let batches = df.collect().await?;
231            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
232                return Ok("null".into());
233            }
234            let arr = serialize(&batches[0].slice(0, 1))?;
235            let trimmed = arr.trim();
236            let inner = trimmed
237                .strip_prefix('[')
238                .and_then(|s| s.strip_suffix(']'))
239                .unwrap_or(trimmed);
240            return Ok(inner.to_string());
241        }
242
243        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
244            Some(b) => b,
245            None => return Ok("null".into()),
246        };
247        let arr = serialize(&first.slice(0, 1))?;
248        // strip the outer [] to return a single object
249        let trimmed = arr.trim();
250        let inner = trimmed
251            .strip_prefix('[')
252            .and_then(|s| s.strip_suffix(']'))
253            .unwrap_or(trimmed);
254        Ok(inner.to_string())
255    }
256
257    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
258    /// against the same name continue to see the *old* `Arc<DatasetState>`
259    /// until they finish; the old data is dropped once the last reference
260    /// goes away.
261    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
262        // 1. Look up the dataset config. Not finding it = 404.
263        let cfg = self
264            .configs
265            .read()
266            .unwrap()
267            .get(name)
268            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
269            .clone();
270
271        // 2. Per-name lock: only one reload of this dataset at a time.
272        let lock = {
273            let mut locks = self.reload_locks.lock().unwrap();
274            locks
275                .entry(name.to_string())
276                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
277                .clone()
278        };
279        let _guard = lock.lock().await;
280
281        let started = std::time::Instant::now();
282
283        // Invalidate the object-store LIST cache before rebuilding. For an
284        // S3 *prefix* source the files are often rewritten with new UUID
285        // names; without dropping the cached listing the rebuilt
286        // `ListingTable` would replay the stale keys and fail with
287        // "object at location … not found". A plain prefix is therefore
288        // re-listed fresh; an explicit filename/glob source is re-resolved
289        // against the same configured pattern. Clearing the whole cache is
290        // cheap — other datasets simply re-list on their next query.
291        //
292        // NOTE: this must clear the cache that actually lives on the
293        // `RuntimeEnv`, not just the one we opt into via
294        // `[datafusion] list_files_cache`. DataFusion ≥ 53 installs a
295        // *default* `DefaultListFilesCache` (1 MiB, infinite TTL) on every
296        // `SessionContext` even when no cache is configured, so the staleness
297        // bites regardless of our flag.
298        if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
299            cache.clear();
300        }
301
302        // 3. Heavy lifting (source read + index build). Parquet/delta
303        // readers are themselves async, so we don't wrap in `web::block`.
304        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
305        let rows = state.num_rows();
306
307        // 4. Atomic swap.
308        //   a) Replace the MemTable inside the SessionContext.
309        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
310        // In-flight queries already hold the old provider + old Arc; they
311        // run to completion. New queries see the new data.
312        let _ = self.ctx.deregister_table(name)?;
313        self.ctx.register_table(name, provider)?;
314
315        let mut new_map = (**self.datasets.load()).clone();
316        new_map.insert(name.to_string(), Arc::new(state));
317        self.datasets.store(Arc::new(new_map));
318
319        let elapsed_ms = started.elapsed().as_millis();
320        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
321        Ok(ReloadStats { rows, elapsed_ms })
322    }
323
324    /// Register a brand-new dataset from `cfg` at runtime. Opens the source,
325    /// registers a provider in the shared `SessionContext`, and inserts it
326    /// into the live snapshot so it is immediately queryable — no restart.
327    pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
328        cfg.validate_for_register()?;
329
330        // Fast pre-check before taking the (async) per-name lock.
331        if self.datasets.load().contains_key(&cfg.name) {
332            return Err(AppError::InvalidValue(format!(
333                "dataset '{}' already exists",
334                cfg.name
335            )));
336        }
337
338        // Serialise against a concurrent register/reload of the same name.
339        let lock = {
340            let mut locks = self.reload_locks.lock().unwrap();
341            locks
342                .entry(cfg.name.clone())
343                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
344                .clone()
345        };
346        let _guard = lock.lock().await;
347
348        // Re-check under the lock — another task may have won the race.
349        if self.datasets.load().contains_key(&cfg.name) {
350            return Err(AppError::InvalidValue(format!(
351                "dataset '{}' already exists",
352                cfg.name
353            )));
354        }
355
356        // One-shot init for the deltalake S3 backend, mirroring `load`.
357        // Idempotent — safe to call again for a runtime-registered dataset.
358        if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
359            deltalake::aws::register_handlers(None);
360        }
361
362        let started = std::time::Instant::now();
363        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
364        let rows = state.num_rows();
365        let columns = state.schema.columns.len();
366
367        self.ctx.register_table(cfg.name.as_str(), provider)?;
368
369        let mut new_map = (**self.datasets.load()).clone();
370        new_map.insert(cfg.name.clone(), Arc::new(state));
371        self.datasets.store(Arc::new(new_map));
372        self.configs
373            .write()
374            .unwrap()
375            .insert(cfg.name.clone(), cfg.clone());
376
377        let elapsed_ms = started.elapsed().as_millis();
378        log::info!(
379            "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
380            cfg.name,
381            cfg.source.kind.as_str(),
382            cfg.source.location
383        );
384        Ok(DatasetSummary {
385            name: cfg.name,
386            columns,
387            rows,
388        })
389    }
390
391    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
392    /// slice. Otherwise → DataFusion SQL on the single registered table.
393    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
394    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
395        let batch = self.query_batch(name, req).await?;
396        if batch.num_rows() == 0 {
397            return Ok("[]".to_string());
398        }
399        serialize(&batch)
400    }
401
402    /// Rewrite registered table / column references in a raw-SQL string to
403    /// their canonical, quoted spelling so matching is case-insensitive
404    /// (like DuckDB). Builds the lookup from every currently loaded
405    /// dataset; unknown identifiers (aliases, CTE names) are left for the
406    /// engine's default normalization to handle.
407    fn canonicalize_sql(&self, sql: &str) -> String {
408        let snap = self.datasets.load();
409        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
410        let mut columns: HashMap<String, String> = HashMap::new();
411        for (name, state) in snap.iter() {
412            tables.insert(name.to_lowercase(), name.clone());
413            for col in &state.schema.columns {
414                columns
415                    .entry(col.name.to_lowercase())
416                    .or_insert_with(|| col.name.clone());
417            }
418        }
419        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
420    }
421
422    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
423    /// array. The statement has already passed
424    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
425    /// `LIMIT max_rows` so the result is bounded regardless of the user's
426    /// own clauses, executed through the shared `SessionContext`, and run
427    /// through the same fast JSON encoder as [`Self::query`].
428    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
429        let cap = max_rows.max(1);
430        let sql = self.canonicalize_sql(sql);
431        // DESCRIBE yields a schema listing and cannot be nested in a
432        // subquery on DataFusion, so run it directly. Its row count is
433        // bounded by the column count and the slice below still enforces
434        // `cap`; everything else is wrapped in an outer LIMIT.
435        let wrapped = if datapress_core::sql::is_describe(&sql) {
436            sql
437        } else {
438            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
439        };
440        let df = self.ctx.sql(&wrapped).await?;
441        let batches = df.collect().await?;
442        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
443            return Ok("[]".to_string());
444        }
445        let batch = if batches.len() == 1 {
446            batches.into_iter().next().expect("checked len")
447        } else {
448            compute::concat_batches(&batches[0].schema(), batches.iter())?
449        };
450        // Defence in depth: the outer LIMIT already bounds the row count,
451        // but slice anyway so a planning quirk can never blow the cap.
452        let batch = if batch.num_rows() as u64 > cap {
453            batch.slice(0, cap as usize)
454        } else {
455            batch
456        };
457        serialize(&batch)
458    }
459
460    /// Same plan as [`Self::query_sql`], but encode the bounded result as
461    /// an Arrow IPC stream instead of JSON. Backs the Arrow
462    /// content-negotiated branch of `POST /api/v1/sql`.
463    pub async fn query_sql_arrow_stream(
464        &self,
465        sql: &str,
466        max_rows: u64,
467    ) -> Result<ArrowIpcStream, AppError> {
468        let cap = max_rows.max(1);
469        let sql = self.canonicalize_sql(sql);
470        // DESCRIBE cannot be nested in a subquery on DataFusion (see
471        // `query_sql`); run it directly. Its output is bounded by the
472        // column count, so the missing outer LIMIT is harmless.
473        let wrapped = if datapress_core::sql::is_describe(&sql) {
474            sql
475        } else {
476            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
477        };
478        let df = self.ctx.sql(&wrapped).await?;
479        let batches = df.collect().await?;
480        Ok(stream_arrow_batches(batches))
481    }
482
483    /// Same plan as [`Self::query`], but encode the result page as an
484    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
485    /// results still produce a valid, self-describing zero-batch stream.
486    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
487        let batch = self.query_batch(name, req).await?;
488        let schema = batch.schema();
489        let mut buf = Vec::with_capacity(8 * 1024);
490        {
491            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
492            if batch.num_rows() > 0 {
493                w.write(&batch)?;
494            }
495            w.finish()?;
496        }
497        Ok(buf)
498    }
499
500    pub async fn query_arrow_stream(
501        &self,
502        name: &str,
503        req: &QueryRequest,
504    ) -> Result<ArrowIpcStream, AppError> {
505        let batches = self.query_batches(name, req).await?;
506        Ok(stream_arrow_batches(batches))
507    }    pub async fn query_arrow_stream_all(
508        &self,
509        name: &str,
510        req: &QueryRequest,
511    ) -> Result<ArrowIpcStream, AppError> {
512        let batches = self.query_batches_all(name, req).await?;
513        Ok(stream_arrow_batches(batches))
514    }
515
516    /// Encode the entire dataset as a single self-contained Parquet file.
517    ///
518    /// Collects every row (all columns, no predicates, no paging) and runs
519    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
520    /// carries the row-group + footer metadata a Parquet reader needs to
521    /// answer `count(*)` straight from the footer. Powers the cached
522    /// `GET /datasets/{name}/parquet` HTTP endpoint.
523    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
524        // All rows, all columns, no predicates / ordering / limit.
525        let req = QueryRequest {
526            columns: Vec::new(),
527            predicates: Vec::new(),
528            group_by: Vec::new(),
529            aggregations: Vec::new(),
530            having: Vec::new(),
531            distinct: false,
532            order_by: Vec::new(),
533            limit: None,
534            page: 1,
535            page_size: 1,
536        };
537        let st = self.dataset(name)?;
538        let batches = self.query_batches_all(name, &req).await?;
539        // Use the actual batch schema when we have rows so the writer schema
540        // matches exactly (projection/nullability); fall back to the
541        // dataset schema for an empty dataset.
542        let schema = batches
543            .first()
544            .map(|b| b.schema())
545            .unwrap_or_else(|| st.arrow_schema.clone());
546
547        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
548        {
549            let props = parquet::file::properties::WriterProperties::builder()
550                .set_compression(parquet::basic::Compression::SNAPPY)
551                .build();
552            let mut writer =
553                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
554                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
555            for batch in &batches {
556                if batch.num_rows() > 0 {
557                    writer
558                        .write(batch)
559                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
560                }
561            }
562            writer
563                .close()
564                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
565        }
566        Ok(bytes::Bytes::from(buf))
567    }
568
569    /// Compute the result page as a single `RecordBatch`. Shared between
570    /// the JSON and Arrow IPC encoders.
571    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
572        let batches = self.query_batches(name, req).await?;
573        if batches.is_empty() {
574            return Ok(RecordBatch::new_empty(Arc::new(
575                arrow::datatypes::Schema::empty(),
576            )));
577        }
578        if batches.len() == 1 {
579            return Ok(batches.into_iter().next().expect("checked len"));
580        }
581        if batches.iter().all(|b| b.num_rows() == 0) {
582            return Ok(RecordBatch::new_empty(batches[0].schema()));
583        }
584        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
585        Ok(batch)
586    }
587
588    /// Compute the result page as Arrow batches. Arrow IPC responses can
589    /// write these directly, while JSON callers concatenate via
590    /// [`Self::query_batch`] for the existing row conversion path.
591    async fn query_batches(
592        &self,
593        name: &str,
594        req: &QueryRequest,
595    ) -> Result<Vec<RecordBatch>, AppError> {
596        let st = self.dataset(name)?;
597
598        let page = req.page.max(1);
599        let page_size = req.page_size.clamp(1, self.max_page_size);
600        let offset = ((page - 1) * page_size) as usize;
601        let limit = page_size as usize;
602
603        self.query_batches_inner(st, req, Some((offset, limit)))
604            .await
605    }
606
607    /// Compute all matching rows as Arrow batches for the one-request
608    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
609    /// optional `limit` still caps the total result size.
610    async fn query_batches_all(
611        &self,
612        name: &str,
613        req: &QueryRequest,
614    ) -> Result<Vec<RecordBatch>, AppError> {
615        let st = self.dataset(name)?;
616        self.query_batches_inner(st, req, None).await
617    }
618
619    async fn query_batches_inner(
620        &self,
621        st: Arc<DatasetState>,
622        req: &QueryRequest,
623        page_window: Option<(usize, usize)>,
624    ) -> Result<Vec<RecordBatch>, AppError> {
625        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
626
627        // In-memory hot paths only fire when:
628        //   - the dataset is materialised,
629        //   - the caller did not ask for ordering,
630        //   - and did not ask for a hard `limit` cap on a paged request.
631        // Both of the latter two require sorting / capping that the SQL
632        // engine handles uniformly across all data types.
633        let can_fast_path = !st.lazy
634            && req.order_by.is_empty()
635            && (page_window.is_none() || req.limit.is_none())
636            && req.group_by.is_empty()
637            && !req.distinct;
638
639        if can_fast_path {
640            let total = st.num_rows();
641
642            // No predicates -> O(1) raw Arrow slices over resident batches,
643            // no engine overhead.
644            if req.predicates.is_empty() {
645                if page_window.is_none() && req.limit.is_none() {
646                    return st
647                        .data
648                        .iter()
649                        .cloned()
650                        .map(|batch| project(&st.schema, batch, &req.columns))
651                        .collect();
652                }
653                let start = offset.min(total);
654                let len = limit.min(total - start);
655                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
656                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
657            }
658
659            // Index fast path: if every predicate is eq/in on an indexed column,
660            // resolve via the pre-built equality index.
661            if let Some(rows) = try_index(&st.index, &req.predicates) {
662                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
663                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
664            }
665        }
666
667        // Fallback (and only path for lazy datasets): DataFusion SQL.
668        let (sql, params) = match page_window {
669            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
670            None => build_query_stream_sql(&st.schema, req)?,
671        };
672        let mut df = self.ctx.sql(&sql).await?;
673        if !params.is_empty() {
674            df = df.with_param_values(params)?;
675        }
676        let batches = df.collect().await?;
677        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
678            let schema = batches
679                .first()
680                .map(|b| b.schema())
681                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
682            return Ok(vec![RecordBatch::new_empty(schema)]);
683        }
684        Ok(batches)
685    }
686}
687
688fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
689    let schema = batches
690        .first()
691        .map(|batch| batch.schema())
692        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
693    let (mut writer, stream) = arrow_ipc_stream_channel(8);
694
695    tokio::task::spawn_blocking(move || {
696        let result = (|| -> Result<(), AppError> {
697            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
698            for batch in batches {
699                if batch.num_rows() > 0 {
700                    w.write(&batch)?;
701                }
702            }
703            w.finish()?;
704            Ok(())
705        })();
706        if let Err(err) = result {
707            log::error!("datafusion arrow stream failed: {err}");
708            writer.send_error(err);
709        }
710    });
711
712    stream
713}
714
715impl Store {
716    /// Return the number of rows matching `req.predicates`. With no
717    /// predicates this is a cheap metadata lookup on materialised datasets
718    /// and a `SELECT COUNT(*)` on lazy ones.
719    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
720        let st = self.dataset(name)?;
721
722        if !st.lazy {
723            // No predicates → resident row count, no scan.
724            if req.predicates.is_empty() {
725                return Ok(st.num_rows() as i64);
726            }
727            // Index fast path: same eligibility rules as `query`.
728            if let Some(rows) = try_index(&st.index, &req.predicates) {
729                return Ok(rows.len() as i64);
730            }
731        }
732
733        // Fallback: DataFusion SQL — same predicate translation as `query`,
734        // with predicate values bound as typed parameters.
735        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
736        let mut df = self.ctx.sql(&sql).await?;
737        if !params.is_empty() {
738            df = df.with_param_values(params)?;
739        }
740        let batches = df.collect().await?;
741        let n = batches
742            .first()
743            .and_then(|b| {
744                b.column(0)
745                    .as_any()
746                    .downcast_ref::<arrow::array::Int64Array>()
747            })
748            .filter(|a| !a.is_empty())
749            .map(|a| a.value(0))
750            .unwrap_or(0);
751        Ok(n)
752    }
753}
754
755// ---------------------------------------------------------------------------
756// Dataset loading
757// ---------------------------------------------------------------------------
758
759/// Build a tuned `SessionContext` for the whole `Store`.
760///
761/// Everything here is opt-in via the `[datafusion]` config block; with the
762/// defaults this is equivalent to a plain `SessionContext::new()`. Lazy
763/// datasets (`ListingTable` over parquet, possibly on S3) benefit most:
764///
765/// * `pushdown_filters` — evaluate predicates *during* the parquet decode
766///   so filtered-out rows in a surviving row group are never materialised.
767/// * `reorder_filters` — let the scan reorder pushed-down predicates by
768///   selectivity (only meaningful with `pushdown_filters`).
769/// * `list_files_cache` — a `ListFilesCache` on the `RuntimeEnv` so repeated
770///   lazy queries reuse object-store `LIST` results instead of re-listing the
771///   prefix every time — the dominant per-query cost on S3.
772///
773/// Row-group / page-index / bloom-filter pruning and `metadata_size_hint`
774/// are already on by default in this DataFusion version, so they are left
775/// untouched.
776///
777/// Note: DataFusion ≥ 53 installs a *default* `DefaultListFilesCache` on the
778/// `RuntimeEnv` even with the defaults below, so a reload must explicitly
779/// invalidate it (see [`Store::reload`]) — otherwise a re-listed S3 prefix
780/// would keep serving stale, now-deleted object keys.
781fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
782    let mut config = SessionConfig::new();
783    {
784        let opts = config.options_mut();
785        opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
786        opts.execution.parquet.reorder_filters = cfg.reorder_filters;
787        // Expose the `information_schema.tables`/`columns` virtual tables. BI
788        // navigators (Npgsql/Power BI, DBeaver) query them alongside
789        // `pg_catalog` to enumerate tables, and the pgwire front-end needs
790        // them for schema browsing. They live in their own `information_schema`
791        // schema, so they don't affect dataset listing (which reads the
792        // Store's own registry, not the catalog) or the HTTP endpoints.
793        //
794        // We deliberately leave DataFusion's *built-in* `information_schema`
795        // short-circuit OFF and register our own provider instead (see
796        // `register_information_schema_shim`). The built-in provider only
797        // implements seven views and can't be extended; BI tools also probe
798        // `table_constraints` & friends, so our provider delegates to the
799        // built-in views and adds those missing (empty) constraint relations.
800        opts.catalog.information_schema = false;
801    }
802
803    // Name of the session's default schema, surfaced by the `current_schema()`
804    // compatibility UDF registered below.
805    let default_schema = config.options().catalog.default_schema.clone();
806
807    if !cfg.list_files_cache {
808        let ctx = SessionContext::new_with_config(config);
809        register_compat_udfs(&ctx, default_schema);
810        register_information_schema_shim(&ctx);
811        return ctx;
812    }
813
814    // Cache object-store listings so repeated lazy/S3 queries skip the
815    // LIST round-trips. A zero TTL means infinite (never expires).
816    let ttl = (cfg.list_files_cache_ttl_secs > 0)
817        .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
818    let list_cache = Arc::new(DefaultListFilesCache::new(
819        cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
820        ttl,
821    ));
822    let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
823
824    let runtime = RuntimeEnvBuilder::new()
825        .with_cache_manager(cache_manager)
826        .build_arc()
827        .expect("failed to build DataFusion runtime env");
828
829    let ctx = SessionContext::new_with_config_rt(config, runtime);
830    register_compat_udfs(&ctx, default_schema);
831    register_information_schema_shim(&ctx);
832    ctx
833}
834
835/// Register scalar UDFs that exist on DuckDB but not on DataFusion, so the
836/// same portable smoke-test / introspection SQL works on both backends.
837///
838/// * `current_schema()` — DuckDB returns the active schema (`main`);
839///   DataFusion has no such function, so we return the session's default
840///   schema name (`public`).
841///
842/// Note: when the optional `pgwire` feature is enabled and the listener is
843/// started, `datafusion_pg_catalog::setup_pg_catalog` runs *after* this on the
844/// same shared context and re-registers `current_schema()` (plus siblings) with
845/// its own implementation, which also returns `public`. The library version
846/// wins there by construction (later `register_udf` replaces by name); we keep
847/// ours here so the behavior is identical whether or not pgwire is compiled in
848/// or enabled, and so the DuckDB-parity `/api/v1/sql` contract holds by default.
849fn register_compat_udfs(ctx: &SessionContext, default_schema: String) {
850    ctx.register_udf(ScalarUDF::from(CurrentSchemaUdf::new(default_schema)));
851}
852
853// ---------------------------------------------------------------------------
854// information_schema constraint views
855// ---------------------------------------------------------------------------
856
857/// Register an `information_schema` schema provider that serves DataFusion's
858/// seven built-in virtual views *and* the empty constraint relations BI tools
859/// probe when loading a table.
860///
861/// Why not simply `opts.catalog.information_schema = true`? DataFusion's
862/// built-in `information_schema` is special-cased in the planner:
863/// `SessionState::schema_for_ref` short-circuits *any* `information_schema`
864/// reference straight to a fresh built-in `InformationSchemaProvider`, so a
865/// schema registered under that name would be silently ignored. We therefore
866/// leave that flag OFF (see `build_tuned_context`) and register our own
867/// provider here, which *delegates* to the same built-in provider for its
868/// seven views (tables/columns/views/schemata/routines/parameters/df_settings)
869/// while adding the three constraint views below.
870///
871/// Power BI / Npgsql query `information_schema.table_constraints` — and,
872/// immediately after, `key_column_usage` and `referential_constraints` — when
873/// loading a table. DataFusion implements none of them, so the table load
874/// fails with "table 'information_schema.table_constraints' not found". These
875/// relations are served empty and correctly-shaped: DataPress datasets have no
876/// declared constraints, so zero rows is the *truthful* answer, not a stub. If
877/// DataPress ever gains declared keys, these views are where they'd surface.
878fn register_information_schema_shim(ctx: &SessionContext) {
879    let default_catalog = ctx
880        .state()
881        .config()
882        .options()
883        .catalog
884        .default_catalog
885        .clone();
886    let Some(catalog) = ctx.catalog(&default_catalog) else {
887        return;
888    };
889    let catalog_list = Arc::clone(ctx.state().catalog_list());
890    let provider = Arc::new(InformationSchemaWithConstraints::new(catalog_list));
891    // Registering under an existing schema name replaces it; the default
892    // catalog has no `information_schema` (the built-in one is virtual), so
893    // this is an insert. Ignore the returned previous provider, if any.
894    let _ = catalog.register_schema("information_schema", provider);
895}
896
897/// An `information_schema` provider that delegates to DataFusion's built-in
898/// [`InformationSchemaProvider`] and adds empty PostgreSQL constraint views.
899#[derive(Debug)]
900struct InformationSchemaWithConstraints {
901    /// The built-in provider; serves the seven standard virtual views on
902    /// demand from the session's catalog list.
903    inner: InformationSchemaProvider,
904    /// Empty, correctly-shaped constraint relations keyed by lowercase name.
905    constraints: HashMap<&'static str, Arc<dyn TableProvider>>,
906}
907
908impl InformationSchemaWithConstraints {
909    fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
910        let mut constraints: HashMap<&'static str, Arc<dyn TableProvider>> = HashMap::new();
911        constraints.insert("table_constraints", empty_table(table_constraints_schema()));
912        constraints.insert("key_column_usage", empty_table(key_column_usage_schema()));
913        constraints.insert(
914            "referential_constraints",
915            empty_table(referential_constraints_schema()),
916        );
917        Self {
918            inner: InformationSchemaProvider::new(catalog_list),
919            constraints,
920        }
921    }
922}
923
924#[async_trait]
925impl SchemaProvider for InformationSchemaWithConstraints {
926    fn as_any(&self) -> &dyn Any {
927        self
928    }
929
930    fn table_names(&self) -> Vec<String> {
931        let mut names = self.inner.table_names();
932        names.extend(self.constraints.keys().map(|k| (*k).to_string()));
933        names
934    }
935
936    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
937        if let Some(table) = self.constraints.get(name.to_ascii_lowercase().as_str()) {
938            return Ok(Some(Arc::clone(table)));
939        }
940        self.inner.table(name).await
941    }
942
943    fn table_exist(&self, name: &str) -> bool {
944        self.constraints
945            .contains_key(name.to_ascii_lowercase().as_str())
946            || self.inner.table_exist(name)
947    }
948}
949
950/// Build a zero-row [`MemTable`] with the given schema.
951fn empty_table(schema: Arc<Schema>) -> Arc<dyn TableProvider> {
952    Arc::new(MemTable::try_new(schema, vec![Vec::new()]).expect("empty MemTable schema is valid"))
953}
954
955/// `information_schema.table_constraints` columns, per the SQL standard /
956/// PostgreSQL. All columns are `character_data`/`sql_identifier`/`yes_or_no`,
957/// which we surface as nullable `Utf8`.
958fn table_constraints_schema() -> Arc<Schema> {
959    Arc::new(Schema::new(vec![
960        Field::new("constraint_catalog", DataType::Utf8, true),
961        Field::new("constraint_schema", DataType::Utf8, true),
962        Field::new("constraint_name", DataType::Utf8, true),
963        Field::new("table_catalog", DataType::Utf8, true),
964        Field::new("table_schema", DataType::Utf8, true),
965        Field::new("table_name", DataType::Utf8, true),
966        Field::new("constraint_type", DataType::Utf8, true),
967        Field::new("is_deferrable", DataType::Utf8, true),
968        Field::new("initially_deferred", DataType::Utf8, true),
969        Field::new("enforced", DataType::Utf8, true),
970    ]))
971}
972
973/// `information_schema.key_column_usage` columns. The two positional columns
974/// are `cardinal_number` (a positive integer) → nullable `Int32`.
975fn key_column_usage_schema() -> Arc<Schema> {
976    Arc::new(Schema::new(vec![
977        Field::new("constraint_catalog", DataType::Utf8, true),
978        Field::new("constraint_schema", DataType::Utf8, true),
979        Field::new("constraint_name", DataType::Utf8, true),
980        Field::new("table_catalog", DataType::Utf8, true),
981        Field::new("table_schema", DataType::Utf8, true),
982        Field::new("table_name", DataType::Utf8, true),
983        Field::new("column_name", DataType::Utf8, true),
984        Field::new("ordinal_position", DataType::Int32, true),
985        Field::new("position_in_unique_constraint", DataType::Int32, true),
986    ]))
987}
988
989/// `information_schema.referential_constraints` columns. All columns are
990/// `sql_identifier`/`character_data` → nullable `Utf8`.
991fn referential_constraints_schema() -> Arc<Schema> {
992    Arc::new(Schema::new(vec![
993        Field::new("constraint_catalog", DataType::Utf8, true),
994        Field::new("constraint_schema", DataType::Utf8, true),
995        Field::new("constraint_name", DataType::Utf8, true),
996        Field::new("unique_constraint_catalog", DataType::Utf8, true),
997        Field::new("unique_constraint_schema", DataType::Utf8, true),
998        Field::new("unique_constraint_name", DataType::Utf8, true),
999        Field::new("match_option", DataType::Utf8, true),
1000        Field::new("update_rule", DataType::Utf8, true),
1001        Field::new("delete_rule", DataType::Utf8, true),
1002    ]))
1003}
1004
1005/// `current_schema()` — a no-argument scalar UDF returning the session's
1006/// default schema name as a constant `Utf8`.
1007#[derive(Debug, PartialEq, Eq, Hash)]
1008struct CurrentSchemaUdf {
1009    signature: Signature,
1010    schema: String,
1011}
1012
1013impl CurrentSchemaUdf {
1014    fn new(schema: String) -> Self {
1015        Self {
1016            signature: Signature::nullary(Volatility::Stable),
1017            schema,
1018        }
1019    }
1020}
1021
1022impl ScalarUDFImpl for CurrentSchemaUdf {
1023    fn as_any(&self) -> &dyn Any {
1024        self
1025    }
1026
1027    fn name(&self) -> &str {
1028        "current_schema"
1029    }
1030
1031    fn signature(&self) -> &Signature {
1032        &self.signature
1033    }
1034
1035    fn return_type(&self, _arg_types: &[DataType]) -> DfResult<DataType> {
1036        Ok(DataType::Utf8)
1037    }
1038
1039    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> DfResult<ColumnarValue> {
1040        Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
1041            self.schema.clone(),
1042        ))))
1043    }
1044}
1045
1046async fn build_dataset(
1047    d: &DatasetConfig,
1048    ctx: &SessionContext,
1049) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1050    // Lazy datasets: register a streaming provider straight against the
1051    // source and skip the materialise / index / partition pipeline below.
1052    // Parquet uses a `ListingTable`; delta uses deltalake's own DataFusion
1053    // `TableProvider` (which reads the transaction log once for the file
1054    // list, then streams parquet row groups per query with predicate
1055    // pushdown + file skipping).
1056    if d.lazy {
1057        match (d.source.kind, d.source.is_s3()) {
1058            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
1059            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
1060            (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
1061        }
1062    }
1063
1064    // Fetch raw RecordBatches from whichever backing store the dataset
1065    // is configured to use. All four (parquet, delta) x (local, s3)
1066    // combinations converge into one Vec<RecordBatch>; the materialisation
1067    // / indexing / partitioning logic below is shared.
1068    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
1069        (SourceKind::Parquet, false) => read_local_parquet(d)?,
1070        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
1071        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
1072        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
1073    };
1074    if raw_batches.is_empty() {
1075        return Err(AppError::EmptyDataset(format!(
1076            "dataset '{}': source produced no batches",
1077            d.name
1078        )));
1079    }
1080    // A source can also resolve to one or more *zero-row* batches (e.g. an
1081    // empty Delta table, or a parquet file with only a schema). Treat that as
1082    // empty too so it's logged and skipped rather than registered as a 0-row
1083    // dataset that shows up in discovery / explore.
1084    if raw_batches.iter().all(|b| b.num_rows() == 0) {
1085        return Err(AppError::EmptyDataset(format!(
1086            "dataset '{}': source has a schema but no rows",
1087            d.name
1088        )));
1089    }
1090
1091    let chunks = raw_batches;
1092    let arrow_sch = chunks[0].schema();
1093
1094    // Build DatasetSchema from the Arrow schema.
1095    let columns: Vec<ColumnInfo> = arrow_sch
1096        .fields()
1097        .iter()
1098        .map(|f| {
1099            let dt = f.data_type();
1100            ColumnInfo {
1101                name: f.name().clone(),
1102                logical: arrow_to_logical(dt),
1103                sql_type: format!("{dt:?}"),
1104                nullable: f.is_nullable(),
1105            }
1106        })
1107        .collect();
1108    let schema = DatasetSchema::new(&d.name, columns)
1109        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1110
1111    // Build the equality index per the per-dataset policy. Operates on the
1112    // chunked representation directly so we never have to materialise a
1113    // single concatenated batch (which would double peak RSS on wide
1114    // schemas — see `DatasetState` docs).
1115    let index = build_eq_index_with_policy(&chunks, &d.index);
1116
1117    // Partition for parallel scans by the SQL fallback path. We distribute
1118    // the existing batches round-robin across `n_parts` partitions instead
1119    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
1120    // an Arc-clone of the column buffers, not a copy.
1121    let n_parts = std::thread::available_parallelism()
1122        .map(|n| n.get())
1123        .unwrap_or(4);
1124    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
1125    for (i, b) in chunks.iter().enumerate() {
1126        if b.num_rows() == 0 {
1127            continue;
1128        }
1129        parts[i % n_parts].push(b.clone());
1130    }
1131    parts.retain(|p| !p.is_empty());
1132    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
1133
1134    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
1135    let mem_mb: usize = chunks
1136        .iter()
1137        .flat_map(|b| b.columns().iter())
1138        .map(|c| c.get_buffer_memory_size())
1139        .sum::<usize>()
1140        / 1_048_576;
1141    log::info!(
1142        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
1143        d.name,
1144        d.source.kind.as_str(),
1145        total_rows,
1146        schema.columns.len(),
1147        mem_mb,
1148        chunks.len(),
1149        index.len()
1150    );
1151
1152    Ok((
1153        DatasetState {
1154            schema,
1155            data: chunks,
1156            arrow_schema: arrow_sch,
1157            index,
1158            lazy: false,
1159        },
1160        provider,
1161    ))
1162}
1163
1164/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
1165/// The dataset is never read into RAM; DataFusion streams row groups on
1166/// each query. The returned `DatasetState.data` is an empty `Vec` —
1167/// `arrow_schema` still carries the inferred Arrow schema for discovery.
1168async fn build_lazy_local_parquet(
1169    d: &DatasetConfig,
1170    ctx: &SessionContext,
1171) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1172    let (url, part_keys) = lazy_local_listing(d)?;
1173
1174    let mut opts =
1175        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1176    if !part_keys.is_empty() {
1177        opts = opts.with_table_partition_cols(
1178            part_keys
1179                .iter()
1180                .map(|k| (k.clone(), DataType::Utf8))
1181                .collect(),
1182        );
1183    }
1184
1185    let session_state = ctx.state();
1186    // `infer_schema` returns the *file* schema (without partition columns);
1187    // `ListingTable` appends the declared partition columns on top.
1188    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1189        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
1190    })?;
1191
1192    // An empty listing yields an empty schema (DataFusion does not error on a
1193    // prefix/glob that matches no files). Treat that as an empty dataset so
1194    // the load loop logs and skips it rather than registering 0 columns.
1195    if file_schema.fields().is_empty() {
1196        return Err(AppError::EmptyDataset(format!(
1197            "dataset '{}': no .parquet files at {}",
1198            d.name, d.source.location
1199        )));
1200    }
1201
1202    let cfg = ListingTableConfig::new(url)
1203        .with_listing_options(opts)
1204        .with_schema(file_schema.clone());
1205    let table = ListingTable::try_new(cfg).map_err(|e| {
1206        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
1207    })?;
1208    let provider: Arc<dyn TableProvider> = Arc::new(table);
1209
1210    // Discovery schema = file columns + partition columns (Utf8).
1211    let mut fields: Vec<Field> = file_schema
1212        .fields()
1213        .iter()
1214        .map(|f| f.as_ref().clone())
1215        .collect();
1216    for k in &part_keys {
1217        if !fields.iter().any(|f| f.name() == k) {
1218            fields.push(Field::new(k, DataType::Utf8, false));
1219        }
1220    }
1221    let arrow_sch = Arc::new(Schema::new(fields));
1222
1223    let columns: Vec<ColumnInfo> = arrow_sch
1224        .fields()
1225        .iter()
1226        .map(|f| {
1227            let dt = f.data_type();
1228            ColumnInfo {
1229                name: f.name().clone(),
1230                logical: arrow_to_logical(dt),
1231                sql_type: format!("{dt:?}"),
1232                nullable: f.is_nullable(),
1233            }
1234        })
1235        .collect();
1236    let schema = DatasetSchema::new(&d.name, columns)
1237        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1238
1239    log::info!(
1240        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
1241        d.name,
1242        d.source.kind.as_str(),
1243        schema.columns.len(),
1244        part_keys.len()
1245    );
1246
1247    Ok((
1248        DatasetState {
1249            schema,
1250            data: Vec::new(),
1251            arrow_schema: arrow_sch,
1252            index: EqIndex::default(),
1253            lazy: true,
1254        },
1255        provider,
1256    ))
1257}
1258
1259/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
1260/// rooted at the dataset base plus the ordered hive partition keys (if any).
1261/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
1262/// (hive root or flat folder of parquets), and a single `*.parquet` file.
1263fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1264    let loc = &d.source.location;
1265
1266    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1267        let parts: Vec<&str> = loc.split('/').collect();
1268        let first_wild = parts
1269            .iter()
1270            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1271            .unwrap_or(parts.len());
1272        let base = parts[..first_wild].join("/");
1273        let base = if base.is_empty() {
1274            "/".to_string()
1275        } else {
1276            base
1277        };
1278        // Partition keys: `key=…` components between the base and the file
1279        // pattern (the final component).
1280        let upper = parts.len().saturating_sub(1);
1281        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1282            .iter()
1283            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1284            .filter(|k| !k.is_empty())
1285            .collect();
1286        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1287    }
1288
1289    let path = std::path::Path::new(loc);
1290    if path.is_dir() {
1291        let keys = discover_hive_keys(path);
1292        return Ok((dir_url(path, d)?, keys));
1293    }
1294
1295    let url = ListingTableUrl::parse(loc)
1296        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1297    Ok((url, Vec::new()))
1298}
1299
1300/// Parse a directory path into a `ListingTableUrl` (trailing slash so
1301/// DataFusion treats it as a directory root, not a single object).
1302fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1303    let s = path.to_str().ok_or_else(|| {
1304        AppError::Internal(format!(
1305            "dataset '{}': non-utf8 path {}",
1306            d.name,
1307            path.display()
1308        ))
1309    })?;
1310    let s = if s.ends_with('/') {
1311        s.to_string()
1312    } else {
1313        format!("{s}/")
1314    };
1315    ListingTableUrl::parse(&s)
1316        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1317}
1318
/// Discover the ordered hive partition keys below `base`.
///
/// Starting at `base`, each level is scanned for the first subdirectory
/// named `key=value` (both sides non-empty); its key is recorded and the
/// walk continues inside it. The walk stops at the first level that has no
/// such subdirectory or cannot be read, so a flat (non-partitioned) folder
/// or a missing path yields an empty vec.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut level = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&level) {
        // First `key=value` directory at this level, in directory-listing
        // order; unreadable entries and non-UTF-8 names are skipped.
        let hit = entries.flatten().find_map(|entry| {
            let path = entry.path();
            if !path.is_dir() {
                return None;
            }
            let key = match path.file_name()?.to_str()?.split_once('=') {
                Some((k, v)) if !k.is_empty() && !v.is_empty() => k.to_string(),
                _ => return None,
            };
            Some((key, path))
        });

        let Some((key, child)) = hit else {
            break;
        };
        keys.push(key);
        level = child;
    }

    keys
}
1356
1357/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
1358/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
1359/// any discovered hive partition columns. DataFusion does the directory
1360/// listing through the registered store and streams row groups on each
1361/// query — no local enumeration needed.
1362async fn build_lazy_s3_parquet(
1363    d: &DatasetConfig,
1364    ctx: &SessionContext,
1365) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1366    register_s3_object_store(d, ctx)?;
1367
1368    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1369
1370    // An empty S3 prefix yields an empty inferred schema (no error). Treat it
1371    // as an empty dataset so the load loop logs and skips it.
1372    if file_schema.fields().is_empty() {
1373        return Err(AppError::EmptyDataset(format!(
1374            "dataset '{}': no .parquet files at {}",
1375            d.name, d.source.location
1376        )));
1377    }
1378
1379    // Discovery schema = file columns + partition columns (Utf8).
1380    let mut fields: Vec<Field> = file_schema
1381        .fields()
1382        .iter()
1383        .map(|f| f.as_ref().clone())
1384        .collect();
1385    for k in &part_keys {
1386        if !fields.iter().any(|f| f.name() == k) {
1387            fields.push(Field::new(k, DataType::Utf8, false));
1388        }
1389    }
1390    let arrow_sch = Arc::new(Schema::new(fields));
1391
1392    let columns: Vec<ColumnInfo> = arrow_sch
1393        .fields()
1394        .iter()
1395        .map(|f| {
1396            let dt = f.data_type();
1397            ColumnInfo {
1398                name: f.name().clone(),
1399                logical: arrow_to_logical(dt),
1400                sql_type: format!("{dt:?}"),
1401                nullable: f.is_nullable(),
1402            }
1403        })
1404        .collect();
1405    let schema = DatasetSchema::new(&d.name, columns)
1406        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1407
1408    log::info!(
1409        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1410        d.name,
1411        d.source.kind.as_str(),
1412        schema.columns.len(),
1413        part_keys.len()
1414    );
1415
1416    Ok((
1417        DatasetState {
1418            schema,
1419            data: Vec::new(),
1420            arrow_schema: arrow_sch,
1421            index: EqIndex::default(),
1422            lazy: true,
1423        },
1424        provider,
1425    ))
1426}
1427
1428/// Build a `ListingTable` provider for an S3 parquet source, resolving the
1429/// base prefix and hive partition keys via [`s3_listing`]. The registered
1430/// S3 object store must already be present on `ctx`. Returns the provider,
1431/// the inferred *file* schema (without partition columns), and the ordered
1432/// partition keys.
1433async fn build_s3_listing_table(
1434    d: &DatasetConfig,
1435    ctx: &SessionContext,
1436) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1437    let (url, part_keys) = s3_listing(d, ctx).await?;
1438
1439    let mut opts =
1440        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1441    if !part_keys.is_empty() {
1442        opts = opts.with_table_partition_cols(
1443            part_keys
1444                .iter()
1445                .map(|k| (k.clone(), DataType::Utf8))
1446                .collect(),
1447        );
1448    }
1449
1450    let session_state = ctx.state();
1451    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1452        AppError::Internal(format!(
1453            "dataset '{}': infer parquet schema on s3: {e}",
1454            d.name
1455        ))
1456    })?;
1457
1458    let cfg = ListingTableConfig::new(url)
1459        .with_listing_options(opts)
1460        .with_schema(file_schema.clone());
1461    let table = ListingTable::try_new(cfg).map_err(|e| {
1462        AppError::Internal(format!(
1463            "dataset '{}': ListingTable::try_new (s3): {e}",
1464            d.name
1465        ))
1466    })?;
1467    Ok((Arc::new(table), file_schema, part_keys))
1468}
1469
1470/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
1471/// keys)` pair. Hive keys come from the location glob when present
1472/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
1473/// are discovered by listing the registered object store. Honours the
1474/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
1475async fn s3_listing(
1476    d: &DatasetConfig,
1477    ctx: &SessionContext,
1478) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1479    let s3 = d.s3.clone().unwrap_or_default();
1480    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1481    let loc = &d.source.location;
1482
1483    if d.source.has_glob() {
1484        let (base, keys) = split_glob_base_keys(loc);
1485        let base = format!("{}/", base.trim_end_matches('/'));
1486        let url = ListingTableUrl::parse(&base).map_err(|e| {
1487            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1488        })?;
1489        let keys = if want_partitions { keys } else { Vec::new() };
1490        return Ok((url, keys));
1491    }
1492
1493    let base = if loc.ends_with('/') {
1494        loc.clone()
1495    } else {
1496        format!("{loc}/")
1497    };
1498    let url = ListingTableUrl::parse(&base).map_err(|e| {
1499        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1500    })?;
1501    let keys = if want_partitions {
1502        discover_s3_hive_keys(ctx, &url).await
1503    } else {
1504        Vec::new()
1505    };
1506    Ok((url, keys))
1507}
1508
/// Break a glob location into `(base, keys)`.
///
/// `base` is every `/`-separated component before the first one containing a
/// wildcard character (`*`, `?` or `[`), re-joined with `/`; an empty base
/// becomes `"/"`. `keys` are the names of the `key=…` components found from
/// the first wildcard component up to, but excluding, the final component
/// (the file pattern), in path order. Components with an empty key are
/// ignored. A location without wildcards yields the whole location as the
/// base and no keys. Works for both local and `s3://` paths.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let components: Vec<&str> = loc.split('/').collect();
    let is_wild = |c: &str| c.chars().any(|ch| matches!(ch, '*' | '?' | '['));
    let wild_at = components
        .iter()
        .position(|&c| is_wild(c))
        .unwrap_or(components.len());
    let (fixed, pattern) = components.split_at(wild_at);

    let mut base = fixed.join("/");
    if base.is_empty() {
        base.push('/');
    }

    // The last pattern component is the file pattern, never a partition dir.
    let dir_count = pattern.len().saturating_sub(1);
    let keys: Vec<String> = pattern
        .iter()
        .take(dir_count)
        .filter_map(|dir| match dir.split_once('=') {
            Some((key, _)) if !key.is_empty() => Some(key.to_string()),
            _ => None,
        })
        .collect();

    (base, keys)
}
1532
1533/// Discover ordered hive partition keys for an S3 prefix by walking the
1534/// object store one `key=value/` level at a time via delimiter listings.
1535/// Best-effort: any listing error stops discovery and returns what was found
1536/// so far (empty = treat as a flat folder).
1537async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1538    let store = match ctx.runtime_env().object_store(url.object_store()) {
1539        Ok(s) => s,
1540        Err(_) => return Vec::new(),
1541    };
1542    let mut keys = Vec::new();
1543    let mut prefix = url.prefix().clone();
1544    loop {
1545        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1546            Ok(l) => l,
1547            Err(_) => break,
1548        };
1549        let mut next: Option<object_store::path::Path> = None;
1550        for cp in &listing.common_prefixes {
1551            if let Some(seg) = cp.parts().next_back() {
1552                let seg = seg.as_ref().to_string();
1553                if let Some((k, v)) = seg.split_once('=')
1554                    && !k.is_empty()
1555                    && !v.is_empty()
1556                {
1557                    keys.push(k.to_string());
1558                    next = Some(cp.clone());
1559                    break;
1560                }
1561            }
1562        }
1563        match next {
1564            Some(p) => prefix = p,
1565            None => break,
1566        }
1567    }
1568    keys
1569}
1570
1571/// Original local-parquet code path — sync file I/O. We set a large reader
1572/// batch size so wide schemas (hundreds of columns) don't pay per-array
1573/// metadata overhead on thousands of small (default 1024-row) batches.
1574///
1575/// Two memory-saving knobs are applied here:
1576///
1577/// * **Column projection** — if `d.columns` is non-empty, only those
1578///   columns are decoded; everything else is skipped at the parquet reader
1579///   level (no Arrow array is ever allocated for the dropped columns).
1580/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1581///   carry a dictionary page are materialised as Arrow
1582///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1583///   string columns (state, country, severity, …) stay represented as
1584///   `n_unique` string slots plus an Int32 index per row instead of
1585///   `n_rows` independent strings — typically 10×–50× smaller for
1586///   real-world data.
1587fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1588    let files = d.resolve_local_parquet_files()?;
1589    let mut all = Vec::new();
1590    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1591        None
1592    } else {
1593        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1594    };
1595
1596    for f in &files {
1597        let file = std::fs::File::open(f)
1598            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1599
1600        // First pass: peek the parquet metadata + default Arrow schema so we
1601        // can (a) decide a column projection and (b) override Utf8 columns
1602        // that are dictionary-encoded in the file so the reader materialises
1603        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1604        let probe = ParquetRecordBatchReaderBuilder::try_new(
1605            file.try_clone()
1606                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1607        )?;
1608        let parquet_schema = probe.parquet_schema().clone();
1609        let arrow_schema = probe.schema().clone();
1610        let metadata = probe.metadata().clone();
1611        drop(probe);
1612
1613        // Column projection (top-level / leaf indices for flat schemas).
1614        let projection = if let Some(w) = &wanted {
1615            let indices: Vec<usize> = arrow_schema
1616                .fields()
1617                .iter()
1618                .enumerate()
1619                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1620                .map(|(i, _)| i)
1621                .collect();
1622            if indices.is_empty() {
1623                return Err(AppError::Internal(format!(
1624                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1625                    d.name,
1626                    d.columns,
1627                    f.display()
1628                )));
1629            }
1630            ProjectionMask::roots(&parquet_schema, indices)
1631        } else {
1632            ProjectionMask::all()
1633        };
1634
1635        // Dictionary override: any Utf8 column whose first row group carries
1636        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1637        // override schema must still describe every column in the parquet
1638        // file (projection is applied separately). Skipped entirely when
1639        // the dataset has `dict_encode = false` — escape hatch for cases
1640        // where the override interacts badly with null propagation in the
1641        // downstream engine.
1642        let mut new_fields: Vec<Field> = arrow_schema
1643            .fields()
1644            .iter()
1645            .map(|f| f.as_ref().clone())
1646            .collect();
1647        if d.dict_encode
1648            && let Some(rg0) = metadata.row_groups().first()
1649        {
1650            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1651                if !matches!(
1652                    fld.data_type(),
1653                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1654                ) {
1655                    continue;
1656                }
1657                if let Some(col) = rg0.columns().get(i)
1658                    && col.dictionary_page_offset().is_some()
1659                {
1660                    new_fields[i] = Field::new(
1661                        fld.name(),
1662                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1663                        fld.is_nullable(),
1664                    );
1665                }
1666            }
1667        }
1668        let forced_schema = Arc::new(Schema::new(new_fields));
1669
1670        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1671        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1672            .with_batch_size(65_536)
1673            .with_projection(projection)
1674            .build()?;
1675        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1676        // the file. Fold them in as constant Utf8 columns so they show up in
1677        // the schema and are queryable — matching the DuckDB backend.
1678        let pairs = hive_pairs(f);
1679        for batch in reader {
1680            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1681            all.push(if pairs.is_empty() {
1682                batch
1683            } else {
1684                append_partition_cols(&batch, &pairs)?
1685            });
1686        }
1687    }
1688    if all.is_empty() {
1689        return Err(AppError::Internal(format!(
1690            "dataset '{}': parquet source is empty",
1691            d.name
1692        )));
1693    }
1694    Ok(all)
1695}
1696
/// Ordered hive-style partition `(key, value)` pairs encoded in the
/// *directories* of a data-file path, i.e. ancestor components shaped like
/// `key=value` (e.g. `year=2024/city=NYC`).
///
/// `path` is expected to point at a data file: its final component is never
/// inspected, so a file that merely has `=` in its name (say
/// `part=0.parquet`) does not produce a bogus partition column. A component
/// is skipped when it is not valid UTF-8, when the key or the value is empty,
/// or when the value itself contains another `=`.
///
/// Returns the pairs in path order (outermost directory first); empty when
/// the path has no parent or no partition-shaped directory.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    // Only the containing directories can carry partition values.
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1711
1712/// Append constant Utf8 columns for each hive partition pair. A partition
1713/// key that collides with a real file column is skipped (the file wins).
1714fn append_partition_cols(
1715    batch: &RecordBatch,
1716    pairs: &[(String, String)],
1717) -> Result<RecordBatch, AppError> {
1718    let n = batch.num_rows();
1719    let mut fields: Vec<Field> = batch
1720        .schema()
1721        .fields()
1722        .iter()
1723        .map(|f| f.as_ref().clone())
1724        .collect();
1725    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1726    for (k, v) in pairs {
1727        if fields.iter().any(|f| f.name() == k) {
1728            continue;
1729        }
1730        fields.push(Field::new(k, DataType::Utf8, false));
1731        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1732    }
1733    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1734        .map_err(|e| AppError::Internal(e.to_string()))
1735}
1736
1737/// Register an `AmazonS3` object store on the SessionContext, build a
1738/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1739/// and stream the whole dataset back through `DataFrame::collect`. Using a
1740/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1741/// get the same hive-partition handling as local parquet.
1742async fn read_s3_parquet(
1743    d: &DatasetConfig,
1744    ctx: &SessionContext,
1745) -> Result<Vec<RecordBatch>, AppError> {
1746    register_s3_object_store(d, ctx)?;
1747    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1748    let df = ctx
1749        .read_table(provider)
1750        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1751    Ok(df.collect().await?)
1752}
1753
1754/// Open a Delta table (local or S3) and return the deltalake `DeltaTable`
1755/// handle. The transaction log is read here to resolve the current file
1756/// list + schema; the table's object store is registered on the session
1757/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1758/// store themselves. A location that doesn't exist, is empty, or was never
1759/// committed maps to [`AppError::EmptyDataset`] so startup can log-and-skip.
1760async fn open_delta_table(
1761    d: &DatasetConfig,
1762    opts: HashMap<String, String>,
1763) -> Result<deltalake::DeltaTable, AppError> {
1764    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1765        AppError::Internal(format!(
1766            "dataset '{}': bad delta location '{}': {e}",
1767            d.name, d.source.location
1768        ))
1769    })?;
1770    deltalake::open_table_with_storage_options(url, opts)
1771        .await
1772        .map_err(|e| {
1773            // A Delta location that doesn't exist, is empty, or was never
1774            // committed has no files in its log segment. deltalake (kernel
1775            // 0.32) surfaces every one of these as:
1776            //   "Not a Delta table: Generic delta kernel error: No files in
1777            //    log segment"
1778            // — covering a missing local dir AND a non-existent S3 prefix
1779            // alike. Match case-insensitively (the kernel capitalises the
1780            // phrases) and treat it like any other empty dataset so startup
1781            // logs and skips instead of aborting the whole process.
1782            let msg = e.to_string();
1783            let low = msg.to_lowercase();
1784            if low.contains("no files in log segment") || low.contains("not a delta table") {
1785                AppError::EmptyDataset(format!(
1786                    "delta location '{}' has no committed files ({msg})",
1787                    d.source.location
1788                ))
1789            } else {
1790                AppError::Internal(format!(
1791                    "dataset '{}': delta open '{}': {msg}",
1792                    d.name, d.source.location
1793                ))
1794            }
1795        })
1796}
1797
1798/// Open a Delta table (local or S3) and return its DataFusion
1799/// `TableProvider`. The provider reads the transaction log to resolve the
1800/// current file list and registers the table's object store on the session
1801/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1802/// store themselves.
1803async fn open_delta_provider(
1804    d: &DatasetConfig,
1805    opts: HashMap<String, String>,
1806) -> Result<Arc<dyn TableProvider>, AppError> {
1807    let table = open_delta_table(d, opts).await?;
1808    table.table_provider().await.map_err(|e| {
1809        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1810    })
1811}
1812
1813/// Resolve the deltalake storage-options for a dataset: empty for local
1814/// tables, S3 credentials/endpoint for S3-backed ones.
1815fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1816    if d.source.is_s3() {
1817        delta_s3_options(d)
1818    } else {
1819        Ok(HashMap::new())
1820    }
1821}
1822
1823/// Open a Delta table (local or S3) and stream every row back as a Vec of
1824/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1825/// treat all datasets uniformly (single in-memory batch + eq-index).
1826async fn read_delta(
1827    d: &DatasetConfig,
1828    opts: HashMap<String, String>,
1829) -> Result<Vec<RecordBatch>, AppError> {
1830    let provider = open_delta_provider(d, opts).await?;
1831    // Drive a full scan via a throwaway SessionContext so we end up with
1832    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1833    //
1834    // A Delta table can open cleanly (valid log + schema) yet still fail to
1835    // scan: its add actions may reference data files that no longer exist in
1836    // storage (e.g. vacuumed away), or are otherwise unreadable. Rather than
1837    // abort the whole registry for one broken table, map scan failures to
1838    // `EmptyDataset` so `Store::load` logs and skips it — mirroring the
1839    // bounded-probe behaviour on the lazy path.
1840    let scan_ctx = SessionContext::new();
1841    let df = scan_ctx.read_table(provider).map_err(|e| {
1842        AppError::EmptyDataset(format!(
1843            "delta location '{}' could not be scanned, skipping ({e})",
1844            d.source.location
1845        ))
1846    })?;
1847    df.collect().await.map_err(|e| {
1848        AppError::EmptyDataset(format!(
1849            "delta location '{}' could not be scanned, skipping ({e})",
1850            d.source.location
1851        ))
1852    })
1853}
1854
1855/// Build a lazy state + deltalake `TableProvider` for a Delta dataset
1856/// (local or S3). The table is never read into RAM: deltalake reads the
1857/// transaction log once here to resolve the file list and discovery schema,
1858/// then streams parquet row groups on each query (with predicate pushdown
1859/// and Delta file skipping). The returned `DatasetState.data` is empty.
1860async fn build_lazy_delta(
1861    d: &DatasetConfig,
1862    _ctx: &SessionContext,
1863) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1864    let table = open_delta_table(d, delta_storage_options(d)?).await?;
1865
1866    // An empty Delta table carries a valid schema in its log but resolves to
1867    // zero data files. The eager path catches this naturally (a full scan
1868    // yields no batches), but the lazy path never scans — so check the file
1869    // list explicitly and skip, matching the eager behaviour. Without this an
1870    // empty Delta table would register as a 0-row dataset and show up in
1871    // discovery / explore.
1872    let file_count = table
1873        .get_file_uris()
1874        .map(|it| it.count())
1875        .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1876    if file_count == 0 {
1877        return Err(AppError::EmptyDataset(format!(
1878            "delta location '{}' has a schema but no data files",
1879            d.source.location
1880        )));
1881    }
1882
1883    let provider = table.table_provider().await.map_err(|e| {
1884        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1885    })?;
1886
1887    // The file-list check above only catches a Delta table with *no* data
1888    // files. A table can still be effectively empty in ways the log doesn't
1889    // make obvious: its add actions may reference files that contain zero
1890    // rows, or files that no longer exist in storage (e.g. vacuumed). Those
1891    // register fine here but then return nothing — or hard-error — the moment
1892    // they're queried, leaving a broken dataset visible in discovery/explore.
1893    //
1894    // The eager path catches all of this for free (a full scan yields no rows
1895    // or surfaces the read error up front), but the lazy path never scans. So
1896    // probe with a bounded one-row scan: it reads at most a single row group
1897    // from a single file, costing almost nothing on a healthy table, but lets
1898    // us log-and-skip an empty or unreadable table instead of registering it.
1899    {
1900        let probe_ctx = SessionContext::new();
1901        let probe = probe_ctx
1902            .read_table(provider.clone())
1903            .and_then(|df| df.limit(0, Some(1)));
1904        match probe {
1905            Ok(df) => match df.collect().await {
1906                Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1907                    return Err(AppError::EmptyDataset(format!(
1908                        "delta location '{}' resolves to no rows",
1909                        d.source.location
1910                    )));
1911                }
1912                Ok(_) => {}
1913                Err(e) => {
1914                    return Err(AppError::EmptyDataset(format!(
1915                        "delta location '{}' could not be scanned, skipping ({e})",
1916                        d.source.location
1917                    )));
1918                }
1919            },
1920            Err(e) => {
1921                return Err(AppError::EmptyDataset(format!(
1922                    "delta location '{}' could not be scanned, skipping ({e})",
1923                    d.source.location
1924                )));
1925            }
1926        }
1927    }
1928
1929    // The provider's schema is the full logical schema (data + partition
1930    // columns), which is exactly the discovery schema we want.
1931    let arrow_sch = provider.schema();
1932    let columns: Vec<ColumnInfo> = arrow_sch
1933        .fields()
1934        .iter()
1935        .map(|f| {
1936            let dt = f.data_type();
1937            ColumnInfo {
1938                name: f.name().clone(),
1939                logical: arrow_to_logical(dt),
1940                sql_type: format!("{dt:?}"),
1941                nullable: f.is_nullable(),
1942            }
1943        })
1944        .collect();
1945    let schema = DatasetSchema::new(&d.name, columns)
1946        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1947
1948    log::info!(
1949        "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1950        d.name,
1951        d.source.kind.as_str(),
1952        schema.columns.len()
1953    );
1954
1955    Ok((
1956        DatasetState {
1957            schema,
1958            data: Vec::new(),
1959            arrow_schema: arrow_sch,
1960            index: EqIndex::default(),
1961            lazy: true,
1962        },
1963        provider,
1964    ))
1965}
1966
1967/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1968/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1969/// passes them through to object_store internally.
1970fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1971    let creds = d.resolved_creds();
1972    let region = d.resolved_region();
1973    let s3 = d.s3.clone().unwrap_or_default();
1974    let (bucket, _) = d.source.s3_bucket()?;
1975
1976    let mut opts = HashMap::new();
1977    opts.insert("AWS_REGION".into(), region);
1978    if let Some(ep) = s3.effective_endpoint(bucket) {
1979        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1980    }
1981    if s3.allow_http {
1982        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1983    }
1984    opts.insert(
1985        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1986        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1987    );
1988    if let Some(k) = creds.access_key_id {
1989        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1990    }
1991    if let Some(s) = creds.secret_access_key {
1992        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1993    }
1994    if let Some(t) = creds.session_token {
1995        opts.insert("AWS_SESSION_TOKEN".into(), t);
1996    }
1997    // Read-only paths don't need the S3 lock-provider plumbing.
1998    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1999    Ok(opts)
2000}
2001
2002/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
2003/// block + resolved credentials and register it on `ctx` under
2004/// `s3://bucket/`.
2005fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
2006    let (bucket, _key) = d.source.s3_bucket()?;
2007    let creds = d.resolved_creds();
2008    let region = d.resolved_region();
2009    let s3 = d.s3.clone().unwrap_or_default();
2010
2011    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
2012        AppError::Internal(format!(
2013            "dataset '{}': build S3 store for '{bucket}': {e}",
2014            d.name
2015        ))
2016    })?;
2017
2018    let url = Url::parse(&format!("s3://{bucket}"))
2019        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
2020    ctx.register_object_store(&url, Arc::new(store));
2021    Ok(())
2022}
2023
/// Heuristically decide whether an error message describes an S3
/// authorization failure (HTTP 403).
///
/// object_store, the AWS SDK, deltalake and MinIO render a denied read as
/// some mix of "Access Denied" / "AccessDenied" (the S3 error code),
/// "Forbidden", or a bare "403" somewhere in the error chain, so the message
/// is lower-cased and searched for each of those markers. Only meant to be
/// consulted for sources already known to be S3, so a stray "forbidden" in
/// unrelated text can't misfire.
fn is_s3_access_denied(msg: &str) -> bool {
    const MARKERS: [&str; 4] = ["access denied", "accessdenied", "forbidden", "403"];
    let haystack = msg.to_lowercase();
    MARKERS.iter().any(|marker| haystack.contains(*marker))
}
2037
2038
2039///
2040/// Local sources are sized with a cheap filesystem stat (`estimate_local_bytes`);
2041/// S3 sources are sized by listing the object store under their prefix. S3
2042/// sizing is best-effort: a listing failure logs a warning and returns `None`
2043/// (don't force) so a transient S3 error never blocks startup.
2044async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
2045    if d.lazy || server.force_lazy_above_mb == 0 {
2046        return None;
2047    }
2048    let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
2049
2050    let bytes = if d.source.is_s3() {
2051        match estimate_s3_bytes(d).await {
2052            Ok(b) => b,
2053            Err(e) => {
2054                log::warn!(
2055                    "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
2056                    d.name
2057                );
2058                return None;
2059            }
2060        }
2061    } else {
2062        d.estimate_local_bytes()?
2063    };
2064
2065    (bytes > threshold).then_some(bytes)
2066}
2067
2068/// Sum the byte size of a dataset's S3 backing data by listing the object
2069/// store under the source prefix and adding up every `.parquet` object.
2070///
2071/// Works for both parquet and delta sources: delta data files are parquet
2072/// objects under the table root, so listing the root and counting parquet
2073/// gives a coarse (possibly over-counting un-vacuumed files) but cheap size
2074/// suitable for the force-lazy gate. Any glob wildcard tail is stripped so the
2075/// listing starts at the non-wildcard base prefix.
2076async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
2077    use futures_util::StreamExt;
2078    use object_store::ObjectStore;
2079
2080    let (bucket, _key) = d.source.s3_bucket()?;
2081    let creds = d.resolved_creds();
2082    let region = d.resolved_region();
2083    let s3 = d.s3.clone().unwrap_or_default();
2084    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
2085        AppError::Internal(format!(
2086            "dataset '{}': build S3 store for '{bucket}': {e}",
2087            d.name
2088        ))
2089    })?;
2090
2091    // Strip any glob wildcard tail, then reduce the base to a bucket-relative
2092    // key for object_store's `list` (which takes a key prefix, not a URL).
2093    let (base, _keys) = split_glob_base_keys(&d.source.location);
2094    let prefix_key = base
2095        .strip_prefix("s3://")
2096        .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
2097        .unwrap_or("")
2098        .trim_end_matches('/');
2099    let prefix =
2100        (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
2101
2102    let mut total: u64 = 0;
2103    let mut stream = store.list(prefix.as_ref());
2104    while let Some(meta) = stream.next().await {
2105        let meta = meta.map_err(|e| {
2106            AppError::Internal(format!(
2107                "dataset '{}': s3 list under '{prefix_key}': {e}",
2108                d.name
2109            ))
2110        })?;
2111        if meta.location.as_ref().ends_with(".parquet") {
2112            total = total.saturating_add(meta.size);
2113        }
2114    }
2115    Ok(total)
2116}
2117
2118fn build_s3(
2119    bucket: &str,
2120    region: &str,
2121    s3: &S3Config,
2122    creds: &ResolvedCreds,
2123) -> Result<object_store::aws::AmazonS3, object_store::Error> {
2124    let mut b = AmazonS3Builder::new()
2125        .with_bucket_name(bucket)
2126        .with_region(region)
2127        .with_allow_http(s3.allow_http)
2128        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
2129    if let Some(ep) = s3.effective_endpoint(bucket) {
2130        b = b.with_endpoint(ep);
2131    }
2132    if let Some(k) = creds.access_key_id.as_deref() {
2133        b = b.with_access_key_id(k);
2134    }
2135    if let Some(s) = creds.secret_access_key.as_deref() {
2136        b = b.with_secret_access_key(s);
2137    }
2138    if let Some(t) = creds.session_token.as_deref() {
2139        b = b.with_token(t);
2140    }
2141    b.build()
2142}
2143
2144fn arrow_to_logical(dt: &DataType) -> LogicalType {
2145    match dt {
2146        DataType::Boolean => LogicalType::Bool,
2147        DataType::Int8
2148        | DataType::Int16
2149        | DataType::Int32
2150        | DataType::Int64
2151        | DataType::UInt8
2152        | DataType::UInt16
2153        | DataType::UInt32
2154        | DataType::UInt64 => LogicalType::Int,
2155        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
2156        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
2157        // Dictionary-encoded strings are reported as plain strings — clients
2158        // (and the rest of the backend) shouldn't have to care that we keep
2159        // a compressed representation in memory.
2160        DataType::Dictionary(_, v)
2161            if matches!(
2162                v.as_ref(),
2163                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
2164            ) =>
2165        {
2166            LogicalType::Utf8
2167        }
2168        DataType::Date32
2169        | DataType::Date64
2170        | DataType::Time32(_)
2171        | DataType::Time64(_)
2172        | DataType::Timestamp(_, _)
2173        | DataType::Duration(_)
2174        | DataType::Interval(_) => LogicalType::Temporal,
2175        _ => LogicalType::Other,
2176    }
2177}
2178
2179// ---------------------------------------------------------------------------
2180// Per-batch projection
2181// ---------------------------------------------------------------------------
2182
2183fn project(
2184    schema: &DatasetSchema,
2185    batch: RecordBatch,
2186    columns: &[String],
2187) -> Result<RecordBatch, AppError> {
2188    if columns.is_empty() {
2189        return Ok(batch);
2190    }
2191    let indices: Vec<usize> = columns
2192        .iter()
2193        .map(|c| {
2194            schema
2195                .find(c)
2196                .map(|info| schema.by_name[&info.name.to_lowercase()])
2197        })
2198        .collect::<Result<_, _>>()?;
2199    let fields: Vec<Field> = indices
2200        .iter()
2201        .map(|&i| batch.schema().field(i).clone())
2202        .collect();
2203    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
2204    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
2205}
2206
2207// ---------------------------------------------------------------------------
2208// SQL builder
2209// ---------------------------------------------------------------------------
2210
2211/// Accumulates the typed literal values for a parameterised query.
2212///
2213/// Predicate values are never interpolated into the SQL text. Instead each
2214/// value is pushed here and the builder emits a positional placeholder
2215/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
2216/// to the logical plan via [`DataFrame::with_param_values`], so user input
2217/// reaches the engine as typed scalars and can never alter the query
2218/// structure (no SQL injection surface, no escaping to get wrong).
2219#[derive(Default)]
2220struct Params {
2221    values: Vec<ScalarValue>,
2222}
2223
2224impl Params {
2225    fn new() -> Self {
2226        Self::default()
2227    }
2228
2229    /// Bind `v` and return its `$N` placeholder token.
2230    fn bind(&mut self, v: ScalarValue) -> String {
2231        self.values.push(v);
2232        format!("${}", self.values.len())
2233    }
2234
2235    fn into_values(self) -> Vec<ScalarValue> {
2236        self.values
2237    }
2238}
2239
2240fn build_query_sql(
2241    schema: &DatasetSchema,
2242    req: &QueryRequest,
2243    max_page_size: u64,
2244) -> Result<(String, Vec<ScalarValue>), AppError> {
2245    let (limit, offset) = req.effective_limit_offset(max_page_size);
2246    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
2247}
2248
2249fn build_query_stream_sql(
2250    schema: &DatasetSchema,
2251    req: &QueryRequest,
2252) -> Result<(String, Vec<ScalarValue>), AppError> {
2253    let suffix = req
2254        .limit
2255        .map(|limit| format!(" LIMIT {limit}"))
2256        .unwrap_or_default();
2257    build_query_sql_with_suffix(schema, req, &suffix)
2258}
2259
2260fn build_query_sql_with_suffix(
2261    schema: &DatasetSchema,
2262    req: &QueryRequest,
2263    suffix: &str,
2264) -> Result<(String, Vec<ScalarValue>), AppError> {
2265    let agg_plan = req.agg_plan(schema)?;
2266
2267    let cols = if let Some(plan) = &agg_plan {
2268        // Group cols, then aggregations, each aliased to the JSON output key.
2269        let mut parts: Vec<String> = plan
2270            .group_cols
2271            .iter()
2272            .map(|c| DatasetSchema::quote_ident(c))
2273            .collect();
2274        for a in &plan.aggs {
2275            let expr = a.sql_expr()?;
2276            parts.push(format!(
2277                "{expr} AS {}",
2278                DatasetSchema::quote_ident(&a.alias)
2279            ));
2280        }
2281        parts.join(", ")
2282    } else if req.columns.is_empty() {
2283        if req.distinct {
2284            "DISTINCT *".to_string()
2285        } else {
2286            "*".to_string()
2287        }
2288    } else {
2289        let list = req
2290            .columns
2291            .iter()
2292            .map(|c| {
2293                schema
2294                    .find(c)
2295                    .map(|info| DatasetSchema::quote_ident(&info.name))
2296            })
2297            .collect::<Result<Vec<_>, _>>()?
2298            .join(", ");
2299        if req.distinct {
2300            format!("DISTINCT {list}")
2301        } else {
2302            list
2303        }
2304    };
2305
2306    let mut params = Params::new();
2307    let clauses: Vec<String> = req
2308        .predicates
2309        .iter()
2310        .map(|p| pred_to_sql(schema, p, &mut params))
2311        .collect::<Result<_, _>>()?;
2312
2313    let table = DatasetSchema::quote_ident(&schema.name);
2314    let where_clause = if clauses.is_empty() {
2315        String::new()
2316    } else {
2317        format!(" WHERE {}", clauses.join(" AND "))
2318    };
2319    let group_clause = match &agg_plan {
2320        Some(p) => format!(
2321            " GROUP BY {}",
2322            p.group_cols
2323                .iter()
2324                .map(|c| DatasetSchema::quote_ident(c))
2325                .collect::<Vec<_>>()
2326                .join(", "),
2327        ),
2328        None => String::new(),
2329    };
2330    let having_clause = {
2331        let resolved = req.having_plan(agg_plan.as_ref())?;
2332        if resolved.is_empty() {
2333            String::new()
2334        } else {
2335            let clauses: Vec<String> = resolved
2336                .iter()
2337                .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2338                .collect::<Result<_, _>>()?;
2339            format!(" HAVING {}", clauses.join(" AND "))
2340        }
2341    };
2342    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2343        Some(s) => format!(" ORDER BY {s}"),
2344        None => String::new(),
2345    };
2346    let sql =
2347        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2348    Ok((sql, params.into_values()))
2349}
2350
2351fn build_count_sql(
2352    schema: &DatasetSchema,
2353    predicates: &[Predicate],
2354) -> Result<(String, Vec<ScalarValue>), AppError> {
2355    let mut params = Params::new();
2356    let clauses: Vec<String> = predicates
2357        .iter()
2358        .map(|p| pred_to_sql(schema, p, &mut params))
2359        .collect::<Result<_, _>>()?;
2360    let table = DatasetSchema::quote_ident(&schema.name);
2361    let where_clause = if clauses.is_empty() {
2362        String::new()
2363    } else {
2364        format!(" WHERE {}", clauses.join(" AND "))
2365    };
2366    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2367    Ok((sql, params.into_values()))
2368}
2369
2370fn pred_to_sql(
2371    schema: &DatasetSchema,
2372    pred: &Predicate,
2373    params: &mut Params,
2374) -> Result<String, AppError> {
2375    let info = schema.find(&pred.col)?;
2376    let col = DatasetSchema::quote_ident(&info.name);
2377    pred_to_sql_with_lhs(&col, pred, params)
2378}
2379
2380/// Render a predicate against a pre-resolved left-hand-side SQL
2381/// expression. The dataset-`WHERE` path resolves a quoted column name as
2382/// the LHS (see [`pred_to_sql`]); the `HAVING` path passes an aggregate
2383/// expression such as `COUNT(*)` instead. Both share the operator and
2384/// value-binding logic here.
2385fn pred_to_sql_with_lhs(
2386    col: &str,
2387    pred: &Predicate,
2388    params: &mut Params,
2389) -> Result<String, AppError> {
2390    match pred.op.as_str() {
2391        "is_null" => return Ok(format!("{col} IS NULL")),
2392        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2393        _ => {}
2394    }
2395
2396    let val = pred
2397        .val
2398        .as_ref()
2399        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2400
2401    if pred.op == "in" {
2402        let items = val
2403            .as_array()
2404            .filter(|a| !a.is_empty())
2405            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2406        let placeholders: Vec<String> = items
2407            .iter()
2408            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2409            .collect::<Result<_, AppError>>()?;
2410        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2411    }
2412
2413    let sql_op = match pred.op.as_str() {
2414        "eq" => "=",
2415        "neq" => "!=",
2416        "gt" => ">",
2417        "gte" => ">=",
2418        "lt" => "<",
2419        "lte" => "<=",
2420        "like" => "LIKE",
2421        "ilike" => "ILIKE",
2422        other => return Err(AppError::UnknownOperator(other.into())),
2423    };
2424    let placeholder = params.bind(json_to_scalar(val)?);
2425    Ok(format!("{col} {sql_op} {placeholder}"))
2426}
2427
2428/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
2429/// binding as a query parameter. The engine applies the usual numeric
2430/// widening / comparison coercion against the target column type.
2431fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2432    match val {
2433        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2434        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2435        JsonValue::Null => Ok(ScalarValue::Null),
2436        JsonValue::Number(n) => {
2437            if let Some(i) = n.as_i64() {
2438                Ok(ScalarValue::Int64(Some(i)))
2439            } else if let Some(u) = n.as_u64() {
2440                Ok(ScalarValue::UInt64(Some(u)))
2441            } else if let Some(f) = n.as_f64() {
2442                Ok(ScalarValue::Float64(Some(f)))
2443            } else {
2444                Err(AppError::InvalidValue(
2445                    "unsupported numeric literal in predicate".into(),
2446                ))
2447            }
2448        }
2449        _ => Err(AppError::InvalidValue(
2450            "unsupported literal type in predicate".into(),
2451        )),
2452    }
2453}
2454
2455// ---------------------------------------------------------------------------
2456// Equality index — built once at startup, queried on every predicate request
2457// ---------------------------------------------------------------------------
2458
2459fn json_index_key(val: &JsonValue) -> Option<String> {
2460    match val {
2461        JsonValue::String(s) => Some(s.clone()),
2462        JsonValue::Number(n) => Some(n.to_string()),
2463        JsonValue::Bool(b) => Some(b.to_string()),
2464        _ => None,
2465    }
2466}
2467
/// Intersect two ascending row-id lists, returning the ids present in both
/// (still ascending). Walks both inputs once, always advancing the side with
/// the smaller head.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => left = left_rest,
            Ordering::Greater => right = right_rest,
            Ordering::Equal => {
                common.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    common
}
2484
/// Merge two ascending row-id lists into one ascending list. An id that
/// heads both inputs at the same time is emitted once.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = left_rest;
            }
            Ordering::Greater => {
                merged.push(y);
                right = right_rest;
            }
            Ordering::Equal => {
                merged.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    // At most one side still has elements; append whatever remains.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
2509
2510fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2511    if predicates.is_empty() || index.is_empty() {
2512        return None;
2513    }
2514
2515    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
2516    // so borrow it directly instead of cloning the row-id vector.
2517    if let [pred] = predicates
2518        && pred.op.as_str() == "eq"
2519    {
2520        let col_lower = pred.col.to_lowercase();
2521        let col_map = index.get(&col_lower)?;
2522        let key = json_index_key(pred.val.as_ref()?)?;
2523        return Some(match col_map.get(&key) {
2524            Some(rows) => Cow::Borrowed(rows.as_slice()),
2525            None => Cow::Owned(Vec::new()),
2526        });
2527    }
2528
2529    let mut result: Option<Vec<u32>> = None;
2530    for pred in predicates {
2531        let col_lower = pred.col.to_lowercase();
2532        let col_map = index.get(&col_lower)?;
2533
2534        let rows: Vec<u32> = match pred.op.as_str() {
2535            "eq" => {
2536                let key = json_index_key(pred.val.as_ref()?)?;
2537                col_map.get(&key).cloned().unwrap_or_default()
2538            }
2539            "in" => {
2540                let items = pred.val.as_ref()?.as_array()?;
2541                let mut merged: Vec<u32> = Vec::new();
2542                for item in items {
2543                    if let Some(r) = col_map.get(&json_index_key(item)?) {
2544                        merged = union_sorted(&merged, r);
2545                    }
2546                }
2547                merged
2548            }
2549            _ => return None,
2550        };
2551
2552        result = Some(match result {
2553            None => rows,
2554            Some(r) => intersect_sorted(&r, &rows),
2555        });
2556    }
2557    result.map(Cow::Owned)
2558}
2559
2560/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
2561/// not part of the stable API — exposed solely so `benches/index.rs` can build
2562/// an `EqIndex` and exercise `try_index` directly.
2563#[doc(hidden)]
2564pub mod bench {
2565    use super::{EqIndex, FastMap, json_index_key, try_index};
2566    use datapress_core::models::Predicate;
2567    use serde_json::Value as JsonValue;
2568    use std::borrow::Cow;
2569
2570    /// Opaque equality index wrapper for benches (the inner alias is private).
2571    pub struct BenchIndex(EqIndex);
2572
2573    /// Build an index with a single `col` whose `val` bucket holds `rows`.
2574    /// The bucket key is derived with the same `json_index_key` the query path
2575    /// uses, so a matching `eq` predicate is guaranteed to hit.
2576    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2577        let key = json_index_key(val).expect("benchable index key");
2578        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2579        col_map.insert(key, rows);
2580        let mut index: EqIndex = EqIndex::default();
2581        index.insert(col.to_string(), col_map);
2582        BenchIndex(index)
2583    }
2584
2585    /// Resolve `predicates` against the index — the timed operation.
2586    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2587        try_index(&idx.0, predicates)
2588    }
2589
2590    /// Reference implementation of the pre-`Cow` behaviour: always clones the
2591    /// matched bucket into an owned `Vec` (what `try_index` did before the
2592    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
2593    /// the borrow win is measured against the old allocation cost in-process.
2594    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2595        let [pred] = predicates else { return None };
2596        if pred.op.as_str() != "eq" {
2597            return None;
2598        }
2599        let col_lower = pred.col.to_lowercase();
2600        let col_map = idx.0.get(&col_lower)?;
2601        let key = json_index_key(pred.val.as_ref()?)?;
2602        Some(col_map.get(&key).cloned().unwrap_or_default())
2603    }
2604}
2605
2606/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
2607/// the underlying batches (zero-copy) and concatenating the (small) page.
2608fn slice_global(
2609    chunks: &[RecordBatch],
2610    schema: &Arc<Schema>,
2611    offset: usize,
2612    limit: usize,
2613) -> Result<RecordBatch, AppError> {
2614    if limit == 0 || chunks.is_empty() {
2615        return Ok(RecordBatch::new_empty(schema.clone()));
2616    }
2617    let mut out = Vec::new();
2618    let mut to_skip = offset;
2619    let mut remaining = limit;
2620    for b in chunks {
2621        if remaining == 0 {
2622            break;
2623        }
2624        let n = b.num_rows();
2625        if to_skip >= n {
2626            to_skip -= n;
2627            continue;
2628        }
2629        let take = remaining.min(n - to_skip);
2630        out.push(b.slice(to_skip, take));
2631        to_skip = 0;
2632        remaining -= take;
2633    }
2634    if out.is_empty() {
2635        return Ok(RecordBatch::new_empty(schema.clone()));
2636    }
2637    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2638}
2639
2640/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
2641/// Row ids are global (across the concatenation of all chunks). We map each
2642/// requested row to its (chunk, local-index), `take` per chunk, then stitch
2643/// the per-chunk results back together preserving the original row order.
2644fn take_page(
2645    chunks: &[RecordBatch],
2646    schema: &Arc<Schema>,
2647    rows: &[u32],
2648    offset: usize,
2649    limit: usize,
2650) -> Result<RecordBatch, AppError> {
2651    let start = offset.min(rows.len());
2652    let len = limit.min(rows.len() - start);
2653    if len == 0 || chunks.is_empty() {
2654        return Ok(RecordBatch::new_empty(schema.clone()));
2655    }
2656
2657    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
2658    // and `offsets.last()` is the total row count.
2659    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2660    let mut acc: u32 = 0;
2661    offsets.push(0);
2662    for b in chunks {
2663        acc = acc
2664            .checked_add(b.num_rows() as u32)
2665            .expect("row count exceeds u32::MAX");
2666        offsets.push(acc);
2667    }
2668
2669    // Bucket each global row id into the chunk that contains it, remembering
2670    // the original output position so we can restore page order at the end.
2671    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2672    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2673        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2674        let local = gid - offsets[bi];
2675        buckets[bi].push((out_pos as u32, local));
2676    }
2677
2678    // Per-chunk take, recording the destination index for each emitted row.
2679    let mut takens: Vec<RecordBatch> = Vec::new();
2680    let mut dest: Vec<u32> = Vec::with_capacity(len);
2681    for (bi, bucket) in buckets.iter().enumerate() {
2682        if bucket.is_empty() {
2683            continue;
2684        }
2685        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2686        let cols: Vec<ArrayRef> = chunks[bi]
2687            .columns()
2688            .iter()
2689            .map(|c| {
2690                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2691                    .map_err(AppError::from)
2692            })
2693            .collect::<Result<_, _>>()?;
2694        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2695        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2696    }
2697
2698    // Stitch per-chunk results then permute to restore the requested order.
2699    let stitched = compute::concat_batches(schema, takens.iter())?;
2700    let mut inv = vec![0u32; len];
2701    for (i, &d) in dest.iter().enumerate() {
2702        inv[d as usize] = i as u32;
2703    }
2704    let perm = UInt32Array::from(inv);
2705    let cols: Vec<ArrayRef> = stitched
2706        .columns()
2707        .iter()
2708        .map(|c| {
2709            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2710                .map_err(AppError::from)
2711        })
2712        .collect::<Result<_, _>>()?;
2713    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2714}
2715
2716/// Build the equality index per the dataset's policy, against the chunked
2717/// representation. Row ids are global across the concatenation of all
2718/// chunks (so they remain compatible with `take_page` / `slice_global`).
2719fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2720    use rayon::prelude::*;
2721
2722    if cfg.mode == IndexMode::None || chunks.is_empty() {
2723        return EqIndex::default();
2724    }
2725
2726    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2727        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2728    } else {
2729        None
2730    };
2731
2732    let max_card = if cfg.mode == IndexMode::Auto {
2733        Some(cfg.max_cardinality)
2734    } else {
2735        None
2736    };
2737
2738    // Per-chunk starting global row id.
2739    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2740    let mut acc: u32 = 0;
2741    for b in chunks {
2742        batch_offsets.push(acc);
2743        acc = acc
2744            .checked_add(b.num_rows() as u32)
2745            .expect("row count exceeds u32::MAX");
2746    }
2747
2748    let schema = chunks[0].schema();
2749
2750    schema
2751        .fields()
2752        .par_iter()
2753        .enumerate()
2754        .filter_map(|(ci, field)| {
2755            let col_lower = field.name().to_lowercase();
2756            if let Some(a) = &allow
2757                && !a.contains_key(&col_lower)
2758            {
2759                return None;
2760            }
2761
2762            // Only build for index-friendly types; skip everything else
2763            // up-front so we don't pay the per-chunk dispatch cost.
2764            let dtype = field.data_type();
2765            let dict_utf8 = matches!(dtype,
2766                DataType::Dictionary(k, v)
2767                    if matches!(k.as_ref(), DataType::Int32)
2768                    && matches!(v.as_ref(), DataType::Utf8));
2769            match dtype {
2770                DataType::Utf8
2771                | DataType::Utf8View
2772                | DataType::Boolean
2773                | DataType::Int8
2774                | DataType::Int16
2775                | DataType::Int32
2776                | DataType::Int64 => {}
2777                _ if dict_utf8 => {}
2778                _ => return None,
2779            }
2780
2781            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2782
2783            for (bi, batch) in chunks.iter().enumerate() {
2784                let base = batch_offsets[bi];
2785                let col = batch.column(ci);
2786
2787                macro_rules! index_col {
2788                    ($arr_ty:ty) => {{
2789                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2790                        for row in 0..arr.len() {
2791                            if arr.is_null(row) {
2792                                continue;
2793                            }
2794                            let key = arr.value(row).to_string();
2795                            let gid = base + row as u32;
2796                            if let Some(v) = map.get_mut(&key) {
2797                                v.push(gid);
2798                            } else {
2799                                if let Some(mc) = max_card {
2800                                    if map.len() >= mc {
2801                                        return None;
2802                                    }
2803                                }
2804                                map.insert(key, vec![gid]);
2805                            }
2806                        }
2807                    }};
2808                }
2809
2810                if dict_utf8 {
2811                    // Dictionary(Int32, Utf8): iterate keys + look up the
2812                    // string value from the (small) dictionary. We allocate
2813                    // the key string only when the value is new — repeated
2814                    // values reuse the existing HashMap entry by hash, but
2815                    // `HashMap::get_mut` still needs the key, so we use a
2816                    // borrowed lookup via `get` first to avoid the alloc.
2817                    let arr = col
2818                        .as_any()
2819                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2820                        )?;
2821                    let keys = arr.keys();
2822                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2823                    for row in 0..arr.len() {
2824                        if arr.is_null(row) {
2825                            continue;
2826                        }
2827                        let k = keys.value(row) as usize;
2828                        let s = values.value(k);
2829                        let gid = base + row as u32;
2830                        if let Some(v) = map.get_mut(s) {
2831                            v.push(gid);
2832                        } else {
2833                            if let Some(mc) = max_card
2834                                && map.len() >= mc
2835                            {
2836                                return None;
2837                            }
2838                            map.insert(s.to_string(), vec![gid]);
2839                        }
2840                    }
2841                } else {
2842                    match dtype {
2843                        DataType::Utf8 => index_col!(StringArray),
2844                        DataType::Utf8View => index_col!(StringViewArray),
2845                        DataType::Boolean => index_col!(BooleanArray),
2846                        DataType::Int8 => index_col!(Int8Array),
2847                        DataType::Int16 => index_col!(Int16Array),
2848                        DataType::Int32 => index_col!(Int32Array),
2849                        DataType::Int64 => index_col!(Int64Array),
2850                        _ => unreachable!(),
2851                    }
2852                }
2853            }
2854
2855            Some((col_lower, map))
2856        })
2857        .collect()
2858}
2859
2860// ---------------------------------------------------------------------------
2861// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2862// on the page batch right before JSON encoding. We deliberately do **not**
2863// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2864// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2865// columns would balloon resident RAM. The cast is applied per returned page
2866// after pagination, so the cost is paid only for rows the caller requested.
2867// ---------------------------------------------------------------------------
2868
2869/// Returns true for Arrow types that `write_value` can render directly. Any
2870/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2871/// JSON output is faithful rather than silently `null`.
2872fn writable_inline(dt: &DataType) -> bool {
2873    match dt {
2874        DataType::Utf8
2875        | DataType::LargeUtf8
2876        | DataType::Utf8View
2877        | DataType::Boolean
2878        | DataType::Int8
2879        | DataType::Int16
2880        | DataType::Int32
2881        | DataType::Int64
2882        | DataType::UInt8
2883        | DataType::UInt16
2884        | DataType::UInt32
2885        | DataType::UInt64
2886        | DataType::Float32
2887        | DataType::Float64
2888        | DataType::Decimal128(_, _)
2889        | DataType::Decimal256(_, _) => true,
2890        DataType::Dictionary(k, v)
2891            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2892        {
2893            true
2894        }
2895        _ => false,
2896    }
2897}
2898
2899/// Cast any column whose dtype isn't directly writable by `write_value` to
2900/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2901/// — kept native in resident memory to save RAM — and also any exotic dtype
2902/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2903/// so the JSON serializer never falls back to writing literal `null`.
2904fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2905    let schema = batch.schema();
2906    let to_cast: Vec<usize> = schema
2907        .fields()
2908        .iter()
2909        .enumerate()
2910        .filter_map(|(i, f)| {
2911            if writable_inline(f.data_type()) {
2912                None
2913            } else {
2914                Some(i)
2915            }
2916        })
2917        .collect();
2918    if to_cast.is_empty() {
2919        return Ok(batch.clone());
2920    }
2921    let new_fields: Vec<Field> = schema
2922        .fields()
2923        .iter()
2924        .enumerate()
2925        .map(|(i, f)| {
2926            if to_cast.contains(&i) {
2927                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2928            } else {
2929                f.as_ref().clone()
2930            }
2931        })
2932        .collect();
2933    let new_schema = Arc::new(Schema::new(new_fields));
2934    let cols: Vec<ArrayRef> = batch
2935        .columns()
2936        .iter()
2937        .enumerate()
2938        .map(|(i, c)| {
2939            if to_cast.contains(&i) {
2940                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2941            } else {
2942                Ok(c.clone())
2943            }
2944        })
2945        .collect::<Result<_, _>>()?;
2946    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2947}
2948
2949// ---------------------------------------------------------------------------
2950// Compute helpers — retained for symmetry; reserved for future inline scan
2951// path. Currently the engine fallback handles all non-index queries.
2952// ---------------------------------------------------------------------------
2953
/// Comparison operator selector consumed by `cmp_scalar`.
///
/// The `dead_code` allowance is here because these helpers are currently
/// unused (see the section comment above).
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equality (`eq` kernel).
    Eq,
    /// Inequality (`neq` kernel).
    Neq,
    /// Strictly greater than (`gt` kernel).
    Gt,
    /// Greater than or equal (`gt_eq` kernel).
    Gte,
    /// Strictly less than (`lt` kernel).
    Lt,
    /// Less than or equal (`lt_eq` kernel).
    Lte,
    /// Pattern match via `compute::like`; `cmp_scalar` rejects it on numeric columns.
    Like,
    /// Pattern match via `compute::ilike`; `cmp_scalar` rejects it on numeric columns.
    ILike,
}
2966
2967#[allow(dead_code)]
2968fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2969    let arr = col
2970        .as_any()
2971        .downcast_ref::<StringArray>()
2972        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2973    let s = Scalar::new(StringArray::from(vec![val]));
2974    Ok(eq(arr, &s)?)
2975}
2976
2977#[allow(dead_code)]
2978fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2979    macro_rules! num_cmp {
2980        ($arr_type:ty, $cast:ty) => {{
2981            let n = val
2982                .as_f64()
2983                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2984                as $cast;
2985            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2986            let s = Scalar::new(<$arr_type>::from(vec![n]));
2987            Ok(match op {
2988                CmpOp::Eq => eq(arr, &s)?,
2989                CmpOp::Neq => neq(arr, &s)?,
2990                CmpOp::Gt => gt(arr, &s)?,
2991                CmpOp::Gte => gt_eq(arr, &s)?,
2992                CmpOp::Lt => lt(arr, &s)?,
2993                CmpOp::Lte => lt_eq(arr, &s)?,
2994                CmpOp::Like | CmpOp::ILike => {
2995                    return Err(AppError::InvalidValue(
2996                        "LIKE requires a string column".into(),
2997                    ));
2998                }
2999            })
3000        }};
3001    }
3002    match col.data_type() {
3003        DataType::Utf8 => {
3004            let s = val
3005                .as_str()
3006                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
3007            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
3008            let sc = Scalar::new(StringArray::from(vec![s]));
3009            Ok(match op {
3010                CmpOp::Eq => eq(arr, &sc)?,
3011                CmpOp::Neq => neq(arr, &sc)?,
3012                CmpOp::Gt => gt(arr, &sc)?,
3013                CmpOp::Gte => gt_eq(arr, &sc)?,
3014                CmpOp::Lt => lt(arr, &sc)?,
3015                CmpOp::Lte => lt_eq(arr, &sc)?,
3016                CmpOp::Like => compute::like(arr, &sc)?,
3017                CmpOp::ILike => compute::ilike(arr, &sc)?,
3018            })
3019        }
3020        DataType::Int8 => num_cmp!(Int8Array, i8),
3021        DataType::Int16 => num_cmp!(Int16Array, i16),
3022        DataType::Int32 => num_cmp!(Int32Array, i32),
3023        DataType::Int64 => num_cmp!(Int64Array, i64),
3024        DataType::Float32 => num_cmp!(Float32Array, f32),
3025        DataType::Float64 => num_cmp!(Float64Array, f64),
3026        dt => Err(AppError::InvalidValue(format!(
3027            "unsupported type for comparison: {dt:?}"
3028        ))),
3029    }
3030}
3031
3032// ---------------------------------------------------------------------------
3033// Serialisation
3034// ---------------------------------------------------------------------------
3035
3036pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
3037    // Temporal columns are kept native in resident memory (compact). Cast
3038    // them — plus any other dtype `write_value` can't render directly — to
3039    // Utf8 here, on the bounded page batch, so the JSON output is faithful
3040    // without paying the load-time RAM cost.
3041    let batch = cast_for_serialize(batch)?;
3042    let schema = batch.schema();
3043    let n_rows = batch.num_rows();
3044
3045    let keys: Vec<Vec<u8>> = schema
3046        .fields()
3047        .iter()
3048        .map(|f| {
3049            let mut k = Vec::with_capacity(f.name().len() + 3);
3050            k.push(b'"');
3051            k.extend_from_slice(f.name().as_bytes());
3052            k.extend_from_slice(b"\":");
3053            k
3054        })
3055        .collect();
3056
3057    // Resolve every column's concrete Arrow array type exactly ONCE here,
3058    // instead of paying a `data_type()` match + `downcast_ref` for every
3059    // single cell. The inner row loop then dispatches over a small enum,
3060    // which the compiler turns into a cheap jump table.
3061    let encoders: Vec<ColEnc> = batch
3062        .columns()
3063        .iter()
3064        .map(|c| ColEnc::new(c.as_ref()))
3065        .collect();
3066
3067    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
3068    let mut itoa_buf = itoa::Buffer::new();
3069    let mut ryu_buf = ryu::Buffer::new();
3070    buf.push(b'[');
3071
3072    for row in 0..n_rows {
3073        if row > 0 {
3074            buf.push(b',');
3075        }
3076        buf.push(b'{');
3077        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
3078            if i > 0 {
3079                buf.push(b',');
3080            }
3081            buf.extend_from_slice(key);
3082            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
3083        }
3084        buf.push(b'}');
3085    }
3086
3087    buf.push(b']');
3088    Ok(unsafe { String::from_utf8_unchecked(buf) })
3089}
3090
3091/// Per-column JSON encoder. The concrete Arrow array type is resolved once
3092/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
3093/// instead of repeating `data_type()` matching + `downcast_ref` for every
3094/// cell. Each variant borrows the already-downcast typed array.
3095enum ColEnc<'a> {
3096    Utf8(&'a StringArray),
3097    LargeUtf8(&'a LargeStringArray),
3098    Utf8View(&'a StringViewArray),
3099    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
3100    /// pre-downcast string values so neither is re-resolved per row.
3101    DictI32Utf8(
3102        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
3103        &'a StringArray,
3104    ),
3105    Bool(&'a BooleanArray),
3106    I8(&'a Int8Array),
3107    I16(&'a Int16Array),
3108    I32(&'a Int32Array),
3109    I64(&'a Int64Array),
3110    U8(&'a UInt8Array),
3111    U16(&'a UInt16Array),
3112    U32(&'a UInt32Array),
3113    U64(&'a UInt64Array),
3114    Dec128(&'a Decimal128Array),
3115    Dec256(&'a Decimal256Array),
3116    F32(&'a Float32Array),
3117    F64(&'a Float64Array),
3118    /// Anything else: fall back to the generic `write_value` dispatch.
3119    Other(&'a dyn Array),
3120}
3121
3122impl<'a> ColEnc<'a> {
3123    fn new(col: &'a dyn Array) -> ColEnc<'a> {
3124        macro_rules! dc {
3125            ($t:ty) => {
3126                col.as_any().downcast_ref::<$t>().unwrap()
3127            };
3128        }
3129        match col.data_type() {
3130            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
3131            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
3132            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
3133            DataType::Dictionary(key, value)
3134                if matches!(key.as_ref(), DataType::Int32)
3135                    && matches!(value.as_ref(), DataType::Utf8) =>
3136            {
3137                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
3138                let values = dict
3139                    .values()
3140                    .as_any()
3141                    .downcast_ref::<StringArray>()
3142                    .unwrap();
3143                ColEnc::DictI32Utf8(dict, values)
3144            }
3145            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
3146            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
3147            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
3148            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
3149            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
3150            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
3151            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
3152            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
3153            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
3154            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
3155            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
3156            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
3157            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
3158            _ => ColEnc::Other(col),
3159        }
3160    }
3161
3162    #[inline]
3163    fn write(
3164        &self,
3165        buf: &mut Vec<u8>,
3166        row: usize,
3167        itoa_buf: &mut itoa::Buffer,
3168        ryu_buf: &mut ryu::Buffer,
3169    ) {
3170        macro_rules! int {
3171            ($arr:expr) => {{
3172                if $arr.is_null(row) {
3173                    buf.extend_from_slice(b"null");
3174                } else {
3175                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
3176                }
3177            }};
3178        }
3179        match self {
3180            ColEnc::Utf8(a) => {
3181                if a.is_null(row) {
3182                    buf.extend_from_slice(b"null");
3183                } else {
3184                    write_str(buf, a.value(row));
3185                }
3186            }
3187            ColEnc::LargeUtf8(a) => {
3188                if a.is_null(row) {
3189                    buf.extend_from_slice(b"null");
3190                } else {
3191                    write_str(buf, a.value(row));
3192                }
3193            }
3194            ColEnc::Utf8View(a) => {
3195                if a.is_null(row) {
3196                    buf.extend_from_slice(b"null");
3197                } else {
3198                    write_str(buf, a.value(row));
3199                }
3200            }
3201            ColEnc::DictI32Utf8(keys, values) => {
3202                if keys.is_null(row) {
3203                    buf.extend_from_slice(b"null");
3204                } else {
3205                    let k = keys.keys().value(row) as usize;
3206                    write_str(buf, values.value(k));
3207                }
3208            }
3209            ColEnc::Bool(a) => {
3210                if a.is_null(row) {
3211                    buf.extend_from_slice(b"null");
3212                } else {
3213                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
3214                }
3215            }
3216            ColEnc::I8(a) => int!(a),
3217            ColEnc::I16(a) => int!(a),
3218            ColEnc::I32(a) => int!(a),
3219            ColEnc::I64(a) => int!(a),
3220            ColEnc::U8(a) => int!(a),
3221            ColEnc::U16(a) => int!(a),
3222            ColEnc::U32(a) => int!(a),
3223            ColEnc::U64(a) => int!(a),
3224            ColEnc::Dec128(a) => {
3225                if a.is_null(row) {
3226                    buf.extend_from_slice(b"null");
3227                } else {
3228                    write_str(buf, &a.value_as_string(row));
3229                }
3230            }
3231            ColEnc::Dec256(a) => {
3232                if a.is_null(row) {
3233                    buf.extend_from_slice(b"null");
3234                } else {
3235                    write_str(buf, &a.value_as_string(row));
3236                }
3237            }
3238            ColEnc::F32(a) => {
3239                if a.is_null(row) {
3240                    buf.extend_from_slice(b"null");
3241                } else {
3242                    let v = a.value(row);
3243                    if v.is_finite() {
3244                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3245                    } else {
3246                        buf.extend_from_slice(b"null");
3247                    }
3248                }
3249            }
3250            ColEnc::F64(a) => {
3251                if a.is_null(row) {
3252                    buf.extend_from_slice(b"null");
3253                } else {
3254                    let v = a.value(row);
3255                    if v.is_finite() {
3256                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3257                    } else {
3258                        buf.extend_from_slice(b"null");
3259                    }
3260                }
3261            }
3262            ColEnc::Other(col) => {
3263                if col.is_null(row) {
3264                    buf.extend_from_slice(b"null");
3265                } else {
3266                    write_value(buf, *col, row);
3267                }
3268            }
3269        }
3270    }
3271}
3272
3273#[inline]
3274fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3275    match col.data_type() {
3276        DataType::Utf8 => write_str(
3277            buf,
3278            col.as_any()
3279                .downcast_ref::<StringArray>()
3280                .unwrap()
3281                .value(row),
3282        ),
3283        DataType::LargeUtf8 => write_str(
3284            buf,
3285            col.as_any()
3286                .downcast_ref::<LargeStringArray>()
3287                .unwrap()
3288                .value(row),
3289        ),
3290        DataType::Utf8View => write_str(
3291            buf,
3292            col.as_any()
3293                .downcast_ref::<StringViewArray>()
3294                .unwrap()
3295                .value(row),
3296        ),
3297        DataType::Dictionary(key, value)
3298            if matches!(key.as_ref(), DataType::Int32)
3299                && matches!(value.as_ref(), DataType::Utf8) =>
3300        {
3301            let dict = col
3302                .as_any()
3303                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3304                .unwrap();
3305            let keys = dict.keys();
3306            let values = dict
3307                .values()
3308                .as_any()
3309                .downcast_ref::<StringArray>()
3310                .unwrap();
3311            let k = keys.value(row) as usize;
3312            write_str(buf, values.value(k));
3313        }
3314        DataType::Boolean => {
3315            let v = col
3316                .as_any()
3317                .downcast_ref::<BooleanArray>()
3318                .unwrap()
3319                .value(row);
3320            buf.extend_from_slice(if v { b"true" } else { b"false" });
3321        }
3322        DataType::Int8 => {
3323            let mut b = itoa::Buffer::new();
3324            buf.extend_from_slice(
3325                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3326                    .as_bytes(),
3327            );
3328        }
3329        DataType::Int16 => {
3330            let mut b = itoa::Buffer::new();
3331            buf.extend_from_slice(
3332                b.format(
3333                    col.as_any()
3334                        .downcast_ref::<Int16Array>()
3335                        .unwrap()
3336                        .value(row),
3337                )
3338                .as_bytes(),
3339            );
3340        }
3341        DataType::Int32 => {
3342            let mut b = itoa::Buffer::new();
3343            buf.extend_from_slice(
3344                b.format(
3345                    col.as_any()
3346                        .downcast_ref::<Int32Array>()
3347                        .unwrap()
3348                        .value(row),
3349                )
3350                .as_bytes(),
3351            );
3352        }
3353        DataType::Int64 => {
3354            let mut b = itoa::Buffer::new();
3355            buf.extend_from_slice(
3356                b.format(
3357                    col.as_any()
3358                        .downcast_ref::<Int64Array>()
3359                        .unwrap()
3360                        .value(row),
3361                )
3362                .as_bytes(),
3363            );
3364        }
3365        DataType::UInt8 => {
3366            let mut b = itoa::Buffer::new();
3367            buf.extend_from_slice(
3368                b.format(
3369                    col.as_any()
3370                        .downcast_ref::<UInt8Array>()
3371                        .unwrap()
3372                        .value(row),
3373                )
3374                .as_bytes(),
3375            );
3376        }
3377        DataType::UInt16 => {
3378            let mut b = itoa::Buffer::new();
3379            buf.extend_from_slice(
3380                b.format(
3381                    col.as_any()
3382                        .downcast_ref::<UInt16Array>()
3383                        .unwrap()
3384                        .value(row),
3385                )
3386                .as_bytes(),
3387            );
3388        }
3389        DataType::UInt32 => {
3390            let mut b = itoa::Buffer::new();
3391            buf.extend_from_slice(
3392                b.format(
3393                    col.as_any()
3394                        .downcast_ref::<UInt32Array>()
3395                        .unwrap()
3396                        .value(row),
3397                )
3398                .as_bytes(),
3399            );
3400        }
3401        DataType::UInt64 => {
3402            let mut b = itoa::Buffer::new();
3403            buf.extend_from_slice(
3404                b.format(
3405                    col.as_any()
3406                        .downcast_ref::<UInt64Array>()
3407                        .unwrap()
3408                        .value(row),
3409                )
3410                .as_bytes(),
3411            );
3412        }
3413        DataType::Decimal128(_, _) => {
3414            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3415            write_str(buf, &arr.value_as_string(row));
3416        }
3417        DataType::Decimal256(_, _) => {
3418            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3419            write_str(buf, &arr.value_as_string(row));
3420        }
3421        DataType::Float32 => {
3422            let v = col
3423                .as_any()
3424                .downcast_ref::<Float32Array>()
3425                .unwrap()
3426                .value(row);
3427            if v.is_finite() {
3428                let mut b = ryu::Buffer::new();
3429                buf.extend_from_slice(b.format_finite(v).as_bytes());
3430            } else {
3431                buf.extend_from_slice(b"null");
3432            }
3433        }
3434        DataType::Float64 => {
3435            let v = col
3436                .as_any()
3437                .downcast_ref::<Float64Array>()
3438                .unwrap()
3439                .value(row);
3440            if v.is_finite() {
3441                let mut b = ryu::Buffer::new();
3442                buf.extend_from_slice(b.format_finite(v).as_bytes());
3443            } else {
3444                buf.extend_from_slice(b"null");
3445            }
3446        }
3447        // Any dtype not handled above must have been pre-cast to Utf8 by
3448        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
3449        // visible JSON string rather than a silent null so it can't be
3450        // mistaken for a real NULL value.
3451        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3452    }
3453}
3454
/// Appends `s` to `buf` as a double-quoted JSON string.
///
/// `"` and `\` are backslash-escaped; newline, carriage return and tab use
/// their short escapes; every other byte below `0x20` becomes `\u00XX` with
/// lowercase hex digits. All remaining bytes, including multi-byte UTF-8
/// sequences, are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    // Start of the current run of bytes that need no escaping.
    let mut run_start = 0;

    buf.push(b'"');
    for (idx, &byte) in bytes.iter().enumerate() {
        if byte >= 0x20 && byte != b'"' && byte != b'\\' {
            continue;
        }
        // Flush the clean run preceding this byte, then emit its escape.
        buf.extend_from_slice(&bytes[run_start..idx]);
        run_start = idx + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            // Only control bytes below 0x20 can reach this arm.
            _ => buf.extend_from_slice(&[
                b'\\',
                b'u',
                b'0',
                b'0',
                HEX[usize::from(byte >> 4)],
                HEX[usize::from(byte & 0x0f)],
            ]),
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3476
3477// ---------------------------------------------------------------------------
3478// Backend trait impl — wires the store into the generic core handlers.
3479// ---------------------------------------------------------------------------
3480
3481#[async_trait]
3482impl Backend for Store {
3483    fn names(&self) -> Vec<String> {
3484        Store::names(self)
3485    }
3486
3487    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3488        let st = self.dataset(name)?;
3489        Ok(DatasetSummary {
3490            name: st.schema.name.clone(),
3491            columns: st.schema.columns.len(),
3492            rows: st.num_rows(),
3493        })
3494    }
3495
3496    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3497        let st = self.dataset(name)?;
3498        Ok(Arc::new(st.schema.clone()))
3499    }
3500
3501    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3502        let st = self.dataset(name)?;
3503        // Report indexed columns in the dataset's declared schema order
3504        // so the `/schema` response is deterministic.
3505        let mut cols: Vec<String> = st
3506            .schema
3507            .columns
3508            .iter()
3509            .map(|c| c.name.clone())
3510            .filter(|n| st.index.contains_key(n))
3511            .collect();
3512        // Any indexed columns not in `schema.columns` (shouldn't happen,
3513        // but be defensive) get appended sorted.
3514        let mut extras: Vec<String> = st
3515            .index
3516            .keys()
3517            .filter(|n| !cols.iter().any(|c| c == *n))
3518            .cloned()
3519            .collect();
3520        extras.sort();
3521        cols.extend(extras);
3522        Ok(cols)
3523    }
3524
3525    async fn sample(&self, name: &str) -> Result<String, AppError> {
3526        Store::sample(self, name).await
3527    }
3528
3529    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3530        Store::query(self, name, req).await
3531    }
3532
3533    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3534        Store::query_arrow(self, name, req).await
3535    }
3536
3537    async fn query_arrow_stream(
3538        &self,
3539        name: &str,
3540        req: &QueryRequest,
3541    ) -> Result<ArrowIpcStream, AppError> {
3542        Store::query_arrow_stream(self, name, req).await
3543    }
3544
3545    async fn query_arrow_stream_all(
3546        &self,
3547        name: &str,
3548        req: &QueryRequest,
3549    ) -> Result<ArrowIpcStream, AppError> {
3550        Store::query_arrow_stream_all(self, name, req).await
3551    }
3552
3553    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3554        Store::count(self, name, req).await
3555    }
3556
3557    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3558        Store::query_sql(self, sql, max_rows).await
3559    }
3560
3561    async fn query_sql_arrow_stream(
3562        &self,
3563        sql: &str,
3564        max_rows: u64,
3565    ) -> Result<ArrowIpcStream, AppError> {
3566        Store::query_sql_arrow_stream(self, sql, max_rows).await
3567    }
3568
3569    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3570        Store::parquet(self, name).await
3571    }
3572
3573    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3574        Store::reload(self, name).await
3575    }
3576
3577    async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3578        Store::register(self, cfg).await
3579    }
3580}
3581
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Representative messages from object_store / AWS / deltalake / MinIO
    /// that must be recognised as access-denied failures.
    const ACCESS_DENIED: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    /// Failures unrelated to permissions that must not be flagged.
    const UNRELATED: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    #[test]
    fn detects_s3_access_denied_variants() {
        for msg in ACCESS_DENIED {
            assert!(is_s3_access_denied(msg), "should flag: {msg}");
        }
    }

    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED {
            assert!(!is_s3_access_denied(msg), "should not flag: {msg}");
        }
    }
}
3610