Skip to main content

datapress_datafusion/
store.rs

1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::time::Duration;
7
8use arc_swap::ArcSwap;
9use arrow::array::{
10    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
11    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
12    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
13};
14use arrow::compute;
15use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
16use arrow::datatypes::{DataType, Field, Schema};
17use async_trait::async_trait;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
20use serde_json::Value as JsonValue;
21
22use datafusion::datasource::file_format::parquet::ParquetFormat;
23use datafusion::datasource::listing::{
24    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
25};
26use datafusion::datasource::{MemTable, TableProvider};
27use datafusion::error::Result as DfResult;
28use datafusion::execution::cache::DefaultListFilesCache;
29use datafusion::execution::cache::cache_manager::CacheManagerConfig;
30use datafusion::execution::runtime_env::RuntimeEnvBuilder;
31use datafusion::logical_expr::{
32    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
33};
34use datafusion::prelude::{SessionConfig, SessionContext};
35use datafusion::scalar::ScalarValue;
36
37use object_store::aws::AmazonS3Builder;
38use url::Url;
39
40use datapress_core::backend::{
41    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
42};
43use datapress_core::config::{
44    AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
45    Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
46};
47use datapress_core::errors::AppError;
48use datapress_core::models::{CountRequest, Predicate, QueryRequest};
49use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
50
51// ---------------------------------------------------------------------------
52// Public types
53// ---------------------------------------------------------------------------
54
/// `HashMap` alias with the hasher swapped for `ahash`.
///
/// The default `std::collections::HashMap` uses SipHash (DoS-resistant but
/// slow). The equality index keys are our own column values, not untrusted
/// hash-flood input, so we use `ahash` — a much faster non-cryptographic
/// hasher — on this per-request hot path.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Pre-built equality index, two levels deep:
/// lowercase column name → string-encoded value → sorted row ids.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
63
64/// Per-dataset state: schema metadata, the resident chunks, and the
65/// equality index built per the dataset's `[dataset.index]` policy.
66///
67/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
68/// produced by the underlying reader, after temporal columns are cast to
69/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
70/// into one batch: on wide schemas (hundreds of columns) that transiently
71/// allocates a second full copy of the decoded Arrow data, pushing peak
72/// RSS to ~2× the resident size and OOM-killing the process at startup.
73///
74/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
75/// `index` is empty, and every query is dispatched to DataFusion SQL
76/// against a registered `ListingTable`. `arrow_schema` still carries the
77/// inferred schema so discovery endpoints work.
78pub struct DatasetState {
79    pub schema: DatasetSchema,
80    pub data: Vec<RecordBatch>,
81    pub arrow_schema: Arc<Schema>,
82    pub index: EqIndex,
83    pub lazy: bool,
84}
85
86impl DatasetState {
87    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
88    pub fn num_rows(&self) -> usize {
89        self.data.iter().map(|b| b.num_rows()).sum()
90    }
91}
92
/// Multi-dataset registry. Each dataset is registered in the shared
/// `SessionContext` under its configured name. The per-dataset state is
/// held behind `ArcSwap` so a reload can atomically replace it without
/// blocking concurrent queries.
pub struct Store {
    /// DataFusion session every dataset's table provider is registered in;
    /// all SQL issued by this store runs through it.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`. Always at least 1
    /// (enforced when the store is built).
    max_page_size: u64,
    /// Original dataset configs, indexed by name. Reload reads the source
    /// path from here — clients can't redirect a reload at an arbitrary file.
    /// Behind an `RwLock` so datasets registered at runtime can be added
    /// without a restart.
    configs: RwLock<HashMap<String, DatasetConfig>>,
    /// Hot-swappable snapshot of all currently loaded datasets. Readers
    /// load the current map; writers publish a whole new map.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-name reload mutex. Serialises concurrent reloads of the same
    /// dataset; reloads of different datasets proceed in parallel.
    /// The outer `std` mutex only guards the lookup/insert of the per-name
    /// entry; the inner `tokio` mutex is the one held across the rebuild.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
111
112impl Store {
    /// Borrow the shared `SessionContext` all datasets are registered in.
    ///
    /// Handed to the optional pgwire server so it queries the exact same
    /// tables as the HTTP API (never a fresh context). The return value is
    /// a borrow tied to `self`: clone it before moving it into a task.
    pub fn session_context(&self) -> &SessionContext {
        &self.ctx
    }
119
120    /// Load every dataset declared in `cfg`.
121    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
122        // One-shot init for the deltalake S3 backend. Safe to call more
123        // than once — the handlers are idempotent.
124        if cfg
125            .datasets
126            .iter()
127            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
128        {
129            deltalake::aws::register_handlers(None);
130        }
131
132        // NOTE: identifier handling for the raw-SQL endpoint is done by
133        // rewriting schema column/table references to quoted canonical
134        // names before execution (see `query_sql`). DataFusion's default
135        // identifier normalization is left ON so unquoted *aliases* and
136        // CTE names stay case-insensitive, matching DuckDB.
137        let ctx = build_tuned_context(&cfg.datafusion);
138        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
139        let mut configs = HashMap::with_capacity(cfg.datasets.len());
140
141        for d in &cfg.datasets {
142            log::info!(
143                "Loading dataset '{}' ({} @ {})",
144                d.name,
145                d.source.kind.as_str(),
146                d.source.location
147            );
148            // Force lazy when the source exceeds the server size threshold.
149            // S3-aware: local sources are stat'd, S3 sources are sized by
150            // listing the object store under their prefix.
151            let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
152                .await
153            {
154                Some(bytes) => {
155                    log::info!(
156                        "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
157                        d.name,
158                        bytes as f64 / (1024.0 * 1024.0),
159                        cfg.server.force_lazy_above_mb
160                    );
161                    let mut forced = d.clone();
162                    forced.lazy = true;
163                    std::borrow::Cow::Owned(forced)
164                }
165                None => std::borrow::Cow::Borrowed(d),
166            };
167            let d = d.as_ref();
168            let (state, provider) = match build_dataset(d, &ctx).await {
169                Ok(built) => built,
170                Err(AppError::EmptyDataset(msg)) => {
171                    log::warn!("skipping empty dataset '{}': {msg}", d.name);
172                    continue;
173                }
174                // An S3 source we're not authorized to read (bad creds, no
175                // bucket/prefix policy, expired token) returns a 403. Don't
176                // abort the whole registry for one inaccessible dataset —
177                // log and skip it, exactly like an empty source. The rest of
178                // the datasets still load and serve traffic.
179                Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
180                    log::warn!(
181                        "skipping dataset '{}': S3 access denied — check credentials \
182                         and bucket policy ({e})",
183                        d.name
184                    );
185                    continue;
186                }
187                Err(e) => return Err(e),
188            };
189            ctx.register_table(d.name.as_str(), provider)?;
190            datasets.insert(d.name.clone(), Arc::new(state));
191            configs.insert(d.name.clone(), d.clone());
192        }
193        Ok(Self {
194            ctx,
195            max_page_size: cfg.server.max_page_size.max(1),
196            configs: RwLock::new(configs),
197            datasets: ArcSwap::from_pointee(datasets),
198            reload_locks: Mutex::new(HashMap::new()),
199        })
200    }
201
202    /// Sorted list of dataset names.
203    pub fn names(&self) -> Vec<String> {
204        let snap = self.datasets.load();
205        let mut v: Vec<String> = snap.keys().cloned().collect();
206        v.sort();
207        v
208    }
209
210    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
211        self.datasets
212            .load()
213            .get(name)
214            .cloned()
215            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
216    }
217
218    /// JSON for the first row of the dataset, or `null` if empty. Used by
219    /// `GET /api/datasets/{name}/schema` for discoverability.
220    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
221        let st = self.dataset(name)?;
222
223        // Lazy datasets have no resident batch — pull one row via SQL.
224        if st.lazy {
225            let table = DatasetSchema::quote_ident(&st.schema.name);
226            let sql = format!("SELECT * FROM {table} LIMIT 1");
227            let df = self.ctx.sql(&sql).await?;
228            let batches = df.collect().await?;
229            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
230                return Ok("null".into());
231            }
232            let arr = serialize(&batches[0].slice(0, 1))?;
233            let trimmed = arr.trim();
234            let inner = trimmed
235                .strip_prefix('[')
236                .and_then(|s| s.strip_suffix(']'))
237                .unwrap_or(trimmed);
238            return Ok(inner.to_string());
239        }
240
241        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
242            Some(b) => b,
243            None => return Ok("null".into()),
244        };
245        let arr = serialize(&first.slice(0, 1))?;
246        // strip the outer [] to return a single object
247        let trimmed = arr.trim();
248        let inner = trimmed
249            .strip_prefix('[')
250            .and_then(|s| s.strip_suffix(']'))
251            .unwrap_or(trimmed);
252        Ok(inner.to_string())
253    }
254
255    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
256    /// against the same name continue to see the *old* `Arc<DatasetState>`
257    /// until they finish; the old data is dropped once the last reference
258    /// goes away.
259    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
260        // 1. Look up the dataset config. Not finding it = 404.
261        let cfg = self
262            .configs
263            .read()
264            .unwrap()
265            .get(name)
266            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
267            .clone();
268
269        // 2. Per-name lock: only one reload of this dataset at a time.
270        let lock = {
271            let mut locks = self.reload_locks.lock().unwrap();
272            locks
273                .entry(name.to_string())
274                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
275                .clone()
276        };
277        let _guard = lock.lock().await;
278
279        let started = std::time::Instant::now();
280
281        // Invalidate the object-store LIST cache before rebuilding. For an
282        // S3 *prefix* source the files are often rewritten with new UUID
283        // names; without dropping the cached listing the rebuilt
284        // `ListingTable` would replay the stale keys and fail with
285        // "object at location … not found". A plain prefix is therefore
286        // re-listed fresh; an explicit filename/glob source is re-resolved
287        // against the same configured pattern. Clearing the whole cache is
288        // cheap — other datasets simply re-list on their next query.
289        //
290        // NOTE: this must clear the cache that actually lives on the
291        // `RuntimeEnv`, not just the one we opt into via
292        // `[datafusion] list_files_cache`. DataFusion ≥ 53 installs a
293        // *default* `DefaultListFilesCache` (1 MiB, infinite TTL) on every
294        // `SessionContext` even when no cache is configured, so the staleness
295        // bites regardless of our flag.
296        if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
297            cache.clear();
298        }
299
300        // 3. Heavy lifting (source read + index build). Parquet/delta
301        // readers are themselves async, so we don't wrap in `web::block`.
302        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
303        let rows = state.num_rows();
304
305        // 4. Atomic swap.
306        //   a) Replace the MemTable inside the SessionContext.
307        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
308        // In-flight queries already hold the old provider + old Arc; they
309        // run to completion. New queries see the new data.
310        let _ = self.ctx.deregister_table(name)?;
311        self.ctx.register_table(name, provider)?;
312
313        let mut new_map = (**self.datasets.load()).clone();
314        new_map.insert(name.to_string(), Arc::new(state));
315        self.datasets.store(Arc::new(new_map));
316
317        let elapsed_ms = started.elapsed().as_millis();
318        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
319        Ok(ReloadStats { rows, elapsed_ms })
320    }
321
322    /// Register a brand-new dataset from `cfg` at runtime. Opens the source,
323    /// registers a provider in the shared `SessionContext`, and inserts it
324    /// into the live snapshot so it is immediately queryable — no restart.
325    pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
326        cfg.validate_for_register()?;
327
328        // Fast pre-check before taking the (async) per-name lock.
329        if self.datasets.load().contains_key(&cfg.name) {
330            return Err(AppError::InvalidValue(format!(
331                "dataset '{}' already exists",
332                cfg.name
333            )));
334        }
335
336        // Serialise against a concurrent register/reload of the same name.
337        let lock = {
338            let mut locks = self.reload_locks.lock().unwrap();
339            locks
340                .entry(cfg.name.clone())
341                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
342                .clone()
343        };
344        let _guard = lock.lock().await;
345
346        // Re-check under the lock — another task may have won the race.
347        if self.datasets.load().contains_key(&cfg.name) {
348            return Err(AppError::InvalidValue(format!(
349                "dataset '{}' already exists",
350                cfg.name
351            )));
352        }
353
354        // One-shot init for the deltalake S3 backend, mirroring `load`.
355        // Idempotent — safe to call again for a runtime-registered dataset.
356        if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
357            deltalake::aws::register_handlers(None);
358        }
359
360        let started = std::time::Instant::now();
361        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
362        let rows = state.num_rows();
363        let columns = state.schema.columns.len();
364
365        self.ctx.register_table(cfg.name.as_str(), provider)?;
366
367        let mut new_map = (**self.datasets.load()).clone();
368        new_map.insert(cfg.name.clone(), Arc::new(state));
369        self.datasets.store(Arc::new(new_map));
370        self.configs
371            .write()
372            .unwrap()
373            .insert(cfg.name.clone(), cfg.clone());
374
375        let elapsed_ms = started.elapsed().as_millis();
376        log::info!(
377            "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
378            cfg.name,
379            cfg.source.kind.as_str(),
380            cfg.source.location
381        );
382        Ok(DatasetSummary {
383            name: cfg.name,
384            columns,
385            rows,
386        })
387    }
388
389    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
390    /// slice. Otherwise → DataFusion SQL on the single registered table.
391    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
392    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
393        let batch = self.query_batch(name, req).await?;
394        if batch.num_rows() == 0 {
395            return Ok("[]".to_string());
396        }
397        serialize(&batch)
398    }
399
400    /// Rewrite registered table / column references in a raw-SQL string to
401    /// their canonical, quoted spelling so matching is case-insensitive
402    /// (like DuckDB). Builds the lookup from every currently loaded
403    /// dataset; unknown identifiers (aliases, CTE names) are left for the
404    /// engine's default normalization to handle.
405    fn canonicalize_sql(&self, sql: &str) -> String {
406        let snap = self.datasets.load();
407        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
408        let mut columns: HashMap<String, String> = HashMap::new();
409        for (name, state) in snap.iter() {
410            tables.insert(name.to_lowercase(), name.clone());
411            for col in &state.schema.columns {
412                columns
413                    .entry(col.name.to_lowercase())
414                    .or_insert_with(|| col.name.clone());
415            }
416        }
417        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
418    }
419
420    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
421    /// array. The statement has already passed
422    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
423    /// `LIMIT max_rows` so the result is bounded regardless of the user's
424    /// own clauses, executed through the shared `SessionContext`, and run
425    /// through the same fast JSON encoder as [`Self::query`].
426    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
427        let cap = max_rows.max(1);
428        let sql = self.canonicalize_sql(sql);
429        // DESCRIBE yields a schema listing and cannot be nested in a
430        // subquery on DataFusion, so run it directly. Its row count is
431        // bounded by the column count and the slice below still enforces
432        // `cap`; everything else is wrapped in an outer LIMIT.
433        let wrapped = if datapress_core::sql::is_describe(&sql) {
434            sql
435        } else {
436            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
437        };
438        let df = self.ctx.sql(&wrapped).await?;
439        let batches = df.collect().await?;
440        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
441            return Ok("[]".to_string());
442        }
443        let batch = if batches.len() == 1 {
444            batches.into_iter().next().expect("checked len")
445        } else {
446            compute::concat_batches(&batches[0].schema(), batches.iter())?
447        };
448        // Defence in depth: the outer LIMIT already bounds the row count,
449        // but slice anyway so a planning quirk can never blow the cap.
450        let batch = if batch.num_rows() as u64 > cap {
451            batch.slice(0, cap as usize)
452        } else {
453            batch
454        };
455        serialize(&batch)
456    }
457
458    /// Same plan as [`Self::query_sql`], but encode the bounded result as
459    /// an Arrow IPC stream instead of JSON. Backs the Arrow
460    /// content-negotiated branch of `POST /api/v1/sql`.
461    pub async fn query_sql_arrow_stream(
462        &self,
463        sql: &str,
464        max_rows: u64,
465    ) -> Result<ArrowIpcStream, AppError> {
466        let cap = max_rows.max(1);
467        let sql = self.canonicalize_sql(sql);
468        // DESCRIBE cannot be nested in a subquery on DataFusion (see
469        // `query_sql`); run it directly. Its output is bounded by the
470        // column count, so the missing outer LIMIT is harmless.
471        let wrapped = if datapress_core::sql::is_describe(&sql) {
472            sql
473        } else {
474            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
475        };
476        let df = self.ctx.sql(&wrapped).await?;
477        let batches = df.collect().await?;
478        Ok(stream_arrow_batches(batches))
479    }
480
481    /// Same plan as [`Self::query`], but encode the result page as an
482    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
483    /// results still produce a valid, self-describing zero-batch stream.
484    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
485        let batch = self.query_batch(name, req).await?;
486        let schema = batch.schema();
487        let mut buf = Vec::with_capacity(8 * 1024);
488        {
489            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
490            if batch.num_rows() > 0 {
491                w.write(&batch)?;
492            }
493            w.finish()?;
494        }
495        Ok(buf)
496    }
497
498    pub async fn query_arrow_stream(
499        &self,
500        name: &str,
501        req: &QueryRequest,
502    ) -> Result<ArrowIpcStream, AppError> {
503        let batches = self.query_batches(name, req).await?;
504        Ok(stream_arrow_batches(batches))
505    }    pub async fn query_arrow_stream_all(
506        &self,
507        name: &str,
508        req: &QueryRequest,
509    ) -> Result<ArrowIpcStream, AppError> {
510        let batches = self.query_batches_all(name, req).await?;
511        Ok(stream_arrow_batches(batches))
512    }
513
514    /// Encode the entire dataset as a single self-contained Parquet file.
515    ///
516    /// Collects every row (all columns, no predicates, no paging) and runs
517    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
518    /// carries the row-group + footer metadata a Parquet reader needs to
519    /// answer `count(*)` straight from the footer. Powers the cached
520    /// `GET /datasets/{name}/parquet` HTTP endpoint.
521    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
522        // All rows, all columns, no predicates / ordering / limit.
523        let req = QueryRequest {
524            columns: Vec::new(),
525            predicates: Vec::new(),
526            group_by: Vec::new(),
527            aggregations: Vec::new(),
528            having: Vec::new(),
529            distinct: false,
530            order_by: Vec::new(),
531            limit: None,
532            page: 1,
533            page_size: 1,
534        };
535        let st = self.dataset(name)?;
536        let batches = self.query_batches_all(name, &req).await?;
537        // Use the actual batch schema when we have rows so the writer schema
538        // matches exactly (projection/nullability); fall back to the
539        // dataset schema for an empty dataset.
540        let schema = batches
541            .first()
542            .map(|b| b.schema())
543            .unwrap_or_else(|| st.arrow_schema.clone());
544
545        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
546        {
547            let props = parquet::file::properties::WriterProperties::builder()
548                .set_compression(parquet::basic::Compression::SNAPPY)
549                .build();
550            let mut writer =
551                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
552                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
553            for batch in &batches {
554                if batch.num_rows() > 0 {
555                    writer
556                        .write(batch)
557                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
558                }
559            }
560            writer
561                .close()
562                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
563        }
564        Ok(bytes::Bytes::from(buf))
565    }
566
567    /// Compute the result page as a single `RecordBatch`. Shared between
568    /// the JSON and Arrow IPC encoders.
569    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
570        let batches = self.query_batches(name, req).await?;
571        if batches.is_empty() {
572            return Ok(RecordBatch::new_empty(Arc::new(
573                arrow::datatypes::Schema::empty(),
574            )));
575        }
576        if batches.len() == 1 {
577            return Ok(batches.into_iter().next().expect("checked len"));
578        }
579        if batches.iter().all(|b| b.num_rows() == 0) {
580            return Ok(RecordBatch::new_empty(batches[0].schema()));
581        }
582        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
583        Ok(batch)
584    }
585
586    /// Compute the result page as Arrow batches. Arrow IPC responses can
587    /// write these directly, while JSON callers concatenate via
588    /// [`Self::query_batch`] for the existing row conversion path.
589    async fn query_batches(
590        &self,
591        name: &str,
592        req: &QueryRequest,
593    ) -> Result<Vec<RecordBatch>, AppError> {
594        let st = self.dataset(name)?;
595
596        let page = req.page.max(1);
597        let page_size = req.page_size.clamp(1, self.max_page_size);
598        let offset = ((page - 1) * page_size) as usize;
599        let limit = page_size as usize;
600
601        self.query_batches_inner(st, req, Some((offset, limit)))
602            .await
603    }
604
605    /// Compute all matching rows as Arrow batches for the one-request
606    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
607    /// optional `limit` still caps the total result size.
608    async fn query_batches_all(
609        &self,
610        name: &str,
611        req: &QueryRequest,
612    ) -> Result<Vec<RecordBatch>, AppError> {
613        let st = self.dataset(name)?;
614        self.query_batches_inner(st, req, None).await
615    }
616
617    async fn query_batches_inner(
618        &self,
619        st: Arc<DatasetState>,
620        req: &QueryRequest,
621        page_window: Option<(usize, usize)>,
622    ) -> Result<Vec<RecordBatch>, AppError> {
623        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
624
625        // In-memory hot paths only fire when:
626        //   - the dataset is materialised,
627        //   - the caller did not ask for ordering,
628        //   - and did not ask for a hard `limit` cap on a paged request.
629        // Both of the latter two require sorting / capping that the SQL
630        // engine handles uniformly across all data types.
631        let can_fast_path = !st.lazy
632            && req.order_by.is_empty()
633            && (page_window.is_none() || req.limit.is_none())
634            && req.group_by.is_empty()
635            && !req.distinct;
636
637        if can_fast_path {
638            let total = st.num_rows();
639
640            // No predicates -> O(1) raw Arrow slices over resident batches,
641            // no engine overhead.
642            if req.predicates.is_empty() {
643                if page_window.is_none() && req.limit.is_none() {
644                    return st
645                        .data
646                        .iter()
647                        .cloned()
648                        .map(|batch| project(&st.schema, batch, &req.columns))
649                        .collect();
650                }
651                let start = offset.min(total);
652                let len = limit.min(total - start);
653                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
654                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
655            }
656
657            // Index fast path: if every predicate is eq/in on an indexed column,
658            // resolve via the pre-built equality index.
659            if let Some(rows) = try_index(&st.index, &req.predicates) {
660                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
661                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
662            }
663        }
664
665        // Fallback (and only path for lazy datasets): DataFusion SQL.
666        let (sql, params) = match page_window {
667            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
668            None => build_query_stream_sql(&st.schema, req)?,
669        };
670        let mut df = self.ctx.sql(&sql).await?;
671        if !params.is_empty() {
672            df = df.with_param_values(params)?;
673        }
674        let batches = df.collect().await?;
675        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
676            let schema = batches
677                .first()
678                .map(|b| b.schema())
679                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
680            return Ok(vec![RecordBatch::new_empty(schema)]);
681        }
682        Ok(batches)
683    }
684}
685
686fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
687    let schema = batches
688        .first()
689        .map(|batch| batch.schema())
690        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
691    let (mut writer, stream) = arrow_ipc_stream_channel(8);
692
693    tokio::task::spawn_blocking(move || {
694        let result = (|| -> Result<(), AppError> {
695            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
696            for batch in batches {
697                if batch.num_rows() > 0 {
698                    w.write(&batch)?;
699                }
700            }
701            w.finish()?;
702            Ok(())
703        })();
704        if let Err(err) = result {
705            log::error!("datafusion arrow stream failed: {err}");
706            writer.send_error(err);
707        }
708    });
709
710    stream
711}
712
713impl Store {
714    /// Return the number of rows matching `req.predicates`. With no
715    /// predicates this is a cheap metadata lookup on materialised datasets
716    /// and a `SELECT COUNT(*)` on lazy ones.
717    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
718        let st = self.dataset(name)?;
719
720        if !st.lazy {
721            // No predicates → resident row count, no scan.
722            if req.predicates.is_empty() {
723                return Ok(st.num_rows() as i64);
724            }
725            // Index fast path: same eligibility rules as `query`.
726            if let Some(rows) = try_index(&st.index, &req.predicates) {
727                return Ok(rows.len() as i64);
728            }
729        }
730
731        // Fallback: DataFusion SQL — same predicate translation as `query`,
732        // with predicate values bound as typed parameters.
733        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
734        let mut df = self.ctx.sql(&sql).await?;
735        if !params.is_empty() {
736            df = df.with_param_values(params)?;
737        }
738        let batches = df.collect().await?;
739        let n = batches
740            .first()
741            .and_then(|b| {
742                b.column(0)
743                    .as_any()
744                    .downcast_ref::<arrow::array::Int64Array>()
745            })
746            .filter(|a| !a.is_empty())
747            .map(|a| a.value(0))
748            .unwrap_or(0);
749        Ok(n)
750    }
751}
752
753// ---------------------------------------------------------------------------
754// Dataset loading
755// ---------------------------------------------------------------------------
756
757/// Build a tuned `SessionContext` for the whole `Store`.
758///
759/// Everything here is opt-in via the `[datafusion]` config block; with the
760/// defaults this is equivalent to a plain `SessionContext::new()`. Lazy
761/// datasets (`ListingTable` over parquet, possibly on S3) benefit most:
762///
763/// * `pushdown_filters` — evaluate predicates *during* the parquet decode
764///   so filtered-out rows in a surviving row group are never materialised.
765/// * `reorder_filters` — let the scan reorder pushed-down predicates by
766///   selectivity (only meaningful with `pushdown_filters`).
767/// * `list_files_cache` — a `ListFilesCache` on the `RuntimeEnv` so repeated
768///   lazy queries reuse object-store `LIST` results instead of re-listing the
769///   prefix every time — the dominant per-query cost on S3.
770///
771/// Row-group / page-index / bloom-filter pruning and `metadata_size_hint`
772/// are already on by default in this DataFusion version, so they are left
773/// untouched.
774///
775/// Note: DataFusion ≥ 53 installs a *default* `DefaultListFilesCache` on the
776/// `RuntimeEnv` even with the defaults below, so a reload must explicitly
777/// invalidate it (see [`Store::reload`]) — otherwise a re-listed S3 prefix
778/// would keep serving stale, now-deleted object keys.
779fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
780    let mut config = SessionConfig::new();
781    {
782        let opts = config.options_mut();
783        opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
784        opts.execution.parquet.reorder_filters = cfg.reorder_filters;
785        // Expose the `information_schema.tables`/`columns` virtual tables. BI
786        // navigators (Npgsql/Power BI, DBeaver) query them alongside
787        // `pg_catalog` to enumerate tables, and the pgwire front-end needs
788        // them for schema browsing. They live in their own `information_schema`
789        // schema, so they don't affect dataset listing (which reads the
790        // Store's own registry, not the catalog) or the HTTP endpoints.
791        opts.catalog.information_schema = true;
792    }
793
794    // Name of the session's default schema, surfaced by the `current_schema()`
795    // compatibility UDF registered below.
796    let default_schema = config.options().catalog.default_schema.clone();
797
798    if !cfg.list_files_cache {
799        let ctx = SessionContext::new_with_config(config);
800        register_compat_udfs(&ctx, default_schema);
801        return ctx;
802    }
803
804    // Cache object-store listings so repeated lazy/S3 queries skip the
805    // LIST round-trips. A zero TTL means infinite (never expires).
806    let ttl = (cfg.list_files_cache_ttl_secs > 0)
807        .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
808    let list_cache = Arc::new(DefaultListFilesCache::new(
809        cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
810        ttl,
811    ));
812    let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
813
814    let runtime = RuntimeEnvBuilder::new()
815        .with_cache_manager(cache_manager)
816        .build_arc()
817        .expect("failed to build DataFusion runtime env");
818
819    let ctx = SessionContext::new_with_config_rt(config, runtime);
820    register_compat_udfs(&ctx, default_schema);
821    ctx
822}
823
824/// Register scalar UDFs that exist on DuckDB but not on DataFusion, so the
825/// same portable smoke-test / introspection SQL works on both backends.
826///
827/// * `current_schema()` — DuckDB returns the active schema (`main`);
828///   DataFusion has no such function, so we return the session's default
829///   schema name (`public`).
830///
831/// Note: when the optional `pgwire` feature is enabled and the listener is
832/// started, `datafusion_pg_catalog::setup_pg_catalog` runs *after* this on the
833/// same shared context and re-registers `current_schema()` (plus siblings) with
834/// its own implementation, which also returns `public`. The library version
835/// wins there by construction (later `register_udf` replaces by name); we keep
836/// ours here so the behavior is identical whether or not pgwire is compiled in
837/// or enabled, and so the DuckDB-parity `/api/v1/sql` contract holds by default.
838fn register_compat_udfs(ctx: &SessionContext, default_schema: String) {
839    ctx.register_udf(ScalarUDF::from(CurrentSchemaUdf::new(default_schema)));
840}
841
842/// `current_schema()` — a no-argument scalar UDF returning the session's
843/// default schema name as a constant `Utf8`.
844#[derive(Debug, PartialEq, Eq, Hash)]
845struct CurrentSchemaUdf {
846    signature: Signature,
847    schema: String,
848}
849
850impl CurrentSchemaUdf {
851    fn new(schema: String) -> Self {
852        Self {
853            signature: Signature::nullary(Volatility::Stable),
854            schema,
855        }
856    }
857}
858
859impl ScalarUDFImpl for CurrentSchemaUdf {
860    fn as_any(&self) -> &dyn Any {
861        self
862    }
863
864    fn name(&self) -> &str {
865        "current_schema"
866    }
867
868    fn signature(&self) -> &Signature {
869        &self.signature
870    }
871
872    fn return_type(&self, _arg_types: &[DataType]) -> DfResult<DataType> {
873        Ok(DataType::Utf8)
874    }
875
876    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> DfResult<ColumnarValue> {
877        Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
878            self.schema.clone(),
879        ))))
880    }
881}
882
883async fn build_dataset(
884    d: &DatasetConfig,
885    ctx: &SessionContext,
886) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
887    // Lazy datasets: register a streaming provider straight against the
888    // source and skip the materialise / index / partition pipeline below.
889    // Parquet uses a `ListingTable`; delta uses deltalake's own DataFusion
890    // `TableProvider` (which reads the transaction log once for the file
891    // list, then streams parquet row groups per query with predicate
892    // pushdown + file skipping).
893    if d.lazy {
894        match (d.source.kind, d.source.is_s3()) {
895            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
896            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
897            (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
898        }
899    }
900
901    // Fetch raw RecordBatches from whichever backing store the dataset
902    // is configured to use. All four (parquet, delta) x (local, s3)
903    // combinations converge into one Vec<RecordBatch>; the materialisation
904    // / indexing / partitioning logic below is shared.
905    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
906        (SourceKind::Parquet, false) => read_local_parquet(d)?,
907        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
908        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
909        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
910    };
911    if raw_batches.is_empty() {
912        return Err(AppError::EmptyDataset(format!(
913            "dataset '{}': source produced no batches",
914            d.name
915        )));
916    }
917    // A source can also resolve to one or more *zero-row* batches (e.g. an
918    // empty Delta table, or a parquet file with only a schema). Treat that as
919    // empty too so it's logged and skipped rather than registered as a 0-row
920    // dataset that shows up in discovery / explore.
921    if raw_batches.iter().all(|b| b.num_rows() == 0) {
922        return Err(AppError::EmptyDataset(format!(
923            "dataset '{}': source has a schema but no rows",
924            d.name
925        )));
926    }
927
928    let chunks = raw_batches;
929    let arrow_sch = chunks[0].schema();
930
931    // Build DatasetSchema from the Arrow schema.
932    let columns: Vec<ColumnInfo> = arrow_sch
933        .fields()
934        .iter()
935        .map(|f| {
936            let dt = f.data_type();
937            ColumnInfo {
938                name: f.name().clone(),
939                logical: arrow_to_logical(dt),
940                sql_type: format!("{dt:?}"),
941                nullable: f.is_nullable(),
942            }
943        })
944        .collect();
945    let schema = DatasetSchema::new(&d.name, columns)
946        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
947
948    // Build the equality index per the per-dataset policy. Operates on the
949    // chunked representation directly so we never have to materialise a
950    // single concatenated batch (which would double peak RSS on wide
951    // schemas — see `DatasetState` docs).
952    let index = build_eq_index_with_policy(&chunks, &d.index);
953
954    // Partition for parallel scans by the SQL fallback path. We distribute
955    // the existing batches round-robin across `n_parts` partitions instead
956    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
957    // an Arc-clone of the column buffers, not a copy.
958    let n_parts = std::thread::available_parallelism()
959        .map(|n| n.get())
960        .unwrap_or(4);
961    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
962    for (i, b) in chunks.iter().enumerate() {
963        if b.num_rows() == 0 {
964            continue;
965        }
966        parts[i % n_parts].push(b.clone());
967    }
968    parts.retain(|p| !p.is_empty());
969    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
970
971    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
972    let mem_mb: usize = chunks
973        .iter()
974        .flat_map(|b| b.columns().iter())
975        .map(|c| c.get_buffer_memory_size())
976        .sum::<usize>()
977        / 1_048_576;
978    log::info!(
979        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
980        d.name,
981        d.source.kind.as_str(),
982        total_rows,
983        schema.columns.len(),
984        mem_mb,
985        chunks.len(),
986        index.len()
987    );
988
989    Ok((
990        DatasetState {
991            schema,
992            data: chunks,
993            arrow_schema: arrow_sch,
994            index,
995            lazy: false,
996        },
997        provider,
998    ))
999}
1000
1001/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
1002/// The dataset is never read into RAM; DataFusion streams row groups on
1003/// each query. The returned `DatasetState.data` is an empty `Vec` —
1004/// `arrow_schema` still carries the inferred Arrow schema for discovery.
1005async fn build_lazy_local_parquet(
1006    d: &DatasetConfig,
1007    ctx: &SessionContext,
1008) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1009    let (url, part_keys) = lazy_local_listing(d)?;
1010
1011    let mut opts =
1012        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1013    if !part_keys.is_empty() {
1014        opts = opts.with_table_partition_cols(
1015            part_keys
1016                .iter()
1017                .map(|k| (k.clone(), DataType::Utf8))
1018                .collect(),
1019        );
1020    }
1021
1022    let session_state = ctx.state();
1023    // `infer_schema` returns the *file* schema (without partition columns);
1024    // `ListingTable` appends the declared partition columns on top.
1025    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1026        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
1027    })?;
1028
1029    // An empty listing yields an empty schema (DataFusion does not error on a
1030    // prefix/glob that matches no files). Treat that as an empty dataset so
1031    // the load loop logs and skips it rather than registering 0 columns.
1032    if file_schema.fields().is_empty() {
1033        return Err(AppError::EmptyDataset(format!(
1034            "dataset '{}': no .parquet files at {}",
1035            d.name, d.source.location
1036        )));
1037    }
1038
1039    let cfg = ListingTableConfig::new(url)
1040        .with_listing_options(opts)
1041        .with_schema(file_schema.clone());
1042    let table = ListingTable::try_new(cfg).map_err(|e| {
1043        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
1044    })?;
1045    let provider: Arc<dyn TableProvider> = Arc::new(table);
1046
1047    // Discovery schema = file columns + partition columns (Utf8).
1048    let mut fields: Vec<Field> = file_schema
1049        .fields()
1050        .iter()
1051        .map(|f| f.as_ref().clone())
1052        .collect();
1053    for k in &part_keys {
1054        if !fields.iter().any(|f| f.name() == k) {
1055            fields.push(Field::new(k, DataType::Utf8, false));
1056        }
1057    }
1058    let arrow_sch = Arc::new(Schema::new(fields));
1059
1060    let columns: Vec<ColumnInfo> = arrow_sch
1061        .fields()
1062        .iter()
1063        .map(|f| {
1064            let dt = f.data_type();
1065            ColumnInfo {
1066                name: f.name().clone(),
1067                logical: arrow_to_logical(dt),
1068                sql_type: format!("{dt:?}"),
1069                nullable: f.is_nullable(),
1070            }
1071        })
1072        .collect();
1073    let schema = DatasetSchema::new(&d.name, columns)
1074        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1075
1076    log::info!(
1077        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
1078        d.name,
1079        d.source.kind.as_str(),
1080        schema.columns.len(),
1081        part_keys.len()
1082    );
1083
1084    Ok((
1085        DatasetState {
1086            schema,
1087            data: Vec::new(),
1088            arrow_schema: arrow_sch,
1089            index: EqIndex::default(),
1090            lazy: true,
1091        },
1092        provider,
1093    ))
1094}
1095
1096/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
1097/// rooted at the dataset base plus the ordered hive partition keys (if any).
1098/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
1099/// (hive root or flat folder of parquets), and a single `*.parquet` file.
1100fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1101    let loc = &d.source.location;
1102
1103    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1104        let parts: Vec<&str> = loc.split('/').collect();
1105        let first_wild = parts
1106            .iter()
1107            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1108            .unwrap_or(parts.len());
1109        let base = parts[..first_wild].join("/");
1110        let base = if base.is_empty() {
1111            "/".to_string()
1112        } else {
1113            base
1114        };
1115        // Partition keys: `key=…` components between the base and the file
1116        // pattern (the final component).
1117        let upper = parts.len().saturating_sub(1);
1118        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1119            .iter()
1120            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1121            .filter(|k| !k.is_empty())
1122            .collect();
1123        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1124    }
1125
1126    let path = std::path::Path::new(loc);
1127    if path.is_dir() {
1128        let keys = discover_hive_keys(path);
1129        return Ok((dir_url(path, d)?, keys));
1130    }
1131
1132    let url = ListingTableUrl::parse(loc)
1133        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1134    Ok((url, Vec::new()))
1135}
1136
1137/// Parse a directory path into a `ListingTableUrl` (trailing slash so
1138/// DataFusion treats it as a directory root, not a single object).
1139fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1140    let s = path.to_str().ok_or_else(|| {
1141        AppError::Internal(format!(
1142            "dataset '{}': non-utf8 path {}",
1143            d.name,
1144            path.display()
1145        ))
1146    })?;
1147    let s = if s.ends_with('/') {
1148        s.to_string()
1149    } else {
1150        format!("{s}/")
1151    };
1152    ListingTableUrl::parse(&s)
1153        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1154}
1155
/// Discover the ordered hive partition keys beneath `base`.
///
/// At each level the first sub-directory (in `read_dir` order) named
/// `key=value`, with both sides non-empty, contributes its key and becomes
/// the next directory to inspect. The walk ends when a level has no such
/// sub-directory or cannot be read. A flat (non-partitioned) folder
/// therefore yields an empty vec.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();

    // An unreadable directory simply ends the walk.
    while let Ok(entries) = std::fs::read_dir(&dir) {
        let step = entries.flatten().find_map(|entry| {
            let path = entry.path();
            if !path.is_dir() {
                return None;
            }
            // Non-UTF-8 names and names without `=` are skipped.
            let (key, value) = path.file_name()?.to_str()?.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            let key = key.to_string();
            Some((key, path))
        });

        let Some((key, next_dir)) = step else {
            break;
        };
        keys.push(key);
        dir = next_dir;
    }

    keys
}
1193
1194/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
1195/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
1196/// any discovered hive partition columns. DataFusion does the directory
1197/// listing through the registered store and streams row groups on each
1198/// query — no local enumeration needed.
1199async fn build_lazy_s3_parquet(
1200    d: &DatasetConfig,
1201    ctx: &SessionContext,
1202) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1203    register_s3_object_store(d, ctx)?;
1204
1205    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1206
1207    // An empty S3 prefix yields an empty inferred schema (no error). Treat it
1208    // as an empty dataset so the load loop logs and skips it.
1209    if file_schema.fields().is_empty() {
1210        return Err(AppError::EmptyDataset(format!(
1211            "dataset '{}': no .parquet files at {}",
1212            d.name, d.source.location
1213        )));
1214    }
1215
1216    // Discovery schema = file columns + partition columns (Utf8).
1217    let mut fields: Vec<Field> = file_schema
1218        .fields()
1219        .iter()
1220        .map(|f| f.as_ref().clone())
1221        .collect();
1222    for k in &part_keys {
1223        if !fields.iter().any(|f| f.name() == k) {
1224            fields.push(Field::new(k, DataType::Utf8, false));
1225        }
1226    }
1227    let arrow_sch = Arc::new(Schema::new(fields));
1228
1229    let columns: Vec<ColumnInfo> = arrow_sch
1230        .fields()
1231        .iter()
1232        .map(|f| {
1233            let dt = f.data_type();
1234            ColumnInfo {
1235                name: f.name().clone(),
1236                logical: arrow_to_logical(dt),
1237                sql_type: format!("{dt:?}"),
1238                nullable: f.is_nullable(),
1239            }
1240        })
1241        .collect();
1242    let schema = DatasetSchema::new(&d.name, columns)
1243        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1244
1245    log::info!(
1246        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1247        d.name,
1248        d.source.kind.as_str(),
1249        schema.columns.len(),
1250        part_keys.len()
1251    );
1252
1253    Ok((
1254        DatasetState {
1255            schema,
1256            data: Vec::new(),
1257            arrow_schema: arrow_sch,
1258            index: EqIndex::default(),
1259            lazy: true,
1260        },
1261        provider,
1262    ))
1263}
1264
1265/// Build a `ListingTable` provider for an S3 parquet source, resolving the
1266/// base prefix and hive partition keys via [`s3_listing`]. The registered
1267/// S3 object store must already be present on `ctx`. Returns the provider,
1268/// the inferred *file* schema (without partition columns), and the ordered
1269/// partition keys.
1270async fn build_s3_listing_table(
1271    d: &DatasetConfig,
1272    ctx: &SessionContext,
1273) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1274    let (url, part_keys) = s3_listing(d, ctx).await?;
1275
1276    let mut opts =
1277        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1278    if !part_keys.is_empty() {
1279        opts = opts.with_table_partition_cols(
1280            part_keys
1281                .iter()
1282                .map(|k| (k.clone(), DataType::Utf8))
1283                .collect(),
1284        );
1285    }
1286
1287    let session_state = ctx.state();
1288    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1289        AppError::Internal(format!(
1290            "dataset '{}': infer parquet schema on s3: {e}",
1291            d.name
1292        ))
1293    })?;
1294
1295    let cfg = ListingTableConfig::new(url)
1296        .with_listing_options(opts)
1297        .with_schema(file_schema.clone());
1298    let table = ListingTable::try_new(cfg).map_err(|e| {
1299        AppError::Internal(format!(
1300            "dataset '{}': ListingTable::try_new (s3): {e}",
1301            d.name
1302        ))
1303    })?;
1304    Ok((Arc::new(table), file_schema, part_keys))
1305}
1306
1307/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
1308/// keys)` pair. Hive keys come from the location glob when present
1309/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
1310/// are discovered by listing the registered object store. Honours the
1311/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
1312async fn s3_listing(
1313    d: &DatasetConfig,
1314    ctx: &SessionContext,
1315) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1316    let s3 = d.s3.clone().unwrap_or_default();
1317    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1318    let loc = &d.source.location;
1319
1320    if d.source.has_glob() {
1321        let (base, keys) = split_glob_base_keys(loc);
1322        let base = format!("{}/", base.trim_end_matches('/'));
1323        let url = ListingTableUrl::parse(&base).map_err(|e| {
1324            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1325        })?;
1326        let keys = if want_partitions { keys } else { Vec::new() };
1327        return Ok((url, keys));
1328    }
1329
1330    let base = if loc.ends_with('/') {
1331        loc.clone()
1332    } else {
1333        format!("{loc}/")
1334    };
1335    let url = ListingTableUrl::parse(&base).map_err(|e| {
1336        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1337    })?;
1338    let keys = if want_partitions {
1339        discover_s3_hive_keys(ctx, &url).await
1340    } else {
1341        Vec::new()
1342    };
1343    Ok((url, keys))
1344}
1345
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys. Works for both local and `s3://` paths.
///
/// * The **base** is every `/`-separated component before the first one
///   containing a glob metacharacter (`*`, `?`, `[`), re-joined with `/`.
///   When there is no such literal prefix the base is `/` for an absolute
///   location (`/*.parquet`) and `.` for a relative one (`*.parquet`), so a
///   relative glob is never silently rooted at the filesystem root.
/// * The **keys** are the names of the `key=…` directory components from
///   the first wildcard component up to, but excluding, the final component
///   (the file pattern). Components with an empty key are ignored.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    /// True when `component` contains a glob metacharacter.
    fn is_wild(component: &str) -> bool {
        component.contains(&['*', '?', '['][..])
    }

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|c| is_wild(c))
        .unwrap_or(parts.len());

    let literal_prefix = parts[..first_wild].join("/");
    let base = if !literal_prefix.is_empty() {
        literal_prefix
    } else if loc.starts_with('/') {
        "/".to_string()
    } else {
        ".".to_string()
    };

    // The last component is the file pattern and never a partition directory.
    let upper = parts.len().saturating_sub(1);
    let mut keys = Vec::new();
    for component in &parts[first_wild.min(upper)..upper] {
        if let Some((key, _)) = component.split_once('=') {
            if !key.is_empty() {
                keys.push(key.to_string());
            }
        }
    }

    (base, keys)
}
1369
1370/// Discover ordered hive partition keys for an S3 prefix by walking the
1371/// object store one `key=value/` level at a time via delimiter listings.
1372/// Best-effort: any listing error stops discovery and returns what was found
1373/// so far (empty = treat as a flat folder).
1374async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1375    let store = match ctx.runtime_env().object_store(url.object_store()) {
1376        Ok(s) => s,
1377        Err(_) => return Vec::new(),
1378    };
1379    let mut keys = Vec::new();
1380    let mut prefix = url.prefix().clone();
1381    loop {
1382        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1383            Ok(l) => l,
1384            Err(_) => break,
1385        };
1386        let mut next: Option<object_store::path::Path> = None;
1387        for cp in &listing.common_prefixes {
1388            if let Some(seg) = cp.parts().next_back() {
1389                let seg = seg.as_ref().to_string();
1390                if let Some((k, v)) = seg.split_once('=')
1391                    && !k.is_empty()
1392                    && !v.is_empty()
1393                {
1394                    keys.push(k.to_string());
1395                    next = Some(cp.clone());
1396                    break;
1397                }
1398            }
1399        }
1400        match next {
1401            Some(p) => prefix = p,
1402            None => break,
1403        }
1404    }
1405    keys
1406}
1407
1408/// Original local-parquet code path — sync file I/O. We set a large reader
1409/// batch size so wide schemas (hundreds of columns) don't pay per-array
1410/// metadata overhead on thousands of small (default 1024-row) batches.
1411///
1412/// Two memory-saving knobs are applied here:
1413///
1414/// * **Column projection** — if `d.columns` is non-empty, only those
1415///   columns are decoded; everything else is skipped at the parquet reader
1416///   level (no Arrow array is ever allocated for the dropped columns).
1417/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1418///   carry a dictionary page are materialised as Arrow
1419///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1420///   string columns (state, country, severity, …) stay represented as
1421///   `n_unique` string slots plus an Int32 index per row instead of
1422///   `n_rows` independent strings — typically 10×–50× smaller for
1423///   real-world data.
1424fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1425    let files = d.resolve_local_parquet_files()?;
1426    let mut all = Vec::new();
1427    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1428        None
1429    } else {
1430        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1431    };
1432
1433    for f in &files {
1434        let file = std::fs::File::open(f)
1435            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1436
1437        // First pass: peek the parquet metadata + default Arrow schema so we
1438        // can (a) decide a column projection and (b) override Utf8 columns
1439        // that are dictionary-encoded in the file so the reader materialises
1440        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1441        let probe = ParquetRecordBatchReaderBuilder::try_new(
1442            file.try_clone()
1443                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1444        )?;
1445        let parquet_schema = probe.parquet_schema().clone();
1446        let arrow_schema = probe.schema().clone();
1447        let metadata = probe.metadata().clone();
1448        drop(probe);
1449
1450        // Column projection (top-level / leaf indices for flat schemas).
1451        let projection = if let Some(w) = &wanted {
1452            let indices: Vec<usize> = arrow_schema
1453                .fields()
1454                .iter()
1455                .enumerate()
1456                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1457                .map(|(i, _)| i)
1458                .collect();
1459            if indices.is_empty() {
1460                return Err(AppError::Internal(format!(
1461                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1462                    d.name,
1463                    d.columns,
1464                    f.display()
1465                )));
1466            }
1467            ProjectionMask::roots(&parquet_schema, indices)
1468        } else {
1469            ProjectionMask::all()
1470        };
1471
1472        // Dictionary override: any Utf8 column whose first row group carries
1473        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1474        // override schema must still describe every column in the parquet
1475        // file (projection is applied separately). Skipped entirely when
1476        // the dataset has `dict_encode = false` — escape hatch for cases
1477        // where the override interacts badly with null propagation in the
1478        // downstream engine.
1479        let mut new_fields: Vec<Field> = arrow_schema
1480            .fields()
1481            .iter()
1482            .map(|f| f.as_ref().clone())
1483            .collect();
1484        if d.dict_encode
1485            && let Some(rg0) = metadata.row_groups().first()
1486        {
1487            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1488                if !matches!(
1489                    fld.data_type(),
1490                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1491                ) {
1492                    continue;
1493                }
1494                if let Some(col) = rg0.columns().get(i)
1495                    && col.dictionary_page_offset().is_some()
1496                {
1497                    new_fields[i] = Field::new(
1498                        fld.name(),
1499                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1500                        fld.is_nullable(),
1501                    );
1502                }
1503            }
1504        }
1505        let forced_schema = Arc::new(Schema::new(new_fields));
1506
1507        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1508        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1509            .with_batch_size(65_536)
1510            .with_projection(projection)
1511            .build()?;
1512        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1513        // the file. Fold them in as constant Utf8 columns so they show up in
1514        // the schema and are queryable — matching the DuckDB backend.
1515        let pairs = hive_pairs(f);
1516        for batch in reader {
1517            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1518            all.push(if pairs.is_empty() {
1519                batch
1520            } else {
1521                append_partition_cols(&batch, &pairs)?
1522            });
1523        }
1524    }
1525    if all.is_empty() {
1526        return Err(AppError::Internal(format!(
1527            "dataset '{}': parquet source is empty",
1528            d.name
1529        )));
1530    }
1531    Ok(all)
1532}
1533
/// Collect the hive-style partition pairs found in `path`, in path order.
///
/// Every path component (including the last one) is inspected. A component
/// qualifies when it is valid UTF-8 and has the shape `key=value` with a
/// non-empty key, a non-empty value and exactly one `=` — for example
/// `year=2024/city=NYC` yields `[("year", "2024"), ("city", "NYC")]`.
/// Components that do not qualify are silently ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut found = Vec::new();
    for component in path.components() {
        // Non-UTF-8 components cannot carry a usable key/value pair.
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let well_formed = !key.is_empty() && !value.is_empty() && !value.contains('=');
        if well_formed {
            found.push((key.to_owned(), value.to_owned()));
        }
    }
    found
}
1548
1549/// Append constant Utf8 columns for each hive partition pair. A partition
1550/// key that collides with a real file column is skipped (the file wins).
1551fn append_partition_cols(
1552    batch: &RecordBatch,
1553    pairs: &[(String, String)],
1554) -> Result<RecordBatch, AppError> {
1555    let n = batch.num_rows();
1556    let mut fields: Vec<Field> = batch
1557        .schema()
1558        .fields()
1559        .iter()
1560        .map(|f| f.as_ref().clone())
1561        .collect();
1562    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1563    for (k, v) in pairs {
1564        if fields.iter().any(|f| f.name() == k) {
1565            continue;
1566        }
1567        fields.push(Field::new(k, DataType::Utf8, false));
1568        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1569    }
1570    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1571        .map_err(|e| AppError::Internal(e.to_string()))
1572}
1573
1574/// Register an `AmazonS3` object store on the SessionContext, build a
1575/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1576/// and stream the whole dataset back through `DataFrame::collect`. Using a
1577/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1578/// get the same hive-partition handling as local parquet.
1579async fn read_s3_parquet(
1580    d: &DatasetConfig,
1581    ctx: &SessionContext,
1582) -> Result<Vec<RecordBatch>, AppError> {
1583    register_s3_object_store(d, ctx)?;
1584    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1585    let df = ctx
1586        .read_table(provider)
1587        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1588    Ok(df.collect().await?)
1589}
1590
1591/// Open a Delta table (local or S3) and return the deltalake `DeltaTable`
1592/// handle. The transaction log is read here to resolve the current file
1593/// list + schema; the table's object store is registered on the session
1594/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1595/// store themselves. A location that doesn't exist, is empty, or was never
1596/// committed maps to [`AppError::EmptyDataset`] so startup can log-and-skip.
1597async fn open_delta_table(
1598    d: &DatasetConfig,
1599    opts: HashMap<String, String>,
1600) -> Result<deltalake::DeltaTable, AppError> {
1601    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1602        AppError::Internal(format!(
1603            "dataset '{}': bad delta location '{}': {e}",
1604            d.name, d.source.location
1605        ))
1606    })?;
1607    deltalake::open_table_with_storage_options(url, opts)
1608        .await
1609        .map_err(|e| {
1610            // A Delta location that doesn't exist, is empty, or was never
1611            // committed has no files in its log segment. deltalake (kernel
1612            // 0.32) surfaces every one of these as:
1613            //   "Not a Delta table: Generic delta kernel error: No files in
1614            //    log segment"
1615            // — covering a missing local dir AND a non-existent S3 prefix
1616            // alike. Match case-insensitively (the kernel capitalises the
1617            // phrases) and treat it like any other empty dataset so startup
1618            // logs and skips instead of aborting the whole process.
1619            let msg = e.to_string();
1620            let low = msg.to_lowercase();
1621            if low.contains("no files in log segment") || low.contains("not a delta table") {
1622                AppError::EmptyDataset(format!(
1623                    "delta location '{}' has no committed files ({msg})",
1624                    d.source.location
1625                ))
1626            } else {
1627                AppError::Internal(format!(
1628                    "dataset '{}': delta open '{}': {msg}",
1629                    d.name, d.source.location
1630                ))
1631            }
1632        })
1633}
1634
1635/// Open a Delta table (local or S3) and return its DataFusion
1636/// `TableProvider`. The provider reads the transaction log to resolve the
1637/// current file list and registers the table's object store on the session
1638/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1639/// store themselves.
1640async fn open_delta_provider(
1641    d: &DatasetConfig,
1642    opts: HashMap<String, String>,
1643) -> Result<Arc<dyn TableProvider>, AppError> {
1644    let table = open_delta_table(d, opts).await?;
1645    table.table_provider().await.map_err(|e| {
1646        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1647    })
1648}
1649
1650/// Resolve the deltalake storage-options for a dataset: empty for local
1651/// tables, S3 credentials/endpoint for S3-backed ones.
1652fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1653    if d.source.is_s3() {
1654        delta_s3_options(d)
1655    } else {
1656        Ok(HashMap::new())
1657    }
1658}
1659
1660/// Open a Delta table (local or S3) and stream every row back as a Vec of
1661/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1662/// treat all datasets uniformly (single in-memory batch + eq-index).
1663async fn read_delta(
1664    d: &DatasetConfig,
1665    opts: HashMap<String, String>,
1666) -> Result<Vec<RecordBatch>, AppError> {
1667    let provider = open_delta_provider(d, opts).await?;
1668    // Drive a full scan via a throwaway SessionContext so we end up with
1669    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1670    //
1671    // A Delta table can open cleanly (valid log + schema) yet still fail to
1672    // scan: its add actions may reference data files that no longer exist in
1673    // storage (e.g. vacuumed away), or are otherwise unreadable. Rather than
1674    // abort the whole registry for one broken table, map scan failures to
1675    // `EmptyDataset` so `Store::load` logs and skips it — mirroring the
1676    // bounded-probe behaviour on the lazy path.
1677    let scan_ctx = SessionContext::new();
1678    let df = scan_ctx.read_table(provider).map_err(|e| {
1679        AppError::EmptyDataset(format!(
1680            "delta location '{}' could not be scanned, skipping ({e})",
1681            d.source.location
1682        ))
1683    })?;
1684    df.collect().await.map_err(|e| {
1685        AppError::EmptyDataset(format!(
1686            "delta location '{}' could not be scanned, skipping ({e})",
1687            d.source.location
1688        ))
1689    })
1690}
1691
1692/// Build a lazy state + deltalake `TableProvider` for a Delta dataset
1693/// (local or S3). The table is never read into RAM: deltalake reads the
1694/// transaction log once here to resolve the file list and discovery schema,
1695/// then streams parquet row groups on each query (with predicate pushdown
1696/// and Delta file skipping). The returned `DatasetState.data` is empty.
1697async fn build_lazy_delta(
1698    d: &DatasetConfig,
1699    _ctx: &SessionContext,
1700) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1701    let table = open_delta_table(d, delta_storage_options(d)?).await?;
1702
1703    // An empty Delta table carries a valid schema in its log but resolves to
1704    // zero data files. The eager path catches this naturally (a full scan
1705    // yields no batches), but the lazy path never scans — so check the file
1706    // list explicitly and skip, matching the eager behaviour. Without this an
1707    // empty Delta table would register as a 0-row dataset and show up in
1708    // discovery / explore.
1709    let file_count = table
1710        .get_file_uris()
1711        .map(|it| it.count())
1712        .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1713    if file_count == 0 {
1714        return Err(AppError::EmptyDataset(format!(
1715            "delta location '{}' has a schema but no data files",
1716            d.source.location
1717        )));
1718    }
1719
1720    let provider = table.table_provider().await.map_err(|e| {
1721        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1722    })?;
1723
1724    // The file-list check above only catches a Delta table with *no* data
1725    // files. A table can still be effectively empty in ways the log doesn't
1726    // make obvious: its add actions may reference files that contain zero
1727    // rows, or files that no longer exist in storage (e.g. vacuumed). Those
1728    // register fine here but then return nothing — or hard-error — the moment
1729    // they're queried, leaving a broken dataset visible in discovery/explore.
1730    //
1731    // The eager path catches all of this for free (a full scan yields no rows
1732    // or surfaces the read error up front), but the lazy path never scans. So
1733    // probe with a bounded one-row scan: it reads at most a single row group
1734    // from a single file, costing almost nothing on a healthy table, but lets
1735    // us log-and-skip an empty or unreadable table instead of registering it.
1736    {
1737        let probe_ctx = SessionContext::new();
1738        let probe = probe_ctx
1739            .read_table(provider.clone())
1740            .and_then(|df| df.limit(0, Some(1)));
1741        match probe {
1742            Ok(df) => match df.collect().await {
1743                Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1744                    return Err(AppError::EmptyDataset(format!(
1745                        "delta location '{}' resolves to no rows",
1746                        d.source.location
1747                    )));
1748                }
1749                Ok(_) => {}
1750                Err(e) => {
1751                    return Err(AppError::EmptyDataset(format!(
1752                        "delta location '{}' could not be scanned, skipping ({e})",
1753                        d.source.location
1754                    )));
1755                }
1756            },
1757            Err(e) => {
1758                return Err(AppError::EmptyDataset(format!(
1759                    "delta location '{}' could not be scanned, skipping ({e})",
1760                    d.source.location
1761                )));
1762            }
1763        }
1764    }
1765
1766    // The provider's schema is the full logical schema (data + partition
1767    // columns), which is exactly the discovery schema we want.
1768    let arrow_sch = provider.schema();
1769    let columns: Vec<ColumnInfo> = arrow_sch
1770        .fields()
1771        .iter()
1772        .map(|f| {
1773            let dt = f.data_type();
1774            ColumnInfo {
1775                name: f.name().clone(),
1776                logical: arrow_to_logical(dt),
1777                sql_type: format!("{dt:?}"),
1778                nullable: f.is_nullable(),
1779            }
1780        })
1781        .collect();
1782    let schema = DatasetSchema::new(&d.name, columns)
1783        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1784
1785    log::info!(
1786        "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1787        d.name,
1788        d.source.kind.as_str(),
1789        schema.columns.len()
1790    );
1791
1792    Ok((
1793        DatasetState {
1794            schema,
1795            data: Vec::new(),
1796            arrow_schema: arrow_sch,
1797            index: EqIndex::default(),
1798            lazy: true,
1799        },
1800        provider,
1801    ))
1802}
1803
1804/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1805/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1806/// passes them through to object_store internally.
1807fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1808    let creds = d.resolved_creds();
1809    let region = d.resolved_region();
1810    let s3 = d.s3.clone().unwrap_or_default();
1811    let (bucket, _) = d.source.s3_bucket()?;
1812
1813    let mut opts = HashMap::new();
1814    opts.insert("AWS_REGION".into(), region);
1815    if let Some(ep) = s3.effective_endpoint(bucket) {
1816        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1817    }
1818    if s3.allow_http {
1819        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1820    }
1821    opts.insert(
1822        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1823        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1824    );
1825    if let Some(k) = creds.access_key_id {
1826        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1827    }
1828    if let Some(s) = creds.secret_access_key {
1829        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1830    }
1831    if let Some(t) = creds.session_token {
1832        opts.insert("AWS_SESSION_TOKEN".into(), t);
1833    }
1834    // Read-only paths don't need the S3 lock-provider plumbing.
1835    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1836    Ok(opts)
1837}
1838
1839/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1840/// block + resolved credentials and register it on `ctx` under
1841/// `s3://bucket/`.
1842fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1843    let (bucket, _key) = d.source.s3_bucket()?;
1844    let creds = d.resolved_creds();
1845    let region = d.resolved_region();
1846    let s3 = d.s3.clone().unwrap_or_default();
1847
1848    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1849        AppError::Internal(format!(
1850            "dataset '{}': build S3 store for '{bucket}': {e}",
1851            d.name
1852        ))
1853    })?;
1854
1855    let url = Url::parse(&format!("s3://{bucket}"))
1856        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1857    ctx.register_object_store(&url, Arc::new(store));
1858    Ok(())
1859}
1860
/// Best-effort detection of an S3 authorization failure (HTTP 403) from an
/// error message.
///
/// object_store, the AWS SDK, deltalake, and MinIO all surface a denied read
/// as some mix of "Access Denied" / "AccessDenied" (the S3 error code),
/// "Forbidden", or a bare "403" in the rendered error chain. The phrases are
/// matched case-insensitively.
///
/// The status code only counts when it stands alone as a number, i.e. is not
/// directly preceded or followed by another ASCII digit. Error messages
/// routinely embed object keys and sizes (`part-00403-….parquet`,
/// `14035 bytes`), and a plain substring match on "403" would classify those
/// unrelated failures as access-denied.
///
/// Returns `true` when the message looks like an access-denied error.
fn is_s3_access_denied(msg: &str) -> bool {
    let low = msg.to_lowercase();
    if low.contains("access denied") || low.contains("accessdenied") || low.contains("forbidden")
    {
        return true;
    }

    // "403" cannot overlap with itself, so the non-overlapping matches from
    // `match_indices` cover every occurrence.
    let bytes = low.as_bytes();
    low.match_indices("403").any(|(start, hit)| {
        let end = start + hit.len();
        let digit_before = start > 0 && bytes[start - 1].is_ascii_digit();
        let digit_after = bytes.get(end).is_some_and(u8::is_ascii_digit);
        !digit_before && !digit_after
    })
}
1874
1875
1876///
1877/// Local sources are sized with a cheap filesystem stat (`estimate_local_bytes`);
1878/// S3 sources are sized by listing the object store under their prefix. S3
1879/// sizing is best-effort: a listing failure logs a warning and returns `None`
1880/// (don't force) so a transient S3 error never blocks startup.
1881async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1882    if d.lazy || server.force_lazy_above_mb == 0 {
1883        return None;
1884    }
1885    let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1886
1887    let bytes = if d.source.is_s3() {
1888        match estimate_s3_bytes(d).await {
1889            Ok(b) => b,
1890            Err(e) => {
1891                log::warn!(
1892                    "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1893                    d.name
1894                );
1895                return None;
1896            }
1897        }
1898    } else {
1899        d.estimate_local_bytes()?
1900    };
1901
1902    (bytes > threshold).then_some(bytes)
1903}
1904
1905/// Sum the byte size of a dataset's S3 backing data by listing the object
1906/// store under the source prefix and adding up every `.parquet` object.
1907///
1908/// Works for both parquet and delta sources: delta data files are parquet
1909/// objects under the table root, so listing the root and counting parquet
1910/// gives a coarse (possibly over-counting un-vacuumed files) but cheap size
1911/// suitable for the force-lazy gate. Any glob wildcard tail is stripped so the
1912/// listing starts at the non-wildcard base prefix.
1913async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1914    use futures_util::StreamExt;
1915    use object_store::ObjectStore;
1916
1917    let (bucket, _key) = d.source.s3_bucket()?;
1918    let creds = d.resolved_creds();
1919    let region = d.resolved_region();
1920    let s3 = d.s3.clone().unwrap_or_default();
1921    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1922        AppError::Internal(format!(
1923            "dataset '{}': build S3 store for '{bucket}': {e}",
1924            d.name
1925        ))
1926    })?;
1927
1928    // Strip any glob wildcard tail, then reduce the base to a bucket-relative
1929    // key for object_store's `list` (which takes a key prefix, not a URL).
1930    let (base, _keys) = split_glob_base_keys(&d.source.location);
1931    let prefix_key = base
1932        .strip_prefix("s3://")
1933        .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1934        .unwrap_or("")
1935        .trim_end_matches('/');
1936    let prefix =
1937        (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1938
1939    let mut total: u64 = 0;
1940    let mut stream = store.list(prefix.as_ref());
1941    while let Some(meta) = stream.next().await {
1942        let meta = meta.map_err(|e| {
1943            AppError::Internal(format!(
1944                "dataset '{}': s3 list under '{prefix_key}': {e}",
1945                d.name
1946            ))
1947        })?;
1948        if meta.location.as_ref().ends_with(".parquet") {
1949            total = total.saturating_add(meta.size);
1950        }
1951    }
1952    Ok(total)
1953}
1954
1955fn build_s3(
1956    bucket: &str,
1957    region: &str,
1958    s3: &S3Config,
1959    creds: &ResolvedCreds,
1960) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1961    let mut b = AmazonS3Builder::new()
1962        .with_bucket_name(bucket)
1963        .with_region(region)
1964        .with_allow_http(s3.allow_http)
1965        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1966    if let Some(ep) = s3.effective_endpoint(bucket) {
1967        b = b.with_endpoint(ep);
1968    }
1969    if let Some(k) = creds.access_key_id.as_deref() {
1970        b = b.with_access_key_id(k);
1971    }
1972    if let Some(s) = creds.secret_access_key.as_deref() {
1973        b = b.with_secret_access_key(s);
1974    }
1975    if let Some(t) = creds.session_token.as_deref() {
1976        b = b.with_token(t);
1977    }
1978    b.build()
1979}
1980
1981fn arrow_to_logical(dt: &DataType) -> LogicalType {
1982    match dt {
1983        DataType::Boolean => LogicalType::Bool,
1984        DataType::Int8
1985        | DataType::Int16
1986        | DataType::Int32
1987        | DataType::Int64
1988        | DataType::UInt8
1989        | DataType::UInt16
1990        | DataType::UInt32
1991        | DataType::UInt64 => LogicalType::Int,
1992        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1993        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1994        // Dictionary-encoded strings are reported as plain strings — clients
1995        // (and the rest of the backend) shouldn't have to care that we keep
1996        // a compressed representation in memory.
1997        DataType::Dictionary(_, v)
1998            if matches!(
1999                v.as_ref(),
2000                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
2001            ) =>
2002        {
2003            LogicalType::Utf8
2004        }
2005        DataType::Date32
2006        | DataType::Date64
2007        | DataType::Time32(_)
2008        | DataType::Time64(_)
2009        | DataType::Timestamp(_, _)
2010        | DataType::Duration(_)
2011        | DataType::Interval(_) => LogicalType::Temporal,
2012        _ => LogicalType::Other,
2013    }
2014}
2015
2016// ---------------------------------------------------------------------------
2017// Per-batch projection
2018// ---------------------------------------------------------------------------
2019
2020fn project(
2021    schema: &DatasetSchema,
2022    batch: RecordBatch,
2023    columns: &[String],
2024) -> Result<RecordBatch, AppError> {
2025    if columns.is_empty() {
2026        return Ok(batch);
2027    }
2028    let indices: Vec<usize> = columns
2029        .iter()
2030        .map(|c| {
2031            schema
2032                .find(c)
2033                .map(|info| schema.by_name[&info.name.to_lowercase()])
2034        })
2035        .collect::<Result<_, _>>()?;
2036    let fields: Vec<Field> = indices
2037        .iter()
2038        .map(|&i| batch.schema().field(i).clone())
2039        .collect();
2040    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
2041    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
2042}
2043
2044// ---------------------------------------------------------------------------
2045// SQL builder
2046// ---------------------------------------------------------------------------
2047
2048/// Accumulates the typed literal values for a parameterised query.
2049///
2050/// Predicate values are never interpolated into the SQL text. Instead each
2051/// value is pushed here and the builder emits a positional placeholder
2052/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
2053/// to the logical plan via [`DataFrame::with_param_values`], so user input
2054/// reaches the engine as typed scalars and can never alter the query
2055/// structure (no SQL injection surface, no escaping to get wrong).
2056#[derive(Default)]
2057struct Params {
2058    values: Vec<ScalarValue>,
2059}
2060
2061impl Params {
2062    fn new() -> Self {
2063        Self::default()
2064    }
2065
2066    /// Bind `v` and return its `$N` placeholder token.
2067    fn bind(&mut self, v: ScalarValue) -> String {
2068        self.values.push(v);
2069        format!("${}", self.values.len())
2070    }
2071
2072    fn into_values(self) -> Vec<ScalarValue> {
2073        self.values
2074    }
2075}
2076
2077fn build_query_sql(
2078    schema: &DatasetSchema,
2079    req: &QueryRequest,
2080    max_page_size: u64,
2081) -> Result<(String, Vec<ScalarValue>), AppError> {
2082    let (limit, offset) = req.effective_limit_offset(max_page_size);
2083    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
2084}
2085
2086fn build_query_stream_sql(
2087    schema: &DatasetSchema,
2088    req: &QueryRequest,
2089) -> Result<(String, Vec<ScalarValue>), AppError> {
2090    let suffix = req
2091        .limit
2092        .map(|limit| format!(" LIMIT {limit}"))
2093        .unwrap_or_default();
2094    build_query_sql_with_suffix(schema, req, &suffix)
2095}
2096
2097fn build_query_sql_with_suffix(
2098    schema: &DatasetSchema,
2099    req: &QueryRequest,
2100    suffix: &str,
2101) -> Result<(String, Vec<ScalarValue>), AppError> {
2102    let agg_plan = req.agg_plan(schema)?;
2103
2104    let cols = if let Some(plan) = &agg_plan {
2105        // Group cols, then aggregations, each aliased to the JSON output key.
2106        let mut parts: Vec<String> = plan
2107            .group_cols
2108            .iter()
2109            .map(|c| DatasetSchema::quote_ident(c))
2110            .collect();
2111        for a in &plan.aggs {
2112            let expr = a.sql_expr()?;
2113            parts.push(format!(
2114                "{expr} AS {}",
2115                DatasetSchema::quote_ident(&a.alias)
2116            ));
2117        }
2118        parts.join(", ")
2119    } else if req.columns.is_empty() {
2120        if req.distinct {
2121            "DISTINCT *".to_string()
2122        } else {
2123            "*".to_string()
2124        }
2125    } else {
2126        let list = req
2127            .columns
2128            .iter()
2129            .map(|c| {
2130                schema
2131                    .find(c)
2132                    .map(|info| DatasetSchema::quote_ident(&info.name))
2133            })
2134            .collect::<Result<Vec<_>, _>>()?
2135            .join(", ");
2136        if req.distinct {
2137            format!("DISTINCT {list}")
2138        } else {
2139            list
2140        }
2141    };
2142
2143    let mut params = Params::new();
2144    let clauses: Vec<String> = req
2145        .predicates
2146        .iter()
2147        .map(|p| pred_to_sql(schema, p, &mut params))
2148        .collect::<Result<_, _>>()?;
2149
2150    let table = DatasetSchema::quote_ident(&schema.name);
2151    let where_clause = if clauses.is_empty() {
2152        String::new()
2153    } else {
2154        format!(" WHERE {}", clauses.join(" AND "))
2155    };
2156    let group_clause = match &agg_plan {
2157        Some(p) => format!(
2158            " GROUP BY {}",
2159            p.group_cols
2160                .iter()
2161                .map(|c| DatasetSchema::quote_ident(c))
2162                .collect::<Vec<_>>()
2163                .join(", "),
2164        ),
2165        None => String::new(),
2166    };
2167    let having_clause = {
2168        let resolved = req.having_plan(agg_plan.as_ref())?;
2169        if resolved.is_empty() {
2170            String::new()
2171        } else {
2172            let clauses: Vec<String> = resolved
2173                .iter()
2174                .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2175                .collect::<Result<_, _>>()?;
2176            format!(" HAVING {}", clauses.join(" AND "))
2177        }
2178    };
2179    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2180        Some(s) => format!(" ORDER BY {s}"),
2181        None => String::new(),
2182    };
2183    let sql =
2184        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2185    Ok((sql, params.into_values()))
2186}
2187
2188fn build_count_sql(
2189    schema: &DatasetSchema,
2190    predicates: &[Predicate],
2191) -> Result<(String, Vec<ScalarValue>), AppError> {
2192    let mut params = Params::new();
2193    let clauses: Vec<String> = predicates
2194        .iter()
2195        .map(|p| pred_to_sql(schema, p, &mut params))
2196        .collect::<Result<_, _>>()?;
2197    let table = DatasetSchema::quote_ident(&schema.name);
2198    let where_clause = if clauses.is_empty() {
2199        String::new()
2200    } else {
2201        format!(" WHERE {}", clauses.join(" AND "))
2202    };
2203    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2204    Ok((sql, params.into_values()))
2205}
2206
2207fn pred_to_sql(
2208    schema: &DatasetSchema,
2209    pred: &Predicate,
2210    params: &mut Params,
2211) -> Result<String, AppError> {
2212    let info = schema.find(&pred.col)?;
2213    let col = DatasetSchema::quote_ident(&info.name);
2214    pred_to_sql_with_lhs(&col, pred, params)
2215}
2216
2217/// Render a predicate against a pre-resolved left-hand-side SQL
2218/// expression. The dataset-`WHERE` path resolves a quoted column name as
2219/// the LHS (see [`pred_to_sql`]); the `HAVING` path passes an aggregate
2220/// expression such as `COUNT(*)` instead. Both share the operator and
2221/// value-binding logic here.
2222fn pred_to_sql_with_lhs(
2223    col: &str,
2224    pred: &Predicate,
2225    params: &mut Params,
2226) -> Result<String, AppError> {
2227    match pred.op.as_str() {
2228        "is_null" => return Ok(format!("{col} IS NULL")),
2229        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2230        _ => {}
2231    }
2232
2233    let val = pred
2234        .val
2235        .as_ref()
2236        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2237
2238    if pred.op == "in" {
2239        let items = val
2240            .as_array()
2241            .filter(|a| !a.is_empty())
2242            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2243        let placeholders: Vec<String> = items
2244            .iter()
2245            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2246            .collect::<Result<_, AppError>>()?;
2247        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2248    }
2249
2250    let sql_op = match pred.op.as_str() {
2251        "eq" => "=",
2252        "neq" => "!=",
2253        "gt" => ">",
2254        "gte" => ">=",
2255        "lt" => "<",
2256        "lte" => "<=",
2257        "like" => "LIKE",
2258        "ilike" => "ILIKE",
2259        other => return Err(AppError::UnknownOperator(other.into())),
2260    };
2261    let placeholder = params.bind(json_to_scalar(val)?);
2262    Ok(format!("{col} {sql_op} {placeholder}"))
2263}
2264
2265/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
2266/// binding as a query parameter. The engine applies the usual numeric
2267/// widening / comparison coercion against the target column type.
2268fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2269    match val {
2270        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2271        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2272        JsonValue::Null => Ok(ScalarValue::Null),
2273        JsonValue::Number(n) => {
2274            if let Some(i) = n.as_i64() {
2275                Ok(ScalarValue::Int64(Some(i)))
2276            } else if let Some(u) = n.as_u64() {
2277                Ok(ScalarValue::UInt64(Some(u)))
2278            } else if let Some(f) = n.as_f64() {
2279                Ok(ScalarValue::Float64(Some(f)))
2280            } else {
2281                Err(AppError::InvalidValue(
2282                    "unsupported numeric literal in predicate".into(),
2283                ))
2284            }
2285        }
2286        _ => Err(AppError::InvalidValue(
2287            "unsupported literal type in predicate".into(),
2288        )),
2289    }
2290}
2291
2292// ---------------------------------------------------------------------------
2293// Equality index — built once at startup, queried on every predicate request
2294// ---------------------------------------------------------------------------
2295
2296fn json_index_key(val: &JsonValue) -> Option<String> {
2297    match val {
2298        JsonValue::String(s) => Some(s.clone()),
2299        JsonValue::Number(n) => Some(n.to_string()),
2300        JsonValue::Bool(b) => Some(b.to_string()),
2301        _ => None,
2302    }
2303}
2304
/// Intersect two ascending `u32` slices with a single linear merge pass.
///
/// Both cursors advance together on a match (which is emitted once);
/// otherwise only the cursor holding the smaller value advances. The walk
/// stops as soon as either input is exhausted.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let mut left = a.iter().copied();
    let mut right = b.iter().copied();
    let mut x = left.next();
    let mut y = right.next();

    while let (Some(l), Some(r)) = (x, y) {
        match l.cmp(&r) {
            Ordering::Less => x = left.next(),
            Ordering::Greater => y = right.next(),
            Ordering::Equal => {
                common.push(l);
                x = left.next();
                y = right.next();
            }
        }
    }
    common
}
2321
/// Merge two ascending `u32` slices into one ascending vector, emitting a
/// value that sits at the head of both inputs only once.
///
/// The merge loop runs while both inputs still have elements; whatever is
/// left in either input afterwards is appended verbatim.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);

    while let (Some((&x, left_tail)), Some((&y, right_tail))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = left_tail;
            }
            Ordering::Greater => {
                merged.push(y);
                right = right_tail;
            }
            Ordering::Equal => {
                merged.push(x);
                left = left_tail;
                right = right_tail;
            }
        }
    }

    // At most one of these is non-empty.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
2346
2347fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2348    if predicates.is_empty() || index.is_empty() {
2349        return None;
2350    }
2351
2352    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
2353    // so borrow it directly instead of cloning the row-id vector.
2354    if let [pred] = predicates
2355        && pred.op.as_str() == "eq"
2356    {
2357        let col_lower = pred.col.to_lowercase();
2358        let col_map = index.get(&col_lower)?;
2359        let key = json_index_key(pred.val.as_ref()?)?;
2360        return Some(match col_map.get(&key) {
2361            Some(rows) => Cow::Borrowed(rows.as_slice()),
2362            None => Cow::Owned(Vec::new()),
2363        });
2364    }
2365
2366    let mut result: Option<Vec<u32>> = None;
2367    for pred in predicates {
2368        let col_lower = pred.col.to_lowercase();
2369        let col_map = index.get(&col_lower)?;
2370
2371        let rows: Vec<u32> = match pred.op.as_str() {
2372            "eq" => {
2373                let key = json_index_key(pred.val.as_ref()?)?;
2374                col_map.get(&key).cloned().unwrap_or_default()
2375            }
2376            "in" => {
2377                let items = pred.val.as_ref()?.as_array()?;
2378                let mut merged: Vec<u32> = Vec::new();
2379                for item in items {
2380                    if let Some(r) = col_map.get(&json_index_key(item)?) {
2381                        merged = union_sorted(&merged, r);
2382                    }
2383                }
2384                merged
2385            }
2386            _ => return None,
2387        };
2388
2389        result = Some(match result {
2390            None => rows,
2391            Some(r) => intersect_sorted(&r, &rows),
2392        });
2393    }
2394    result.map(Cow::Owned)
2395}
2396
2397/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
2398/// not part of the stable API — exposed solely so `benches/index.rs` can build
2399/// an `EqIndex` and exercise `try_index` directly.
2400#[doc(hidden)]
2401pub mod bench {
2402    use super::{EqIndex, FastMap, json_index_key, try_index};
2403    use datapress_core::models::Predicate;
2404    use serde_json::Value as JsonValue;
2405    use std::borrow::Cow;
2406
2407    /// Opaque equality index wrapper for benches (the inner alias is private).
2408    pub struct BenchIndex(EqIndex);
2409
2410    /// Build an index with a single `col` whose `val` bucket holds `rows`.
2411    /// The bucket key is derived with the same `json_index_key` the query path
2412    /// uses, so a matching `eq` predicate is guaranteed to hit.
2413    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2414        let key = json_index_key(val).expect("benchable index key");
2415        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2416        col_map.insert(key, rows);
2417        let mut index: EqIndex = EqIndex::default();
2418        index.insert(col.to_string(), col_map);
2419        BenchIndex(index)
2420    }
2421
2422    /// Resolve `predicates` against the index — the timed operation.
2423    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2424        try_index(&idx.0, predicates)
2425    }
2426
2427    /// Reference implementation of the pre-`Cow` behaviour: always clones the
2428    /// matched bucket into an owned `Vec` (what `try_index` did before the
2429    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
2430    /// the borrow win is measured against the old allocation cost in-process.
2431    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2432        let [pred] = predicates else { return None };
2433        if pred.op.as_str() != "eq" {
2434            return None;
2435        }
2436        let col_lower = pred.col.to_lowercase();
2437        let col_map = idx.0.get(&col_lower)?;
2438        let key = json_index_key(pred.val.as_ref()?)?;
2439        Some(col_map.get(&key).cloned().unwrap_or_default())
2440    }
2441}
2442
2443/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
2444/// the underlying batches (zero-copy) and concatenating the (small) page.
2445fn slice_global(
2446    chunks: &[RecordBatch],
2447    schema: &Arc<Schema>,
2448    offset: usize,
2449    limit: usize,
2450) -> Result<RecordBatch, AppError> {
2451    if limit == 0 || chunks.is_empty() {
2452        return Ok(RecordBatch::new_empty(schema.clone()));
2453    }
2454    let mut out = Vec::new();
2455    let mut to_skip = offset;
2456    let mut remaining = limit;
2457    for b in chunks {
2458        if remaining == 0 {
2459            break;
2460        }
2461        let n = b.num_rows();
2462        if to_skip >= n {
2463            to_skip -= n;
2464            continue;
2465        }
2466        let take = remaining.min(n - to_skip);
2467        out.push(b.slice(to_skip, take));
2468        to_skip = 0;
2469        remaining -= take;
2470    }
2471    if out.is_empty() {
2472        return Ok(RecordBatch::new_empty(schema.clone()));
2473    }
2474    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2475}
2476
2477/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
2478/// Row ids are global (across the concatenation of all chunks). We map each
2479/// requested row to its (chunk, local-index), `take` per chunk, then stitch
2480/// the per-chunk results back together preserving the original row order.
2481fn take_page(
2482    chunks: &[RecordBatch],
2483    schema: &Arc<Schema>,
2484    rows: &[u32],
2485    offset: usize,
2486    limit: usize,
2487) -> Result<RecordBatch, AppError> {
2488    let start = offset.min(rows.len());
2489    let len = limit.min(rows.len() - start);
2490    if len == 0 || chunks.is_empty() {
2491        return Ok(RecordBatch::new_empty(schema.clone()));
2492    }
2493
2494    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
2495    // and `offsets.last()` is the total row count.
2496    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2497    let mut acc: u32 = 0;
2498    offsets.push(0);
2499    for b in chunks {
2500        acc = acc
2501            .checked_add(b.num_rows() as u32)
2502            .expect("row count exceeds u32::MAX");
2503        offsets.push(acc);
2504    }
2505
2506    // Bucket each global row id into the chunk that contains it, remembering
2507    // the original output position so we can restore page order at the end.
2508    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2509    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2510        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2511        let local = gid - offsets[bi];
2512        buckets[bi].push((out_pos as u32, local));
2513    }
2514
2515    // Per-chunk take, recording the destination index for each emitted row.
2516    let mut takens: Vec<RecordBatch> = Vec::new();
2517    let mut dest: Vec<u32> = Vec::with_capacity(len);
2518    for (bi, bucket) in buckets.iter().enumerate() {
2519        if bucket.is_empty() {
2520            continue;
2521        }
2522        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2523        let cols: Vec<ArrayRef> = chunks[bi]
2524            .columns()
2525            .iter()
2526            .map(|c| {
2527                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2528                    .map_err(AppError::from)
2529            })
2530            .collect::<Result<_, _>>()?;
2531        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2532        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2533    }
2534
2535    // Stitch per-chunk results then permute to restore the requested order.
2536    let stitched = compute::concat_batches(schema, takens.iter())?;
2537    let mut inv = vec![0u32; len];
2538    for (i, &d) in dest.iter().enumerate() {
2539        inv[d as usize] = i as u32;
2540    }
2541    let perm = UInt32Array::from(inv);
2542    let cols: Vec<ArrayRef> = stitched
2543        .columns()
2544        .iter()
2545        .map(|c| {
2546            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2547                .map_err(AppError::from)
2548        })
2549        .collect::<Result<_, _>>()?;
2550    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2551}
2552
2553/// Build the equality index per the dataset's policy, against the chunked
2554/// representation. Row ids are global across the concatenation of all
2555/// chunks (so they remain compatible with `take_page` / `slice_global`).
2556fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2557    use rayon::prelude::*;
2558
2559    if cfg.mode == IndexMode::None || chunks.is_empty() {
2560        return EqIndex::default();
2561    }
2562
2563    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2564        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2565    } else {
2566        None
2567    };
2568
2569    let max_card = if cfg.mode == IndexMode::Auto {
2570        Some(cfg.max_cardinality)
2571    } else {
2572        None
2573    };
2574
2575    // Per-chunk starting global row id.
2576    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2577    let mut acc: u32 = 0;
2578    for b in chunks {
2579        batch_offsets.push(acc);
2580        acc = acc
2581            .checked_add(b.num_rows() as u32)
2582            .expect("row count exceeds u32::MAX");
2583    }
2584
2585    let schema = chunks[0].schema();
2586
2587    schema
2588        .fields()
2589        .par_iter()
2590        .enumerate()
2591        .filter_map(|(ci, field)| {
2592            let col_lower = field.name().to_lowercase();
2593            if let Some(a) = &allow
2594                && !a.contains_key(&col_lower)
2595            {
2596                return None;
2597            }
2598
2599            // Only build for index-friendly types; skip everything else
2600            // up-front so we don't pay the per-chunk dispatch cost.
2601            let dtype = field.data_type();
2602            let dict_utf8 = matches!(dtype,
2603                DataType::Dictionary(k, v)
2604                    if matches!(k.as_ref(), DataType::Int32)
2605                    && matches!(v.as_ref(), DataType::Utf8));
2606            match dtype {
2607                DataType::Utf8
2608                | DataType::Utf8View
2609                | DataType::Boolean
2610                | DataType::Int8
2611                | DataType::Int16
2612                | DataType::Int32
2613                | DataType::Int64 => {}
2614                _ if dict_utf8 => {}
2615                _ => return None,
2616            }
2617
2618            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2619
2620            for (bi, batch) in chunks.iter().enumerate() {
2621                let base = batch_offsets[bi];
2622                let col = batch.column(ci);
2623
2624                macro_rules! index_col {
2625                    ($arr_ty:ty) => {{
2626                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2627                        for row in 0..arr.len() {
2628                            if arr.is_null(row) {
2629                                continue;
2630                            }
2631                            let key = arr.value(row).to_string();
2632                            let gid = base + row as u32;
2633                            if let Some(v) = map.get_mut(&key) {
2634                                v.push(gid);
2635                            } else {
2636                                if let Some(mc) = max_card {
2637                                    if map.len() >= mc {
2638                                        return None;
2639                                    }
2640                                }
2641                                map.insert(key, vec![gid]);
2642                            }
2643                        }
2644                    }};
2645                }
2646
2647                if dict_utf8 {
2648                    // Dictionary(Int32, Utf8): iterate keys + look up the
2649                    // string value from the (small) dictionary. We allocate
2650                    // the key string only when the value is new — repeated
2651                    // values reuse the existing HashMap entry by hash, but
2652                    // `HashMap::get_mut` still needs the key, so we use a
2653                    // borrowed lookup via `get` first to avoid the alloc.
2654                    let arr = col
2655                        .as_any()
2656                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2657                        )?;
2658                    let keys = arr.keys();
2659                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2660                    for row in 0..arr.len() {
2661                        if arr.is_null(row) {
2662                            continue;
2663                        }
2664                        let k = keys.value(row) as usize;
2665                        let s = values.value(k);
2666                        let gid = base + row as u32;
2667                        if let Some(v) = map.get_mut(s) {
2668                            v.push(gid);
2669                        } else {
2670                            if let Some(mc) = max_card
2671                                && map.len() >= mc
2672                            {
2673                                return None;
2674                            }
2675                            map.insert(s.to_string(), vec![gid]);
2676                        }
2677                    }
2678                } else {
2679                    match dtype {
2680                        DataType::Utf8 => index_col!(StringArray),
2681                        DataType::Utf8View => index_col!(StringViewArray),
2682                        DataType::Boolean => index_col!(BooleanArray),
2683                        DataType::Int8 => index_col!(Int8Array),
2684                        DataType::Int16 => index_col!(Int16Array),
2685                        DataType::Int32 => index_col!(Int32Array),
2686                        DataType::Int64 => index_col!(Int64Array),
2687                        _ => unreachable!(),
2688                    }
2689                }
2690            }
2691
2692            Some((col_lower, map))
2693        })
2694        .collect()
2695}
2696
2697// ---------------------------------------------------------------------------
2698// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2699// on the page batch right before JSON encoding. We deliberately do **not**
2700// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2701// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2702// columns would balloon resident RAM. The cast is applied per returned page
2703// after pagination, so the cost is paid only for rows the caller requested.
2704// ---------------------------------------------------------------------------
2705
2706/// Returns true for Arrow types that `write_value` can render directly. Any
2707/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2708/// JSON output is faithful rather than silently `null`.
2709fn writable_inline(dt: &DataType) -> bool {
2710    match dt {
2711        DataType::Utf8
2712        | DataType::LargeUtf8
2713        | DataType::Utf8View
2714        | DataType::Boolean
2715        | DataType::Int8
2716        | DataType::Int16
2717        | DataType::Int32
2718        | DataType::Int64
2719        | DataType::UInt8
2720        | DataType::UInt16
2721        | DataType::UInt32
2722        | DataType::UInt64
2723        | DataType::Float32
2724        | DataType::Float64
2725        | DataType::Decimal128(_, _)
2726        | DataType::Decimal256(_, _) => true,
2727        DataType::Dictionary(k, v)
2728            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2729        {
2730            true
2731        }
2732        _ => false,
2733    }
2734}
2735
2736/// Cast any column whose dtype isn't directly writable by `write_value` to
2737/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2738/// — kept native in resident memory to save RAM — and also any exotic dtype
2739/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2740/// so the JSON serializer never falls back to writing literal `null`.
2741fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2742    let schema = batch.schema();
2743    let to_cast: Vec<usize> = schema
2744        .fields()
2745        .iter()
2746        .enumerate()
2747        .filter_map(|(i, f)| {
2748            if writable_inline(f.data_type()) {
2749                None
2750            } else {
2751                Some(i)
2752            }
2753        })
2754        .collect();
2755    if to_cast.is_empty() {
2756        return Ok(batch.clone());
2757    }
2758    let new_fields: Vec<Field> = schema
2759        .fields()
2760        .iter()
2761        .enumerate()
2762        .map(|(i, f)| {
2763            if to_cast.contains(&i) {
2764                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2765            } else {
2766                f.as_ref().clone()
2767            }
2768        })
2769        .collect();
2770    let new_schema = Arc::new(Schema::new(new_fields));
2771    let cols: Vec<ArrayRef> = batch
2772        .columns()
2773        .iter()
2774        .enumerate()
2775        .map(|(i, c)| {
2776            if to_cast.contains(&i) {
2777                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2778            } else {
2779                Ok(c.clone())
2780            }
2781        })
2782        .collect::<Result<_, _>>()?;
2783    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2784}
2785
2786// ---------------------------------------------------------------------------
2787// Compute helpers — retained for symmetry; reserved for future inline scan
2788// path. Currently the engine fallback handles all non-index queries.
2789// ---------------------------------------------------------------------------
2790
/// Comparison operator understood by the inline compute helpers
/// (`cmp_scalar`). Currently unused outside those helpers, hence the
/// `dead_code` allowance.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equal (`eq` kernel).
    Eq,
    /// Not equal (`neq` kernel).
    Neq,
    /// Strictly greater than (`gt` kernel).
    Gt,
    /// Greater than or equal (`gt_eq` kernel).
    Gte,
    /// Strictly less than (`lt` kernel).
    Lt,
    /// Less than or equal (`lt_eq` kernel).
    Lte,
    /// Pattern match via `compute::like`; string columns only.
    Like,
    /// Pattern match via `compute::ilike`; string columns only.
    ILike,
}
2803
2804#[allow(dead_code)]
2805fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2806    let arr = col
2807        .as_any()
2808        .downcast_ref::<StringArray>()
2809        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2810    let s = Scalar::new(StringArray::from(vec![val]));
2811    Ok(eq(arr, &s)?)
2812}
2813
2814#[allow(dead_code)]
2815fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2816    macro_rules! num_cmp {
2817        ($arr_type:ty, $cast:ty) => {{
2818            let n = val
2819                .as_f64()
2820                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2821                as $cast;
2822            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2823            let s = Scalar::new(<$arr_type>::from(vec![n]));
2824            Ok(match op {
2825                CmpOp::Eq => eq(arr, &s)?,
2826                CmpOp::Neq => neq(arr, &s)?,
2827                CmpOp::Gt => gt(arr, &s)?,
2828                CmpOp::Gte => gt_eq(arr, &s)?,
2829                CmpOp::Lt => lt(arr, &s)?,
2830                CmpOp::Lte => lt_eq(arr, &s)?,
2831                CmpOp::Like | CmpOp::ILike => {
2832                    return Err(AppError::InvalidValue(
2833                        "LIKE requires a string column".into(),
2834                    ));
2835                }
2836            })
2837        }};
2838    }
2839    match col.data_type() {
2840        DataType::Utf8 => {
2841            let s = val
2842                .as_str()
2843                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2844            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2845            let sc = Scalar::new(StringArray::from(vec![s]));
2846            Ok(match op {
2847                CmpOp::Eq => eq(arr, &sc)?,
2848                CmpOp::Neq => neq(arr, &sc)?,
2849                CmpOp::Gt => gt(arr, &sc)?,
2850                CmpOp::Gte => gt_eq(arr, &sc)?,
2851                CmpOp::Lt => lt(arr, &sc)?,
2852                CmpOp::Lte => lt_eq(arr, &sc)?,
2853                CmpOp::Like => compute::like(arr, &sc)?,
2854                CmpOp::ILike => compute::ilike(arr, &sc)?,
2855            })
2856        }
2857        DataType::Int8 => num_cmp!(Int8Array, i8),
2858        DataType::Int16 => num_cmp!(Int16Array, i16),
2859        DataType::Int32 => num_cmp!(Int32Array, i32),
2860        DataType::Int64 => num_cmp!(Int64Array, i64),
2861        DataType::Float32 => num_cmp!(Float32Array, f32),
2862        DataType::Float64 => num_cmp!(Float64Array, f64),
2863        dt => Err(AppError::InvalidValue(format!(
2864            "unsupported type for comparison: {dt:?}"
2865        ))),
2866    }
2867}
2868
2869// ---------------------------------------------------------------------------
2870// Serialisation
2871// ---------------------------------------------------------------------------
2872
2873pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2874    // Temporal columns are kept native in resident memory (compact). Cast
2875    // them — plus any other dtype `write_value` can't render directly — to
2876    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2877    // without paying the load-time RAM cost.
2878    let batch = cast_for_serialize(batch)?;
2879    let schema = batch.schema();
2880    let n_rows = batch.num_rows();
2881
2882    let keys: Vec<Vec<u8>> = schema
2883        .fields()
2884        .iter()
2885        .map(|f| {
2886            let mut k = Vec::with_capacity(f.name().len() + 3);
2887            k.push(b'"');
2888            k.extend_from_slice(f.name().as_bytes());
2889            k.extend_from_slice(b"\":");
2890            k
2891        })
2892        .collect();
2893
2894    // Resolve every column's concrete Arrow array type exactly ONCE here,
2895    // instead of paying a `data_type()` match + `downcast_ref` for every
2896    // single cell. The inner row loop then dispatches over a small enum,
2897    // which the compiler turns into a cheap jump table.
2898    let encoders: Vec<ColEnc> = batch
2899        .columns()
2900        .iter()
2901        .map(|c| ColEnc::new(c.as_ref()))
2902        .collect();
2903
2904    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2905    let mut itoa_buf = itoa::Buffer::new();
2906    let mut ryu_buf = ryu::Buffer::new();
2907    buf.push(b'[');
2908
2909    for row in 0..n_rows {
2910        if row > 0 {
2911            buf.push(b',');
2912        }
2913        buf.push(b'{');
2914        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2915            if i > 0 {
2916                buf.push(b',');
2917            }
2918            buf.extend_from_slice(key);
2919            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2920        }
2921        buf.push(b'}');
2922    }
2923
2924    buf.push(b']');
2925    Ok(unsafe { String::from_utf8_unchecked(buf) })
2926}
2927
/// Per-column JSON encoder. The concrete Arrow array type is resolved once
/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
/// instead of repeating `data_type()` matching + `downcast_ref` for every
/// cell. Each variant borrows the already-downcast typed array.
enum ColEnc<'a> {
    /// `Utf8` string column.
    Utf8(&'a StringArray),
    /// `LargeUtf8` string column.
    LargeUtf8(&'a LargeStringArray),
    /// `Utf8View` string column.
    Utf8View(&'a StringViewArray),
    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
    /// pre-downcast string values so neither is re-resolved per row.
    DictI32Utf8(
        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
        &'a StringArray,
    ),
    /// Boolean column, written as `true` / `false`.
    Bool(&'a BooleanArray),
    /// Signed 8-bit integer column.
    I8(&'a Int8Array),
    /// Signed 16-bit integer column.
    I16(&'a Int16Array),
    /// Signed 32-bit integer column.
    I32(&'a Int32Array),
    /// Signed 64-bit integer column.
    I64(&'a Int64Array),
    /// Unsigned 8-bit integer column.
    U8(&'a UInt8Array),
    /// Unsigned 16-bit integer column.
    U16(&'a UInt16Array),
    /// Unsigned 32-bit integer column.
    U32(&'a UInt32Array),
    /// Unsigned 64-bit integer column.
    U64(&'a UInt64Array),
    /// 128-bit decimal column; cells are written as strings.
    Dec128(&'a Decimal128Array),
    /// 256-bit decimal column; cells are written as strings.
    Dec256(&'a Decimal256Array),
    /// 32-bit float column; non-finite values are written as `null`.
    F32(&'a Float32Array),
    /// 64-bit float column.
    F64(&'a Float64Array),
    /// Anything else: fall back to the generic `write_value` dispatch.
    Other(&'a dyn Array),
}
2958
2959impl<'a> ColEnc<'a> {
2960    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2961        macro_rules! dc {
2962            ($t:ty) => {
2963                col.as_any().downcast_ref::<$t>().unwrap()
2964            };
2965        }
2966        match col.data_type() {
2967            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2968            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2969            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2970            DataType::Dictionary(key, value)
2971                if matches!(key.as_ref(), DataType::Int32)
2972                    && matches!(value.as_ref(), DataType::Utf8) =>
2973            {
2974                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2975                let values = dict
2976                    .values()
2977                    .as_any()
2978                    .downcast_ref::<StringArray>()
2979                    .unwrap();
2980                ColEnc::DictI32Utf8(dict, values)
2981            }
2982            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2983            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2984            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2985            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2986            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2987            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2988            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2989            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2990            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2991            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2992            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2993            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2994            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2995            _ => ColEnc::Other(col),
2996        }
2997    }
2998
2999    #[inline]
3000    fn write(
3001        &self,
3002        buf: &mut Vec<u8>,
3003        row: usize,
3004        itoa_buf: &mut itoa::Buffer,
3005        ryu_buf: &mut ryu::Buffer,
3006    ) {
3007        macro_rules! int {
3008            ($arr:expr) => {{
3009                if $arr.is_null(row) {
3010                    buf.extend_from_slice(b"null");
3011                } else {
3012                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
3013                }
3014            }};
3015        }
3016        match self {
3017            ColEnc::Utf8(a) => {
3018                if a.is_null(row) {
3019                    buf.extend_from_slice(b"null");
3020                } else {
3021                    write_str(buf, a.value(row));
3022                }
3023            }
3024            ColEnc::LargeUtf8(a) => {
3025                if a.is_null(row) {
3026                    buf.extend_from_slice(b"null");
3027                } else {
3028                    write_str(buf, a.value(row));
3029                }
3030            }
3031            ColEnc::Utf8View(a) => {
3032                if a.is_null(row) {
3033                    buf.extend_from_slice(b"null");
3034                } else {
3035                    write_str(buf, a.value(row));
3036                }
3037            }
3038            ColEnc::DictI32Utf8(keys, values) => {
3039                if keys.is_null(row) {
3040                    buf.extend_from_slice(b"null");
3041                } else {
3042                    let k = keys.keys().value(row) as usize;
3043                    write_str(buf, values.value(k));
3044                }
3045            }
3046            ColEnc::Bool(a) => {
3047                if a.is_null(row) {
3048                    buf.extend_from_slice(b"null");
3049                } else {
3050                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
3051                }
3052            }
3053            ColEnc::I8(a) => int!(a),
3054            ColEnc::I16(a) => int!(a),
3055            ColEnc::I32(a) => int!(a),
3056            ColEnc::I64(a) => int!(a),
3057            ColEnc::U8(a) => int!(a),
3058            ColEnc::U16(a) => int!(a),
3059            ColEnc::U32(a) => int!(a),
3060            ColEnc::U64(a) => int!(a),
3061            ColEnc::Dec128(a) => {
3062                if a.is_null(row) {
3063                    buf.extend_from_slice(b"null");
3064                } else {
3065                    write_str(buf, &a.value_as_string(row));
3066                }
3067            }
3068            ColEnc::Dec256(a) => {
3069                if a.is_null(row) {
3070                    buf.extend_from_slice(b"null");
3071                } else {
3072                    write_str(buf, &a.value_as_string(row));
3073                }
3074            }
3075            ColEnc::F32(a) => {
3076                if a.is_null(row) {
3077                    buf.extend_from_slice(b"null");
3078                } else {
3079                    let v = a.value(row);
3080                    if v.is_finite() {
3081                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3082                    } else {
3083                        buf.extend_from_slice(b"null");
3084                    }
3085                }
3086            }
3087            ColEnc::F64(a) => {
3088                if a.is_null(row) {
3089                    buf.extend_from_slice(b"null");
3090                } else {
3091                    let v = a.value(row);
3092                    if v.is_finite() {
3093                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3094                    } else {
3095                        buf.extend_from_slice(b"null");
3096                    }
3097                }
3098            }
3099            ColEnc::Other(col) => {
3100                if col.is_null(row) {
3101                    buf.extend_from_slice(b"null");
3102                } else {
3103                    write_value(buf, *col, row);
3104                }
3105            }
3106        }
3107    }
3108}
3109
3110#[inline]
3111fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3112    match col.data_type() {
3113        DataType::Utf8 => write_str(
3114            buf,
3115            col.as_any()
3116                .downcast_ref::<StringArray>()
3117                .unwrap()
3118                .value(row),
3119        ),
3120        DataType::LargeUtf8 => write_str(
3121            buf,
3122            col.as_any()
3123                .downcast_ref::<LargeStringArray>()
3124                .unwrap()
3125                .value(row),
3126        ),
3127        DataType::Utf8View => write_str(
3128            buf,
3129            col.as_any()
3130                .downcast_ref::<StringViewArray>()
3131                .unwrap()
3132                .value(row),
3133        ),
3134        DataType::Dictionary(key, value)
3135            if matches!(key.as_ref(), DataType::Int32)
3136                && matches!(value.as_ref(), DataType::Utf8) =>
3137        {
3138            let dict = col
3139                .as_any()
3140                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3141                .unwrap();
3142            let keys = dict.keys();
3143            let values = dict
3144                .values()
3145                .as_any()
3146                .downcast_ref::<StringArray>()
3147                .unwrap();
3148            let k = keys.value(row) as usize;
3149            write_str(buf, values.value(k));
3150        }
3151        DataType::Boolean => {
3152            let v = col
3153                .as_any()
3154                .downcast_ref::<BooleanArray>()
3155                .unwrap()
3156                .value(row);
3157            buf.extend_from_slice(if v { b"true" } else { b"false" });
3158        }
3159        DataType::Int8 => {
3160            let mut b = itoa::Buffer::new();
3161            buf.extend_from_slice(
3162                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3163                    .as_bytes(),
3164            );
3165        }
3166        DataType::Int16 => {
3167            let mut b = itoa::Buffer::new();
3168            buf.extend_from_slice(
3169                b.format(
3170                    col.as_any()
3171                        .downcast_ref::<Int16Array>()
3172                        .unwrap()
3173                        .value(row),
3174                )
3175                .as_bytes(),
3176            );
3177        }
3178        DataType::Int32 => {
3179            let mut b = itoa::Buffer::new();
3180            buf.extend_from_slice(
3181                b.format(
3182                    col.as_any()
3183                        .downcast_ref::<Int32Array>()
3184                        .unwrap()
3185                        .value(row),
3186                )
3187                .as_bytes(),
3188            );
3189        }
3190        DataType::Int64 => {
3191            let mut b = itoa::Buffer::new();
3192            buf.extend_from_slice(
3193                b.format(
3194                    col.as_any()
3195                        .downcast_ref::<Int64Array>()
3196                        .unwrap()
3197                        .value(row),
3198                )
3199                .as_bytes(),
3200            );
3201        }
3202        DataType::UInt8 => {
3203            let mut b = itoa::Buffer::new();
3204            buf.extend_from_slice(
3205                b.format(
3206                    col.as_any()
3207                        .downcast_ref::<UInt8Array>()
3208                        .unwrap()
3209                        .value(row),
3210                )
3211                .as_bytes(),
3212            );
3213        }
3214        DataType::UInt16 => {
3215            let mut b = itoa::Buffer::new();
3216            buf.extend_from_slice(
3217                b.format(
3218                    col.as_any()
3219                        .downcast_ref::<UInt16Array>()
3220                        .unwrap()
3221                        .value(row),
3222                )
3223                .as_bytes(),
3224            );
3225        }
3226        DataType::UInt32 => {
3227            let mut b = itoa::Buffer::new();
3228            buf.extend_from_slice(
3229                b.format(
3230                    col.as_any()
3231                        .downcast_ref::<UInt32Array>()
3232                        .unwrap()
3233                        .value(row),
3234                )
3235                .as_bytes(),
3236            );
3237        }
3238        DataType::UInt64 => {
3239            let mut b = itoa::Buffer::new();
3240            buf.extend_from_slice(
3241                b.format(
3242                    col.as_any()
3243                        .downcast_ref::<UInt64Array>()
3244                        .unwrap()
3245                        .value(row),
3246                )
3247                .as_bytes(),
3248            );
3249        }
3250        DataType::Decimal128(_, _) => {
3251            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3252            write_str(buf, &arr.value_as_string(row));
3253        }
3254        DataType::Decimal256(_, _) => {
3255            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3256            write_str(buf, &arr.value_as_string(row));
3257        }
3258        DataType::Float32 => {
3259            let v = col
3260                .as_any()
3261                .downcast_ref::<Float32Array>()
3262                .unwrap()
3263                .value(row);
3264            if v.is_finite() {
3265                let mut b = ryu::Buffer::new();
3266                buf.extend_from_slice(b.format_finite(v).as_bytes());
3267            } else {
3268                buf.extend_from_slice(b"null");
3269            }
3270        }
3271        DataType::Float64 => {
3272            let v = col
3273                .as_any()
3274                .downcast_ref::<Float64Array>()
3275                .unwrap()
3276                .value(row);
3277            if v.is_finite() {
3278                let mut b = ryu::Buffer::new();
3279                buf.extend_from_slice(b.format_finite(v).as_bytes());
3280            } else {
3281                buf.extend_from_slice(b"null");
3282            }
3283        }
3284        // Any dtype not handled above must have been pre-cast to Utf8 by
3285        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
3286        // visible JSON string rather than a silent null so it can't be
3287        // mistaken for a real NULL value.
3288        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3289    }
3290}
3291
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, newline / carriage return / tab use
/// their short escapes, and every other byte below 0x20 is written as
/// `\u00XX` with lowercase hex digits. All remaining bytes, including the
/// bytes of multi-byte UTF-8 sequences, are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    // Start of the pending run of bytes that need no escaping.
    let mut run_start = 0;

    buf.push(b'"');
    for (idx, &ch) in bytes.iter().enumerate() {
        if !matches!(ch, b'"' | b'\\' | 0x00..=0x1f) {
            continue;
        }
        // Flush the plain run that precedes this byte, then emit its escape.
        buf.extend_from_slice(&bytes[run_start..idx]);
        run_start = idx + 1;
        match ch {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            // Only control bytes without a short escape reach this arm.
            _ => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX_DIGITS[usize::from(ch >> 4)]);
                buf.push(HEX_DIGITS[usize::from(ch & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3313
3314// ---------------------------------------------------------------------------
3315// Backend trait impl — wires the store into the generic core handlers.
3316// ---------------------------------------------------------------------------
3317
3318#[async_trait]
3319impl Backend for Store {
3320    fn names(&self) -> Vec<String> {
3321        Store::names(self)
3322    }
3323
3324    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3325        let st = self.dataset(name)?;
3326        Ok(DatasetSummary {
3327            name: st.schema.name.clone(),
3328            columns: st.schema.columns.len(),
3329            rows: st.num_rows(),
3330        })
3331    }
3332
3333    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3334        let st = self.dataset(name)?;
3335        Ok(Arc::new(st.schema.clone()))
3336    }
3337
3338    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3339        let st = self.dataset(name)?;
3340        // Report indexed columns in the dataset's declared schema order
3341        // so the `/schema` response is deterministic.
3342        let mut cols: Vec<String> = st
3343            .schema
3344            .columns
3345            .iter()
3346            .map(|c| c.name.clone())
3347            .filter(|n| st.index.contains_key(n))
3348            .collect();
3349        // Any indexed columns not in `schema.columns` (shouldn't happen,
3350        // but be defensive) get appended sorted.
3351        let mut extras: Vec<String> = st
3352            .index
3353            .keys()
3354            .filter(|n| !cols.iter().any(|c| c == *n))
3355            .cloned()
3356            .collect();
3357        extras.sort();
3358        cols.extend(extras);
3359        Ok(cols)
3360    }
3361
3362    async fn sample(&self, name: &str) -> Result<String, AppError> {
3363        Store::sample(self, name).await
3364    }
3365
3366    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3367        Store::query(self, name, req).await
3368    }
3369
3370    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3371        Store::query_arrow(self, name, req).await
3372    }
3373
3374    async fn query_arrow_stream(
3375        &self,
3376        name: &str,
3377        req: &QueryRequest,
3378    ) -> Result<ArrowIpcStream, AppError> {
3379        Store::query_arrow_stream(self, name, req).await
3380    }
3381
3382    async fn query_arrow_stream_all(
3383        &self,
3384        name: &str,
3385        req: &QueryRequest,
3386    ) -> Result<ArrowIpcStream, AppError> {
3387        Store::query_arrow_stream_all(self, name, req).await
3388    }
3389
3390    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3391        Store::count(self, name, req).await
3392    }
3393
3394    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3395        Store::query_sql(self, sql, max_rows).await
3396    }
3397
3398    async fn query_sql_arrow_stream(
3399        &self,
3400        sql: &str,
3401        max_rows: u64,
3402    ) -> Result<ArrowIpcStream, AppError> {
3403        Store::query_sql_arrow_stream(self, sql, max_rows).await
3404    }
3405
3406    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3407        Store::parquet(self, name).await
3408    }
3409
3410    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3411        Store::reload(self, name).await
3412    }
3413
3414    async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3415        Store::register(self, cfg).await
3416    }
3417}
3418
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Every representative access-denied message must be recognised.
    #[test]
    fn detects_s3_access_denied_variants() {
        // Sample error texts as produced by object_store / AWS / deltalake / MinIO.
        let denied_messages = [
            "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
            "Client error with status 403 Forbidden",
            "S3 error: Access Denied",
            "request failed: 403 Forbidden",
        ];

        for msg in denied_messages {
            assert!(is_s3_access_denied(msg), "should flag: {msg}");
        }
    }

    /// Errors unrelated to permissions must not be classified as access-denied.
    #[test]
    fn ignores_unrelated_errors() {
        let unrelated_messages = [
            "Not a Delta table: Generic delta kernel error: No files in log segment",
            "object at location data/part.parquet not found",
            "failed to infer parquet schema: invalid magic bytes",
        ];

        for msg in unrelated_messages {
            assert!(!is_s3_access_denied(msg), "should not flag: {msg}");
        }
    }
}
3447