Skip to main content

datapress_datafusion/
store.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, RwLock};
5use std::time::Duration;
6
7use arc_swap::ArcSwap;
8use arrow::array::{
9    Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
10    Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
11    StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
12};
13use arrow::compute;
14use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
15use arrow::datatypes::{DataType, Field, Schema};
16use async_trait::async_trait;
17use parquet::arrow::ProjectionMask;
18use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
19use serde_json::Value as JsonValue;
20
21use datafusion::datasource::file_format::parquet::ParquetFormat;
22use datafusion::datasource::listing::{
23    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
24};
25use datafusion::datasource::{MemTable, TableProvider};
26use datafusion::execution::cache::DefaultListFilesCache;
27use datafusion::execution::cache::cache_manager::CacheManagerConfig;
28use datafusion::execution::runtime_env::RuntimeEnvBuilder;
29use datafusion::prelude::{SessionConfig, SessionContext};
30use datafusion::scalar::ScalarValue;
31
32use object_store::aws::AmazonS3Builder;
33use url::Url;
34
35use datapress_core::backend::{
36    ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
37};
38use datapress_core::config::{
39    AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
40    Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
41};
42use datapress_core::errors::AppError;
43use datapress_core::models::{CountRequest, Predicate, QueryRequest};
44use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
45
46// ---------------------------------------------------------------------------
47// Public types
48// ---------------------------------------------------------------------------
49
/// Hasher-swapped map alias. The default `std::collections::HashMap` uses
/// SipHash (DoS-resistant but slow). The equality index keys are our own
/// column values, not untrusted hash-flood input, so we use `ahash` — a much
/// faster non-cryptographic hasher — on this per-request hot path.
///
/// Only the hasher type parameter differs from a plain `HashMap<K, V>`;
/// the key and value types stay generic.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
55
/// Pre-built equality index: lowercase col name → string-encoded value → sorted row ids.
///
/// Two nested [`FastMap`]s: the outer map is keyed by column name, the
/// inner map by the value's string encoding, and each leaf is a `Vec<u32>`
/// of row ids.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
58
/// Per-dataset state: schema metadata, the resident chunks, and the
/// equality index built per the dataset's `[dataset.index]` policy.
///
/// `data` is the dataset as a `Vec<RecordBatch>` — exactly the chunks
/// produced by the underlying reader, after temporal columns are cast to
/// `Utf8`. We deliberately do **not** call `concat_batches` to fuse them
/// into one batch: on wide schemas (hundreds of columns) that transiently
/// allocates a second full copy of the decoded Arrow data, pushing peak
/// RSS to ~2× the resident size and OOM-killing the process at startup.
///
/// When `lazy` is true the dataset is *not* materialised: `data` is empty,
/// `index` is empty, and every query is dispatched to DataFusion SQL
/// against a registered `ListingTable`. `arrow_schema` still carries the
/// inferred schema so discovery endpoints work.
pub struct DatasetState {
    /// Dataset-level schema metadata (name and column list) used for SQL
    /// generation and column projection.
    pub schema: DatasetSchema,
    /// Resident chunks of the dataset; empty when `lazy` is true.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset; populated for lazy datasets too.
    pub arrow_schema: Arc<Schema>,
    /// Equality index over the resident rows; empty when `lazy` is true.
    pub index: EqIndex,
    /// `true` when the dataset is not materialised and every query goes
    /// through DataFusion SQL.
    pub lazy: bool,
}
80
81impl DatasetState {
82    /// Sum of `num_rows()` across all resident chunks. `0` for lazy datasets.
83    pub fn num_rows(&self) -> usize {
84        self.data.iter().map(|b| b.num_rows()).sum()
85    }
86}
87
/// Multi-dataset registry. Each dataset is registered in the shared
/// `SessionContext` under its configured name. The per-dataset state is
/// held behind `ArcSwap` so a reload can atomically replace it without
/// blocking concurrent queries.
pub struct Store {
    /// Shared DataFusion session; every dataset's table provider is
    /// registered here and all SQL fallbacks execute through it.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`. Initialised from
    /// `server.max_page_size` and forced to be at least `1` in `load`.
    max_page_size: u64,
    /// Original dataset configs, indexed by name. Reload reads the source
    /// path from here — clients can't redirect a reload at an arbitrary file.
    /// Behind an `RwLock` so datasets registered at runtime can be added
    /// without a restart.
    configs: RwLock<HashMap<String, DatasetConfig>>,
    /// Hot-swappable snapshot of all currently loaded datasets.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-name reload mutex. Serialises concurrent reloads of the same
    /// dataset; reloads of different datasets proceed in parallel.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
106
107impl Store {
108    /// Load every dataset declared in `cfg`.
109    pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
110        // One-shot init for the deltalake S3 backend. Safe to call more
111        // than once — the handlers are idempotent.
112        if cfg
113            .datasets
114            .iter()
115            .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
116        {
117            deltalake::aws::register_handlers(None);
118        }
119
120        // NOTE: identifier handling for the raw-SQL endpoint is done by
121        // rewriting schema column/table references to quoted canonical
122        // names before execution (see `query_sql`). DataFusion's default
123        // identifier normalization is left ON so unquoted *aliases* and
124        // CTE names stay case-insensitive, matching DuckDB.
125        let ctx = build_tuned_context(&cfg.datafusion);
126        let mut datasets = HashMap::with_capacity(cfg.datasets.len());
127        let mut configs = HashMap::with_capacity(cfg.datasets.len());
128
129        for d in &cfg.datasets {
130            log::info!(
131                "Loading dataset '{}' ({} @ {})",
132                d.name,
133                d.source.kind.as_str(),
134                d.source.location
135            );
136            // Force lazy when the source exceeds the server size threshold.
137            // S3-aware: local sources are stat'd, S3 sources are sized by
138            // listing the object store under their prefix.
139            let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
140                .await
141            {
142                Some(bytes) => {
143                    log::info!(
144                        "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
145                        d.name,
146                        bytes as f64 / (1024.0 * 1024.0),
147                        cfg.server.force_lazy_above_mb
148                    );
149                    let mut forced = d.clone();
150                    forced.lazy = true;
151                    std::borrow::Cow::Owned(forced)
152                }
153                None => std::borrow::Cow::Borrowed(d),
154            };
155            let d = d.as_ref();
156            let (state, provider) = match build_dataset(d, &ctx).await {
157                Ok(built) => built,
158                Err(AppError::EmptyDataset(msg)) => {
159                    log::warn!("skipping empty dataset '{}': {msg}", d.name);
160                    continue;
161                }
162                // An S3 source we're not authorized to read (bad creds, no
163                // bucket/prefix policy, expired token) returns a 403. Don't
164                // abort the whole registry for one inaccessible dataset —
165                // log and skip it, exactly like an empty source. The rest of
166                // the datasets still load and serve traffic.
167                Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
168                    log::warn!(
169                        "skipping dataset '{}': S3 access denied — check credentials \
170                         and bucket policy ({e})",
171                        d.name
172                    );
173                    continue;
174                }
175                Err(e) => return Err(e),
176            };
177            ctx.register_table(d.name.as_str(), provider)?;
178            datasets.insert(d.name.clone(), Arc::new(state));
179            configs.insert(d.name.clone(), d.clone());
180        }
181        Ok(Self {
182            ctx,
183            max_page_size: cfg.server.max_page_size.max(1),
184            configs: RwLock::new(configs),
185            datasets: ArcSwap::from_pointee(datasets),
186            reload_locks: Mutex::new(HashMap::new()),
187        })
188    }
189
190    /// Sorted list of dataset names.
191    pub fn names(&self) -> Vec<String> {
192        let snap = self.datasets.load();
193        let mut v: Vec<String> = snap.keys().cloned().collect();
194        v.sort();
195        v
196    }
197
198    pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
199        self.datasets
200            .load()
201            .get(name)
202            .cloned()
203            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
204    }
205
206    /// JSON for the first row of the dataset, or `null` if empty. Used by
207    /// `GET /api/datasets/{name}/schema` for discoverability.
208    pub async fn sample(&self, name: &str) -> Result<String, AppError> {
209        let st = self.dataset(name)?;
210
211        // Lazy datasets have no resident batch — pull one row via SQL.
212        if st.lazy {
213            let table = DatasetSchema::quote_ident(&st.schema.name);
214            let sql = format!("SELECT * FROM {table} LIMIT 1");
215            let df = self.ctx.sql(&sql).await?;
216            let batches = df.collect().await?;
217            if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
218                return Ok("null".into());
219            }
220            let arr = serialize(&batches[0].slice(0, 1))?;
221            let trimmed = arr.trim();
222            let inner = trimmed
223                .strip_prefix('[')
224                .and_then(|s| s.strip_suffix(']'))
225                .unwrap_or(trimmed);
226            return Ok(inner.to_string());
227        }
228
229        let first = match st.data.iter().find(|b| b.num_rows() > 0) {
230            Some(b) => b,
231            None => return Ok("null".into()),
232        };
233        let arr = serialize(&first.slice(0, 1))?;
234        // strip the outer [] to return a single object
235        let trimmed = arr.trim();
236        let inner = trimmed
237            .strip_prefix('[')
238            .and_then(|s| s.strip_suffix(']'))
239            .unwrap_or(trimmed);
240        Ok(inner.to_string())
241    }
242
243    /// Rebuild `name` from disk and atomically swap it in. Concurrent queries
244    /// against the same name continue to see the *old* `Arc<DatasetState>`
245    /// until they finish; the old data is dropped once the last reference
246    /// goes away.
247    pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
248        // 1. Look up the dataset config. Not finding it = 404.
249        let cfg = self
250            .configs
251            .read()
252            .unwrap()
253            .get(name)
254            .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
255            .clone();
256
257        // 2. Per-name lock: only one reload of this dataset at a time.
258        let lock = {
259            let mut locks = self.reload_locks.lock().unwrap();
260            locks
261                .entry(name.to_string())
262                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
263                .clone()
264        };
265        let _guard = lock.lock().await;
266
267        let started = std::time::Instant::now();
268
269        // Invalidate the object-store LIST cache before rebuilding. For an
270        // S3 *prefix* source the files are often rewritten with new UUID
271        // names; without dropping the cached listing the rebuilt
272        // `ListingTable` would replay the stale keys and fail with
273        // "object at location … not found". A plain prefix is therefore
274        // re-listed fresh; an explicit filename/glob source is re-resolved
275        // against the same configured pattern. Clearing the whole cache is
276        // cheap — other datasets simply re-list on their next query.
277        //
278        // NOTE: this must clear the cache that actually lives on the
279        // `RuntimeEnv`, not just the one we opt into via
280        // `[datafusion] list_files_cache`. DataFusion ≥ 53 installs a
281        // *default* `DefaultListFilesCache` (1 MiB, infinite TTL) on every
282        // `SessionContext` even when no cache is configured, so the staleness
283        // bites regardless of our flag.
284        if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
285            cache.clear();
286        }
287
288        // 3. Heavy lifting (source read + index build). Parquet/delta
289        // readers are themselves async, so we don't wrap in `web::block`.
290        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
291        let rows = state.num_rows();
292
293        // 4. Atomic swap.
294        //   a) Replace the MemTable inside the SessionContext.
295        //   b) ArcSwap a new snapshot map with the updated Arc<DatasetState>.
296        // In-flight queries already hold the old provider + old Arc; they
297        // run to completion. New queries see the new data.
298        let _ = self.ctx.deregister_table(name)?;
299        self.ctx.register_table(name, provider)?;
300
301        let mut new_map = (**self.datasets.load()).clone();
302        new_map.insert(name.to_string(), Arc::new(state));
303        self.datasets.store(Arc::new(new_map));
304
305        let elapsed_ms = started.elapsed().as_millis();
306        log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
307        Ok(ReloadStats { rows, elapsed_ms })
308    }
309
310    /// Register a brand-new dataset from `cfg` at runtime. Opens the source,
311    /// registers a provider in the shared `SessionContext`, and inserts it
312    /// into the live snapshot so it is immediately queryable — no restart.
313    pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
314        cfg.validate_for_register()?;
315
316        // Fast pre-check before taking the (async) per-name lock.
317        if self.datasets.load().contains_key(&cfg.name) {
318            return Err(AppError::InvalidValue(format!(
319                "dataset '{}' already exists",
320                cfg.name
321            )));
322        }
323
324        // Serialise against a concurrent register/reload of the same name.
325        let lock = {
326            let mut locks = self.reload_locks.lock().unwrap();
327            locks
328                .entry(cfg.name.clone())
329                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
330                .clone()
331        };
332        let _guard = lock.lock().await;
333
334        // Re-check under the lock — another task may have won the race.
335        if self.datasets.load().contains_key(&cfg.name) {
336            return Err(AppError::InvalidValue(format!(
337                "dataset '{}' already exists",
338                cfg.name
339            )));
340        }
341
342        // One-shot init for the deltalake S3 backend, mirroring `load`.
343        // Idempotent — safe to call again for a runtime-registered dataset.
344        if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
345            deltalake::aws::register_handlers(None);
346        }
347
348        let started = std::time::Instant::now();
349        let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
350        let rows = state.num_rows();
351        let columns = state.schema.columns.len();
352
353        self.ctx.register_table(cfg.name.as_str(), provider)?;
354
355        let mut new_map = (**self.datasets.load()).clone();
356        new_map.insert(cfg.name.clone(), Arc::new(state));
357        self.datasets.store(Arc::new(new_map));
358        self.configs
359            .write()
360            .unwrap()
361            .insert(cfg.name.clone(), cfg.clone());
362
363        let elapsed_ms = started.elapsed().as_millis();
364        log::info!(
365            "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
366            cfg.name,
367            cfg.source.kind.as_str(),
368            cfg.source.location
369        );
370        Ok(DatasetSummary {
371            name: cfg.name,
372            columns,
373            rows,
374        })
375    }
376
377    /// Run a `QueryRequest` against `name`. Empty predicates → O(1) Arrow
378    /// slice. Otherwise → DataFusion SQL on the single registered table.
379    /// Lazy datasets skip the in-memory hot paths and always dispatch to SQL.
380    pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
381        let batch = self.query_batch(name, req).await?;
382        if batch.num_rows() == 0 {
383            return Ok("[]".to_string());
384        }
385        serialize(&batch)
386    }
387
388    /// Rewrite registered table / column references in a raw-SQL string to
389    /// their canonical, quoted spelling so matching is case-insensitive
390    /// (like DuckDB). Builds the lookup from every currently loaded
391    /// dataset; unknown identifiers (aliases, CTE names) are left for the
392    /// engine's default normalization to handle.
393    fn canonicalize_sql(&self, sql: &str) -> String {
394        let snap = self.datasets.load();
395        let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
396        let mut columns: HashMap<String, String> = HashMap::new();
397        for (name, state) in snap.iter() {
398            tables.insert(name.to_lowercase(), name.clone());
399            for col in &state.schema.columns {
400                columns
401                    .entry(col.name.to_lowercase())
402                    .or_insert_with(|| col.name.clone());
403            }
404        }
405        datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
406    }
407
408    /// Execute a pre-validated raw `SELECT` and return the JSON `data`
409    /// array. The statement has already passed
410    /// [`datapress_core::sql::validate`]; here it is wrapped in an outer
411    /// `LIMIT max_rows` so the result is bounded regardless of the user's
412    /// own clauses, executed through the shared `SessionContext`, and run
413    /// through the same fast JSON encoder as [`Self::query`].
414    pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
415        let cap = max_rows.max(1);
416        let sql = self.canonicalize_sql(sql);
417        // DESCRIBE yields a schema listing and cannot be nested in a
418        // subquery on DataFusion, so run it directly. Its row count is
419        // bounded by the column count and the slice below still enforces
420        // `cap`; everything else is wrapped in an outer LIMIT.
421        let wrapped = if datapress_core::sql::is_describe(&sql) {
422            sql
423        } else {
424            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
425        };
426        let df = self.ctx.sql(&wrapped).await?;
427        let batches = df.collect().await?;
428        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
429            return Ok("[]".to_string());
430        }
431        let batch = if batches.len() == 1 {
432            batches.into_iter().next().expect("checked len")
433        } else {
434            compute::concat_batches(&batches[0].schema(), batches.iter())?
435        };
436        // Defence in depth: the outer LIMIT already bounds the row count,
437        // but slice anyway so a planning quirk can never blow the cap.
438        let batch = if batch.num_rows() as u64 > cap {
439            batch.slice(0, cap as usize)
440        } else {
441            batch
442        };
443        serialize(&batch)
444    }
445
446    /// Same plan as [`Self::query_sql`], but encode the bounded result as
447    /// an Arrow IPC stream instead of JSON. Backs the Arrow
448    /// content-negotiated branch of `POST /api/v1/sql`.
449    pub async fn query_sql_arrow_stream(
450        &self,
451        sql: &str,
452        max_rows: u64,
453    ) -> Result<ArrowIpcStream, AppError> {
454        let cap = max_rows.max(1);
455        let sql = self.canonicalize_sql(sql);
456        // DESCRIBE cannot be nested in a subquery on DataFusion (see
457        // `query_sql`); run it directly. Its output is bounded by the
458        // column count, so the missing outer LIMIT is harmless.
459        let wrapped = if datapress_core::sql::is_describe(&sql) {
460            sql
461        } else {
462            format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
463        };
464        let df = self.ctx.sql(&wrapped).await?;
465        let batches = df.collect().await?;
466        Ok(stream_arrow_batches(batches))
467    }
468
469    /// Same plan as [`Self::query`], but encode the result page as an
470    /// Arrow IPC stream (one schema message + one batch + EOS). Empty
471    /// results still produce a valid, self-describing zero-batch stream.
472    pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
473        let batch = self.query_batch(name, req).await?;
474        let schema = batch.schema();
475        let mut buf = Vec::with_capacity(8 * 1024);
476        {
477            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
478            if batch.num_rows() > 0 {
479                w.write(&batch)?;
480            }
481            w.finish()?;
482        }
483        Ok(buf)
484    }
485
486    pub async fn query_arrow_stream(
487        &self,
488        name: &str,
489        req: &QueryRequest,
490    ) -> Result<ArrowIpcStream, AppError> {
491        let batches = self.query_batches(name, req).await?;
492        Ok(stream_arrow_batches(batches))
493    }    pub async fn query_arrow_stream_all(
494        &self,
495        name: &str,
496        req: &QueryRequest,
497    ) -> Result<ArrowIpcStream, AppError> {
498        let batches = self.query_batches_all(name, req).await?;
499        Ok(stream_arrow_batches(batches))
500    }
501
502    /// Encode the entire dataset as a single self-contained Parquet file.
503    ///
504    /// Collects every row (all columns, no predicates, no paging) and runs
505    /// it through a single [`parquet::arrow::ArrowWriter`], so the result
506    /// carries the row-group + footer metadata a Parquet reader needs to
507    /// answer `count(*)` straight from the footer. Powers the cached
508    /// `GET /datasets/{name}/parquet` HTTP endpoint.
509    pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
510        // All rows, all columns, no predicates / ordering / limit.
511        let req = QueryRequest {
512            columns: Vec::new(),
513            predicates: Vec::new(),
514            group_by: Vec::new(),
515            aggregations: Vec::new(),
516            having: Vec::new(),
517            distinct: false,
518            order_by: Vec::new(),
519            limit: None,
520            page: 1,
521            page_size: 1,
522        };
523        let st = self.dataset(name)?;
524        let batches = self.query_batches_all(name, &req).await?;
525        // Use the actual batch schema when we have rows so the writer schema
526        // matches exactly (projection/nullability); fall back to the
527        // dataset schema for an empty dataset.
528        let schema = batches
529            .first()
530            .map(|b| b.schema())
531            .unwrap_or_else(|| st.arrow_schema.clone());
532
533        let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
534        {
535            let props = parquet::file::properties::WriterProperties::builder()
536                .set_compression(parquet::basic::Compression::SNAPPY)
537                .build();
538            let mut writer =
539                parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
540                    .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
541            for batch in &batches {
542                if batch.num_rows() > 0 {
543                    writer
544                        .write(batch)
545                        .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
546                }
547            }
548            writer
549                .close()
550                .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
551        }
552        Ok(bytes::Bytes::from(buf))
553    }
554
555    /// Compute the result page as a single `RecordBatch`. Shared between
556    /// the JSON and Arrow IPC encoders.
557    async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
558        let batches = self.query_batches(name, req).await?;
559        if batches.is_empty() {
560            return Ok(RecordBatch::new_empty(Arc::new(
561                arrow::datatypes::Schema::empty(),
562            )));
563        }
564        if batches.len() == 1 {
565            return Ok(batches.into_iter().next().expect("checked len"));
566        }
567        if batches.iter().all(|b| b.num_rows() == 0) {
568            return Ok(RecordBatch::new_empty(batches[0].schema()));
569        }
570        let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
571        Ok(batch)
572    }
573
574    /// Compute the result page as Arrow batches. Arrow IPC responses can
575    /// write these directly, while JSON callers concatenate via
576    /// [`Self::query_batch`] for the existing row conversion path.
577    async fn query_batches(
578        &self,
579        name: &str,
580        req: &QueryRequest,
581    ) -> Result<Vec<RecordBatch>, AppError> {
582        let st = self.dataset(name)?;
583
584        let page = req.page.max(1);
585        let page_size = req.page_size.clamp(1, self.max_page_size);
586        let offset = ((page - 1) * page_size) as usize;
587        let limit = page_size as usize;
588
589        self.query_batches_inner(st, req, Some((offset, limit)))
590            .await
591    }
592
593    /// Compute all matching rows as Arrow batches for the one-request
594    /// streaming endpoint. `page` and `page_size` are intentionally ignored;
595    /// optional `limit` still caps the total result size.
596    async fn query_batches_all(
597        &self,
598        name: &str,
599        req: &QueryRequest,
600    ) -> Result<Vec<RecordBatch>, AppError> {
601        let st = self.dataset(name)?;
602        self.query_batches_inner(st, req, None).await
603    }
604
605    async fn query_batches_inner(
606        &self,
607        st: Arc<DatasetState>,
608        req: &QueryRequest,
609        page_window: Option<(usize, usize)>,
610    ) -> Result<Vec<RecordBatch>, AppError> {
611        let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
612
613        // In-memory hot paths only fire when:
614        //   - the dataset is materialised,
615        //   - the caller did not ask for ordering,
616        //   - and did not ask for a hard `limit` cap on a paged request.
617        // Both of the latter two require sorting / capping that the SQL
618        // engine handles uniformly across all data types.
619        let can_fast_path = !st.lazy
620            && req.order_by.is_empty()
621            && (page_window.is_none() || req.limit.is_none())
622            && req.group_by.is_empty()
623            && !req.distinct;
624
625        if can_fast_path {
626            let total = st.num_rows();
627
628            // No predicates -> O(1) raw Arrow slices over resident batches,
629            // no engine overhead.
630            if req.predicates.is_empty() {
631                if page_window.is_none() && req.limit.is_none() {
632                    return st
633                        .data
634                        .iter()
635                        .cloned()
636                        .map(|batch| project(&st.schema, batch, &req.columns))
637                        .collect();
638                }
639                let start = offset.min(total);
640                let len = limit.min(total - start);
641                let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
642                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
643            }
644
645            // Index fast path: if every predicate is eq/in on an indexed column,
646            // resolve via the pre-built equality index.
647            if let Some(rows) = try_index(&st.index, &req.predicates) {
648                let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
649                return Ok(vec![project(&st.schema, batch, &req.columns)?]);
650            }
651        }
652
653        // Fallback (and only path for lazy datasets): DataFusion SQL.
654        let (sql, params) = match page_window {
655            Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
656            None => build_query_stream_sql(&st.schema, req)?,
657        };
658        let mut df = self.ctx.sql(&sql).await?;
659        if !params.is_empty() {
660            df = df.with_param_values(params)?;
661        }
662        let batches = df.collect().await?;
663        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
664            let schema = batches
665                .first()
666                .map(|b| b.schema())
667                .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
668            return Ok(vec![RecordBatch::new_empty(schema)]);
669        }
670        Ok(batches)
671    }
672}
673
674fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
675    let schema = batches
676        .first()
677        .map(|batch| batch.schema())
678        .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
679    let (mut writer, stream) = arrow_ipc_stream_channel(8);
680
681    tokio::task::spawn_blocking(move || {
682        let result = (|| -> Result<(), AppError> {
683            let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
684            for batch in batches {
685                if batch.num_rows() > 0 {
686                    w.write(&batch)?;
687                }
688            }
689            w.finish()?;
690            Ok(())
691        })();
692        if let Err(err) = result {
693            log::error!("datafusion arrow stream failed: {err}");
694            writer.send_error(err);
695        }
696    });
697
698    stream
699}
700
701impl Store {
702    /// Return the number of rows matching `req.predicates`. With no
703    /// predicates this is a cheap metadata lookup on materialised datasets
704    /// and a `SELECT COUNT(*)` on lazy ones.
705    pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
706        let st = self.dataset(name)?;
707
708        if !st.lazy {
709            // No predicates → resident row count, no scan.
710            if req.predicates.is_empty() {
711                return Ok(st.num_rows() as i64);
712            }
713            // Index fast path: same eligibility rules as `query`.
714            if let Some(rows) = try_index(&st.index, &req.predicates) {
715                return Ok(rows.len() as i64);
716            }
717        }
718
719        // Fallback: DataFusion SQL — same predicate translation as `query`,
720        // with predicate values bound as typed parameters.
721        let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
722        let mut df = self.ctx.sql(&sql).await?;
723        if !params.is_empty() {
724            df = df.with_param_values(params)?;
725        }
726        let batches = df.collect().await?;
727        let n = batches
728            .first()
729            .and_then(|b| {
730                b.column(0)
731                    .as_any()
732                    .downcast_ref::<arrow::array::Int64Array>()
733            })
734            .filter(|a| !a.is_empty())
735            .map(|a| a.value(0))
736            .unwrap_or(0);
737        Ok(n)
738    }
739}
740
741// ---------------------------------------------------------------------------
742// Dataset loading
743// ---------------------------------------------------------------------------
744
745/// Build a tuned `SessionContext` for the whole `Store`.
746///
747/// Everything here is opt-in via the `[datafusion]` config block; with the
748/// defaults this is equivalent to a plain `SessionContext::new()`. Lazy
749/// datasets (`ListingTable` over parquet, possibly on S3) benefit most:
750///
751/// * `pushdown_filters` — evaluate predicates *during* the parquet decode
752///   so filtered-out rows in a surviving row group are never materialised.
753/// * `reorder_filters` — let the scan reorder pushed-down predicates by
754///   selectivity (only meaningful with `pushdown_filters`).
755/// * `list_files_cache` — a `ListFilesCache` on the `RuntimeEnv` so repeated
756///   lazy queries reuse object-store `LIST` results instead of re-listing the
757///   prefix every time — the dominant per-query cost on S3.
758///
759/// Row-group / page-index / bloom-filter pruning and `metadata_size_hint`
760/// are already on by default in this DataFusion version, so they are left
761/// untouched.
762///
763/// Note: DataFusion ≥ 53 installs a *default* `DefaultListFilesCache` on the
764/// `RuntimeEnv` even with the defaults below, so a reload must explicitly
765/// invalidate it (see [`Store::reload`]) — otherwise a re-listed S3 prefix
766/// would keep serving stale, now-deleted object keys.
767fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
768    let mut config = SessionConfig::new();
769    {
770        let opts = config.options_mut();
771        opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
772        opts.execution.parquet.reorder_filters = cfg.reorder_filters;
773    }
774
775    if !cfg.list_files_cache {
776        return SessionContext::new_with_config(config);
777    }
778
779    // Cache object-store listings so repeated lazy/S3 queries skip the
780    // LIST round-trips. A zero TTL means infinite (never expires).
781    let ttl = (cfg.list_files_cache_ttl_secs > 0)
782        .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
783    let list_cache = Arc::new(DefaultListFilesCache::new(
784        cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
785        ttl,
786    ));
787    let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
788
789    let runtime = RuntimeEnvBuilder::new()
790        .with_cache_manager(cache_manager)
791        .build_arc()
792        .expect("failed to build DataFusion runtime env");
793
794    SessionContext::new_with_config_rt(config, runtime)
795}
796
797async fn build_dataset(
798    d: &DatasetConfig,
799    ctx: &SessionContext,
800) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
801    // Lazy datasets: register a streaming provider straight against the
802    // source and skip the materialise / index / partition pipeline below.
803    // Parquet uses a `ListingTable`; delta uses deltalake's own DataFusion
804    // `TableProvider` (which reads the transaction log once for the file
805    // list, then streams parquet row groups per query with predicate
806    // pushdown + file skipping).
807    if d.lazy {
808        match (d.source.kind, d.source.is_s3()) {
809            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
810            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
811            (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
812        }
813    }
814
815    // Fetch raw RecordBatches from whichever backing store the dataset
816    // is configured to use. All four (parquet, delta) x (local, s3)
817    // combinations converge into one Vec<RecordBatch>; the materialisation
818    // / indexing / partitioning logic below is shared.
819    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
820        (SourceKind::Parquet, false) => read_local_parquet(d)?,
821        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
822        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
823        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
824    };
825    if raw_batches.is_empty() {
826        return Err(AppError::EmptyDataset(format!(
827            "dataset '{}': source produced no batches",
828            d.name
829        )));
830    }
831    // A source can also resolve to one or more *zero-row* batches (e.g. an
832    // empty Delta table, or a parquet file with only a schema). Treat that as
833    // empty too so it's logged and skipped rather than registered as a 0-row
834    // dataset that shows up in discovery / explore.
835    if raw_batches.iter().all(|b| b.num_rows() == 0) {
836        return Err(AppError::EmptyDataset(format!(
837            "dataset '{}': source has a schema but no rows",
838            d.name
839        )));
840    }
841
842    let chunks = raw_batches;
843    let arrow_sch = chunks[0].schema();
844
845    // Build DatasetSchema from the Arrow schema.
846    let columns: Vec<ColumnInfo> = arrow_sch
847        .fields()
848        .iter()
849        .map(|f| {
850            let dt = f.data_type();
851            ColumnInfo {
852                name: f.name().clone(),
853                logical: arrow_to_logical(dt),
854                sql_type: format!("{dt:?}"),
855                nullable: f.is_nullable(),
856            }
857        })
858        .collect();
859    let schema = DatasetSchema::new(&d.name, columns)
860        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
861
862    // Build the equality index per the per-dataset policy. Operates on the
863    // chunked representation directly so we never have to materialise a
864    // single concatenated batch (which would double peak RSS on wide
865    // schemas — see `DatasetState` docs).
866    let index = build_eq_index_with_policy(&chunks, &d.index);
867
868    // Partition for parallel scans by the SQL fallback path. We distribute
869    // the existing batches round-robin across `n_parts` partitions instead
870    // of re-slicing a concatenated batch — `clone()` on a RecordBatch is
871    // an Arc-clone of the column buffers, not a copy.
872    let n_parts = std::thread::available_parallelism()
873        .map(|n| n.get())
874        .unwrap_or(4);
875    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
876    for (i, b) in chunks.iter().enumerate() {
877        if b.num_rows() == 0 {
878            continue;
879        }
880        parts[i % n_parts].push(b.clone());
881    }
882    parts.retain(|p| !p.is_empty());
883    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
884
885    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
886    let mem_mb: usize = chunks
887        .iter()
888        .flat_map(|b| b.columns().iter())
889        .map(|c| c.get_buffer_memory_size())
890        .sum::<usize>()
891        / 1_048_576;
892    log::info!(
893        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
894        d.name,
895        d.source.kind.as_str(),
896        total_rows,
897        schema.columns.len(),
898        mem_mb,
899        chunks.len(),
900        index.len()
901    );
902
903    Ok((
904        DatasetState {
905            schema,
906            data: chunks,
907            arrow_schema: arrow_sch,
908            index,
909            lazy: false,
910        },
911        provider,
912    ))
913}
914
915/// Build a lazy state + `ListingTable` provider for a local parquet dataset.
916/// The dataset is never read into RAM; DataFusion streams row groups on
917/// each query. The returned `DatasetState.data` is an empty `Vec` —
918/// `arrow_schema` still carries the inferred Arrow schema for discovery.
919async fn build_lazy_local_parquet(
920    d: &DatasetConfig,
921    ctx: &SessionContext,
922) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
923    let (url, part_keys) = lazy_local_listing(d)?;
924
925    let mut opts =
926        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
927    if !part_keys.is_empty() {
928        opts = opts.with_table_partition_cols(
929            part_keys
930                .iter()
931                .map(|k| (k.clone(), DataType::Utf8))
932                .collect(),
933        );
934    }
935
936    let session_state = ctx.state();
937    // `infer_schema` returns the *file* schema (without partition columns);
938    // `ListingTable` appends the declared partition columns on top.
939    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
940        AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
941    })?;
942
943    // An empty listing yields an empty schema (DataFusion does not error on a
944    // prefix/glob that matches no files). Treat that as an empty dataset so
945    // the load loop logs and skips it rather than registering 0 columns.
946    if file_schema.fields().is_empty() {
947        return Err(AppError::EmptyDataset(format!(
948            "dataset '{}': no .parquet files at {}",
949            d.name, d.source.location
950        )));
951    }
952
953    let cfg = ListingTableConfig::new(url)
954        .with_listing_options(opts)
955        .with_schema(file_schema.clone());
956    let table = ListingTable::try_new(cfg).map_err(|e| {
957        AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
958    })?;
959    let provider: Arc<dyn TableProvider> = Arc::new(table);
960
961    // Discovery schema = file columns + partition columns (Utf8).
962    let mut fields: Vec<Field> = file_schema
963        .fields()
964        .iter()
965        .map(|f| f.as_ref().clone())
966        .collect();
967    for k in &part_keys {
968        if !fields.iter().any(|f| f.name() == k) {
969            fields.push(Field::new(k, DataType::Utf8, false));
970        }
971    }
972    let arrow_sch = Arc::new(Schema::new(fields));
973
974    let columns: Vec<ColumnInfo> = arrow_sch
975        .fields()
976        .iter()
977        .map(|f| {
978            let dt = f.data_type();
979            ColumnInfo {
980                name: f.name().clone(),
981                logical: arrow_to_logical(dt),
982                sql_type: format!("{dt:?}"),
983                nullable: f.is_nullable(),
984            }
985        })
986        .collect();
987    let schema = DatasetSchema::new(&d.name, columns)
988        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
989
990    log::info!(
991        "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
992        d.name,
993        d.source.kind.as_str(),
994        schema.columns.len(),
995        part_keys.len()
996    );
997
998    Ok((
999        DatasetState {
1000            schema,
1001            data: Vec::new(),
1002            arrow_schema: arrow_sch,
1003            index: EqIndex::default(),
1004            lazy: true,
1005        },
1006        provider,
1007    ))
1008}
1009
1010/// Resolve a local lazy-parquet location into a single `ListingTableUrl`
1011/// rooted at the dataset base plus the ordered hive partition keys (if any).
1012/// Handles three shapes: a glob (`root/city=*/*.parquet`), a directory
1013/// (hive root or flat folder of parquets), and a single `*.parquet` file.
1014fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1015    let loc = &d.source.location;
1016
1017    if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1018        let parts: Vec<&str> = loc.split('/').collect();
1019        let first_wild = parts
1020            .iter()
1021            .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1022            .unwrap_or(parts.len());
1023        let base = parts[..first_wild].join("/");
1024        let base = if base.is_empty() {
1025            "/".to_string()
1026        } else {
1027            base
1028        };
1029        // Partition keys: `key=…` components between the base and the file
1030        // pattern (the final component).
1031        let upper = parts.len().saturating_sub(1);
1032        let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1033            .iter()
1034            .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1035            .filter(|k| !k.is_empty())
1036            .collect();
1037        return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1038    }
1039
1040    let path = std::path::Path::new(loc);
1041    if path.is_dir() {
1042        let keys = discover_hive_keys(path);
1043        return Ok((dir_url(path, d)?, keys));
1044    }
1045
1046    let url = ListingTableUrl::parse(loc)
1047        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1048    Ok((url, Vec::new()))
1049}
1050
1051/// Parse a directory path into a `ListingTableUrl` (trailing slash so
1052/// DataFusion treats it as a directory root, not a single object).
1053fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1054    let s = path.to_str().ok_or_else(|| {
1055        AppError::Internal(format!(
1056            "dataset '{}': non-utf8 path {}",
1057            d.name,
1058            path.display()
1059        ))
1060    })?;
1061    let s = if s.ends_with('/') {
1062        s.to_string()
1063    } else {
1064        format!("{s}/")
1065    };
1066    ListingTableUrl::parse(&s)
1067        .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1068}
1069
/// Discover the ordered hive partition keys below `base`.
///
/// Starting at `base`, each level is scanned for the first subdirectory
/// named `key=value` (both sides non-empty); its key is recorded and the
/// walk descends into it. The walk stops at the first level that has no
/// such subdirectory or that cannot be read. A flat (non-partitioned)
/// folder therefore yields an empty vec.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut found = Vec::new();
    let mut dir = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&dir) {
        // Unreadable entries are skipped; the first directory whose name
        // parses as `key=value` decides this level.
        let hit = entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|p| p.is_dir())
            .find_map(|p| {
                let key = match p
                    .file_name()
                    .and_then(|n| n.to_str())
                    .and_then(|n| n.split_once('='))
                {
                    Some((k, v)) if !k.is_empty() && !v.is_empty() => k.to_string(),
                    _ => return None,
                };
                Some((key, p))
            });

        match hit {
            Some((key, child)) => {
                found.push(key);
                dir = child;
            }
            None => break,
        }
    }

    found
}
1107
1108/// Lazy S3 parquet: register the dataset's S3 object store on `ctx`, then
1109/// build a `ListingTable` rooted at the `s3://bucket/prefix/` location with
1110/// any discovered hive partition columns. DataFusion does the directory
1111/// listing through the registered store and streams row groups on each
1112/// query — no local enumeration needed.
1113async fn build_lazy_s3_parquet(
1114    d: &DatasetConfig,
1115    ctx: &SessionContext,
1116) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1117    register_s3_object_store(d, ctx)?;
1118
1119    let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1120
1121    // An empty S3 prefix yields an empty inferred schema (no error). Treat it
1122    // as an empty dataset so the load loop logs and skips it.
1123    if file_schema.fields().is_empty() {
1124        return Err(AppError::EmptyDataset(format!(
1125            "dataset '{}': no .parquet files at {}",
1126            d.name, d.source.location
1127        )));
1128    }
1129
1130    // Discovery schema = file columns + partition columns (Utf8).
1131    let mut fields: Vec<Field> = file_schema
1132        .fields()
1133        .iter()
1134        .map(|f| f.as_ref().clone())
1135        .collect();
1136    for k in &part_keys {
1137        if !fields.iter().any(|f| f.name() == k) {
1138            fields.push(Field::new(k, DataType::Utf8, false));
1139        }
1140    }
1141    let arrow_sch = Arc::new(Schema::new(fields));
1142
1143    let columns: Vec<ColumnInfo> = arrow_sch
1144        .fields()
1145        .iter()
1146        .map(|f| {
1147            let dt = f.data_type();
1148            ColumnInfo {
1149                name: f.name().clone(),
1150                logical: arrow_to_logical(dt),
1151                sql_type: format!("{dt:?}"),
1152                nullable: f.is_nullable(),
1153            }
1154        })
1155        .collect();
1156    let schema = DatasetSchema::new(&d.name, columns)
1157        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1158
1159    log::info!(
1160        "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1161        d.name,
1162        d.source.kind.as_str(),
1163        schema.columns.len(),
1164        part_keys.len()
1165    );
1166
1167    Ok((
1168        DatasetState {
1169            schema,
1170            data: Vec::new(),
1171            arrow_schema: arrow_sch,
1172            index: EqIndex::default(),
1173            lazy: true,
1174        },
1175        provider,
1176    ))
1177}
1178
1179/// Build a `ListingTable` provider for an S3 parquet source, resolving the
1180/// base prefix and hive partition keys via [`s3_listing`]. The registered
1181/// S3 object store must already be present on `ctx`. Returns the provider,
1182/// the inferred *file* schema (without partition columns), and the ordered
1183/// partition keys.
1184async fn build_s3_listing_table(
1185    d: &DatasetConfig,
1186    ctx: &SessionContext,
1187) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1188    let (url, part_keys) = s3_listing(d, ctx).await?;
1189
1190    let mut opts =
1191        ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1192    if !part_keys.is_empty() {
1193        opts = opts.with_table_partition_cols(
1194            part_keys
1195                .iter()
1196                .map(|k| (k.clone(), DataType::Utf8))
1197                .collect(),
1198        );
1199    }
1200
1201    let session_state = ctx.state();
1202    let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1203        AppError::Internal(format!(
1204            "dataset '{}': infer parquet schema on s3: {e}",
1205            d.name
1206        ))
1207    })?;
1208
1209    let cfg = ListingTableConfig::new(url)
1210        .with_listing_options(opts)
1211        .with_schema(file_schema.clone());
1212    let table = ListingTable::try_new(cfg).map_err(|e| {
1213        AppError::Internal(format!(
1214            "dataset '{}': ListingTable::try_new (s3): {e}",
1215            d.name
1216        ))
1217    })?;
1218    Ok((Arc::new(table), file_schema, part_keys))
1219}
1220
1221/// Resolve an S3 parquet source into a `(base ListingTableUrl, partition
1222/// keys)` pair. Hive keys come from the location glob when present
1223/// (`s3://bucket/events/year=*/...`), otherwise — for a plain prefix — they
1224/// are discovered by listing the registered object store. Honours the
1225/// dataset's `partitioning` mode (`auto` / `hive` / `none`).
1226async fn s3_listing(
1227    d: &DatasetConfig,
1228    ctx: &SessionContext,
1229) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1230    let s3 = d.s3.clone().unwrap_or_default();
1231    let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1232    let loc = &d.source.location;
1233
1234    if d.source.has_glob() {
1235        let (base, keys) = split_glob_base_keys(loc);
1236        let base = format!("{}/", base.trim_end_matches('/'));
1237        let url = ListingTableUrl::parse(&base).map_err(|e| {
1238            AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1239        })?;
1240        let keys = if want_partitions { keys } else { Vec::new() };
1241        return Ok((url, keys));
1242    }
1243
1244    let base = if loc.ends_with('/') {
1245        loc.clone()
1246    } else {
1247        format!("{loc}/")
1248    };
1249    let url = ListingTableUrl::parse(&base).map_err(|e| {
1250        AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1251    })?;
1252    let keys = if want_partitions {
1253        discover_s3_hive_keys(ctx, &url).await
1254    } else {
1255        Vec::new()
1256    };
1257    Ok((url, keys))
1258}
1259
/// Split a glob location into its non-wildcard base path and the ordered
/// hive partition keys.
///
/// The location is split on `/`. The base is every component before the
/// first one containing a glob metacharacter (`*`, `?` or `[`). The keys are
/// the names of the `key=…` components from that first wildcard component
/// up to, but excluding, the final component (the file pattern). Works for
/// both local and `s3://` paths.
///
/// When nothing precedes the first wildcard component, the base is `/` for a
/// location that starts with `/` and `.` otherwise, so a relative pattern
/// such as `*.parquet` stays relative to the working directory instead of
/// being rooted at the filesystem root.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |c: &str| c.contains('*') || c.contains('?') || c.contains('[');

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|c| is_wild(c))
        .unwrap_or(parts.len());

    let prefix = parts[..first_wild].join("/");
    let base = if !prefix.is_empty() {
        prefix
    } else if loc.starts_with('/') {
        // "/<pattern>": the only directory in front of the pattern is root.
        "/".to_string()
    } else {
        // "<pattern>" with no directory part: relative to the current dir.
        ".".to_string()
    };

    // The last component is the file pattern and never a partition key.
    let upper = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts[first_wild.min(upper)..upper]
        .iter()
        .filter_map(|c| c.split_once('='))
        .filter(|(k, _)| !k.is_empty())
        .map(|(k, _)| k.to_string())
        .collect();

    (base, keys)
}
1283
1284/// Discover ordered hive partition keys for an S3 prefix by walking the
1285/// object store one `key=value/` level at a time via delimiter listings.
1286/// Best-effort: any listing error stops discovery and returns what was found
1287/// so far (empty = treat as a flat folder).
1288async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1289    let store = match ctx.runtime_env().object_store(url.object_store()) {
1290        Ok(s) => s,
1291        Err(_) => return Vec::new(),
1292    };
1293    let mut keys = Vec::new();
1294    let mut prefix = url.prefix().clone();
1295    loop {
1296        let listing = match store.list_with_delimiter(Some(&prefix)).await {
1297            Ok(l) => l,
1298            Err(_) => break,
1299        };
1300        let mut next: Option<object_store::path::Path> = None;
1301        for cp in &listing.common_prefixes {
1302            if let Some(seg) = cp.parts().next_back() {
1303                let seg = seg.as_ref().to_string();
1304                if let Some((k, v)) = seg.split_once('=')
1305                    && !k.is_empty()
1306                    && !v.is_empty()
1307                {
1308                    keys.push(k.to_string());
1309                    next = Some(cp.clone());
1310                    break;
1311                }
1312            }
1313        }
1314        match next {
1315            Some(p) => prefix = p,
1316            None => break,
1317        }
1318    }
1319    keys
1320}
1321
1322/// Original local-parquet code path — sync file I/O. We set a large reader
1323/// batch size so wide schemas (hundreds of columns) don't pay per-array
1324/// metadata overhead on thousands of small (default 1024-row) batches.
1325///
1326/// Two memory-saving knobs are applied here:
1327///
1328/// * **Column projection** — if `d.columns` is non-empty, only those
1329///   columns are decoded; everything else is skipped at the parquet reader
1330///   level (no Arrow array is ever allocated for the dropped columns).
1331/// * **Dictionary preservation** — Utf8 columns whose parquet column chunks
1332///   carry a dictionary page are materialised as Arrow
1333///   `Dictionary(Int32, Utf8)` instead of plain `Utf8`. Low-cardinality
1334///   string columns (state, country, severity, …) stay represented as
1335///   `n_unique` string slots plus an Int32 index per row instead of
1336///   `n_rows` independent strings — typically 10×–50× smaller for
1337///   real-world data.
1338fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1339    let files = d.resolve_local_parquet_files()?;
1340    let mut all = Vec::new();
1341    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1342        None
1343    } else {
1344        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1345    };
1346
1347    for f in &files {
1348        let file = std::fs::File::open(f)
1349            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1350
1351        // First pass: peek the parquet metadata + default Arrow schema so we
1352        // can (a) decide a column projection and (b) override Utf8 columns
1353        // that are dictionary-encoded in the file so the reader materialises
1354        // them as Arrow Dictionary arrays instead of expanding to plain Utf8.
1355        let probe = ParquetRecordBatchReaderBuilder::try_new(
1356            file.try_clone()
1357                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1358        )?;
1359        let parquet_schema = probe.parquet_schema().clone();
1360        let arrow_schema = probe.schema().clone();
1361        let metadata = probe.metadata().clone();
1362        drop(probe);
1363
1364        // Column projection (top-level / leaf indices for flat schemas).
1365        let projection = if let Some(w) = &wanted {
1366            let indices: Vec<usize> = arrow_schema
1367                .fields()
1368                .iter()
1369                .enumerate()
1370                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1371                .map(|(i, _)| i)
1372                .collect();
1373            if indices.is_empty() {
1374                return Err(AppError::Internal(format!(
1375                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1376                    d.name,
1377                    d.columns,
1378                    f.display()
1379                )));
1380            }
1381            ProjectionMask::roots(&parquet_schema, indices)
1382        } else {
1383            ProjectionMask::all()
1384        };
1385
1386        // Dictionary override: any Utf8 column whose first row group carries
1387        // a dictionary page is re-typed to Dictionary(Int32, Utf8). The
1388        // override schema must still describe every column in the parquet
1389        // file (projection is applied separately). Skipped entirely when
1390        // the dataset has `dict_encode = false` — escape hatch for cases
1391        // where the override interacts badly with null propagation in the
1392        // downstream engine.
1393        let mut new_fields: Vec<Field> = arrow_schema
1394            .fields()
1395            .iter()
1396            .map(|f| f.as_ref().clone())
1397            .collect();
1398        if d.dict_encode
1399            && let Some(rg0) = metadata.row_groups().first()
1400        {
1401            for (i, fld) in arrow_schema.fields().iter().enumerate() {
1402                if !matches!(
1403                    fld.data_type(),
1404                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1405                ) {
1406                    continue;
1407                }
1408                if let Some(col) = rg0.columns().get(i)
1409                    && col.dictionary_page_offset().is_some()
1410                {
1411                    new_fields[i] = Field::new(
1412                        fld.name(),
1413                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1414                        fld.is_nullable(),
1415                    );
1416                }
1417            }
1418        }
1419        let forced_schema = Arc::new(Schema::new(new_fields));
1420
1421        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1422        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1423            .with_batch_size(65_536)
1424            .with_projection(projection)
1425            .build()?;
1426        // Hive-style partition columns (`city=NYC/…`) live in the path, not
1427        // the file. Fold them in as constant Utf8 columns so they show up in
1428        // the schema and are queryable — matching the DuckDB backend.
1429        let pairs = hive_pairs(f);
1430        for batch in reader {
1431            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1432            all.push(if pairs.is_empty() {
1433                batch
1434            } else {
1435                append_partition_cols(&batch, &pairs)?
1436            });
1437        }
1438    }
1439    if all.is_empty() {
1440        return Err(AppError::Internal(format!(
1441            "dataset '{}': parquet source is empty",
1442            d.name
1443        )));
1444    }
1445    Ok(all)
1446}
1447
/// Collect, in path order, the hive-style partition `(key, value)` pairs
/// encoded in `path` (e.g. `year=2024/city=NYC`).
///
/// A component counts when it is valid UTF-8, splits at its first `=` into
/// a non-empty key and a non-empty value, and the value contains no further
/// `=`. All other components are ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(seg) = component.as_os_str().to_str() else {
            continue;
        };
        match seg.split_once('=') {
            Some((k, v)) if !k.is_empty() && !v.is_empty() && !v.contains('=') => {
                pairs.push((k.to_string(), v.to_string()));
            }
            _ => {}
        }
    }
    pairs
}
1462
1463/// Append constant Utf8 columns for each hive partition pair. A partition
1464/// key that collides with a real file column is skipped (the file wins).
1465fn append_partition_cols(
1466    batch: &RecordBatch,
1467    pairs: &[(String, String)],
1468) -> Result<RecordBatch, AppError> {
1469    let n = batch.num_rows();
1470    let mut fields: Vec<Field> = batch
1471        .schema()
1472        .fields()
1473        .iter()
1474        .map(|f| f.as_ref().clone())
1475        .collect();
1476    let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1477    for (k, v) in pairs {
1478        if fields.iter().any(|f| f.name() == k) {
1479            continue;
1480        }
1481        fields.push(Field::new(k, DataType::Utf8, false));
1482        cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1483    }
1484    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1485        .map_err(|e| AppError::Internal(e.to_string()))
1486}
1487
1488/// Register an `AmazonS3` object store on the SessionContext, build a
1489/// `ListingTable` (with any hive partition columns) over the dataset prefix,
1490/// and stream the whole dataset back through `DataFrame::collect`. Using a
1491/// ListingTable here — rather than a bare `read_parquet` — means S3 sources
1492/// get the same hive-partition handling as local parquet.
1493async fn read_s3_parquet(
1494    d: &DatasetConfig,
1495    ctx: &SessionContext,
1496) -> Result<Vec<RecordBatch>, AppError> {
1497    register_s3_object_store(d, ctx)?;
1498    let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1499    let df = ctx
1500        .read_table(provider)
1501        .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1502    Ok(df.collect().await?)
1503}
1504
1505/// Open a Delta table (local or S3) and return the deltalake `DeltaTable`
1506/// handle. The transaction log is read here to resolve the current file
1507/// list + schema; the table's object store is registered on the session
1508/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1509/// store themselves. A location that doesn't exist, is empty, or was never
1510/// committed maps to [`AppError::EmptyDataset`] so startup can log-and-skip.
1511async fn open_delta_table(
1512    d: &DatasetConfig,
1513    opts: HashMap<String, String>,
1514) -> Result<deltalake::DeltaTable, AppError> {
1515    let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1516        AppError::Internal(format!(
1517            "dataset '{}': bad delta location '{}': {e}",
1518            d.name, d.source.location
1519        ))
1520    })?;
1521    deltalake::open_table_with_storage_options(url, opts)
1522        .await
1523        .map_err(|e| {
1524            // A Delta location that doesn't exist, is empty, or was never
1525            // committed has no files in its log segment. deltalake (kernel
1526            // 0.32) surfaces every one of these as:
1527            //   "Not a Delta table: Generic delta kernel error: No files in
1528            //    log segment"
1529            // — covering a missing local dir AND a non-existent S3 prefix
1530            // alike. Match case-insensitively (the kernel capitalises the
1531            // phrases) and treat it like any other empty dataset so startup
1532            // logs and skips instead of aborting the whole process.
1533            let msg = e.to_string();
1534            let low = msg.to_lowercase();
1535            if low.contains("no files in log segment") || low.contains("not a delta table") {
1536                AppError::EmptyDataset(format!(
1537                    "delta location '{}' has no committed files ({msg})",
1538                    d.source.location
1539                ))
1540            } else {
1541                AppError::Internal(format!(
1542                    "dataset '{}': delta open '{}': {msg}",
1543                    d.name, d.source.location
1544                ))
1545            }
1546        })
1547}
1548
1549/// Open a Delta table (local or S3) and return its DataFusion
1550/// `TableProvider`. The provider reads the transaction log to resolve the
1551/// current file list and registers the table's object store on the session
1552/// `RuntimeEnv` lazily at scan time, so callers don't need to register a
1553/// store themselves.
1554async fn open_delta_provider(
1555    d: &DatasetConfig,
1556    opts: HashMap<String, String>,
1557) -> Result<Arc<dyn TableProvider>, AppError> {
1558    let table = open_delta_table(d, opts).await?;
1559    table.table_provider().await.map_err(|e| {
1560        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1561    })
1562}
1563
1564/// Resolve the deltalake storage-options for a dataset: empty for local
1565/// tables, S3 credentials/endpoint for S3-backed ones.
1566fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1567    if d.source.is_s3() {
1568        delta_s3_options(d)
1569    } else {
1570        Ok(HashMap::new())
1571    }
1572}
1573
1574/// Open a Delta table (local or S3) and stream every row back as a Vec of
1575/// `RecordBatch`. We materialise eagerly so the rest of the backend can
1576/// treat all datasets uniformly (single in-memory batch + eq-index).
1577async fn read_delta(
1578    d: &DatasetConfig,
1579    opts: HashMap<String, String>,
1580) -> Result<Vec<RecordBatch>, AppError> {
1581    let provider = open_delta_provider(d, opts).await?;
1582    // Drive a full scan via a throwaway SessionContext so we end up with
1583    // an in-memory Vec<RecordBatch> the shared materialise path can use.
1584    //
1585    // A Delta table can open cleanly (valid log + schema) yet still fail to
1586    // scan: its add actions may reference data files that no longer exist in
1587    // storage (e.g. vacuumed away), or are otherwise unreadable. Rather than
1588    // abort the whole registry for one broken table, map scan failures to
1589    // `EmptyDataset` so `Store::load` logs and skips it — mirroring the
1590    // bounded-probe behaviour on the lazy path.
1591    let scan_ctx = SessionContext::new();
1592    let df = scan_ctx.read_table(provider).map_err(|e| {
1593        AppError::EmptyDataset(format!(
1594            "delta location '{}' could not be scanned, skipping ({e})",
1595            d.source.location
1596        ))
1597    })?;
1598    df.collect().await.map_err(|e| {
1599        AppError::EmptyDataset(format!(
1600            "delta location '{}' could not be scanned, skipping ({e})",
1601            d.source.location
1602        ))
1603    })
1604}
1605
1606/// Build a lazy state + deltalake `TableProvider` for a Delta dataset
1607/// (local or S3). The table is never read into RAM: deltalake reads the
1608/// transaction log once here to resolve the file list and discovery schema,
1609/// then streams parquet row groups on each query (with predicate pushdown
1610/// and Delta file skipping). The returned `DatasetState.data` is empty.
1611async fn build_lazy_delta(
1612    d: &DatasetConfig,
1613    _ctx: &SessionContext,
1614) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1615    let table = open_delta_table(d, delta_storage_options(d)?).await?;
1616
1617    // An empty Delta table carries a valid schema in its log but resolves to
1618    // zero data files. The eager path catches this naturally (a full scan
1619    // yields no batches), but the lazy path never scans — so check the file
1620    // list explicitly and skip, matching the eager behaviour. Without this an
1621    // empty Delta table would register as a 0-row dataset and show up in
1622    // discovery / explore.
1623    let file_count = table
1624        .get_file_uris()
1625        .map(|it| it.count())
1626        .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1627    if file_count == 0 {
1628        return Err(AppError::EmptyDataset(format!(
1629            "delta location '{}' has a schema but no data files",
1630            d.source.location
1631        )));
1632    }
1633
1634    let provider = table.table_provider().await.map_err(|e| {
1635        AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1636    })?;
1637
1638    // The file-list check above only catches a Delta table with *no* data
1639    // files. A table can still be effectively empty in ways the log doesn't
1640    // make obvious: its add actions may reference files that contain zero
1641    // rows, or files that no longer exist in storage (e.g. vacuumed). Those
1642    // register fine here but then return nothing — or hard-error — the moment
1643    // they're queried, leaving a broken dataset visible in discovery/explore.
1644    //
1645    // The eager path catches all of this for free (a full scan yields no rows
1646    // or surfaces the read error up front), but the lazy path never scans. So
1647    // probe with a bounded one-row scan: it reads at most a single row group
1648    // from a single file, costing almost nothing on a healthy table, but lets
1649    // us log-and-skip an empty or unreadable table instead of registering it.
1650    {
1651        let probe_ctx = SessionContext::new();
1652        let probe = probe_ctx
1653            .read_table(provider.clone())
1654            .and_then(|df| df.limit(0, Some(1)));
1655        match probe {
1656            Ok(df) => match df.collect().await {
1657                Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1658                    return Err(AppError::EmptyDataset(format!(
1659                        "delta location '{}' resolves to no rows",
1660                        d.source.location
1661                    )));
1662                }
1663                Ok(_) => {}
1664                Err(e) => {
1665                    return Err(AppError::EmptyDataset(format!(
1666                        "delta location '{}' could not be scanned, skipping ({e})",
1667                        d.source.location
1668                    )));
1669                }
1670            },
1671            Err(e) => {
1672                return Err(AppError::EmptyDataset(format!(
1673                    "delta location '{}' could not be scanned, skipping ({e})",
1674                    d.source.location
1675                )));
1676            }
1677        }
1678    }
1679
1680    // The provider's schema is the full logical schema (data + partition
1681    // columns), which is exactly the discovery schema we want.
1682    let arrow_sch = provider.schema();
1683    let columns: Vec<ColumnInfo> = arrow_sch
1684        .fields()
1685        .iter()
1686        .map(|f| {
1687            let dt = f.data_type();
1688            ColumnInfo {
1689                name: f.name().clone(),
1690                logical: arrow_to_logical(dt),
1691                sql_type: format!("{dt:?}"),
1692                nullable: f.is_nullable(),
1693            }
1694        })
1695        .collect();
1696    let schema = DatasetSchema::new(&d.name, columns)
1697        .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1698
1699    log::info!(
1700        "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1701        d.name,
1702        d.source.kind.as_str(),
1703        schema.columns.len()
1704    );
1705
1706    Ok((
1707        DatasetState {
1708            schema,
1709            data: Vec::new(),
1710            arrow_schema: arrow_sch,
1711            index: EqIndex::default(),
1712            lazy: true,
1713        },
1714        provider,
1715    ))
1716}
1717
1718/// Build the storage-options HashMap that `deltalake::open_table_with_storage_options`
1719/// expects for S3 access. Keys mirror the AWS env-var names; deltalake
1720/// passes them through to object_store internally.
1721fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1722    let creds = d.resolved_creds();
1723    let region = d.resolved_region();
1724    let s3 = d.s3.clone().unwrap_or_default();
1725    let (bucket, _) = d.source.s3_bucket()?;
1726
1727    let mut opts = HashMap::new();
1728    opts.insert("AWS_REGION".into(), region);
1729    if let Some(ep) = s3.effective_endpoint(bucket) {
1730        opts.insert("AWS_ENDPOINT_URL".into(), ep);
1731    }
1732    if s3.allow_http {
1733        opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1734    }
1735    opts.insert(
1736        "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1737        (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1738    );
1739    if let Some(k) = creds.access_key_id {
1740        opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1741    }
1742    if let Some(s) = creds.secret_access_key {
1743        opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1744    }
1745    if let Some(t) = creds.session_token {
1746        opts.insert("AWS_SESSION_TOKEN".into(), t);
1747    }
1748    // Read-only paths don't need the S3 lock-provider plumbing.
1749    opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1750    Ok(opts)
1751}
1752
1753/// Construct an `AmazonS3` object_store from the dataset's `[dataset.s3]`
1754/// block + resolved credentials and register it on `ctx` under
1755/// `s3://bucket/`.
1756fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1757    let (bucket, _key) = d.source.s3_bucket()?;
1758    let creds = d.resolved_creds();
1759    let region = d.resolved_region();
1760    let s3 = d.s3.clone().unwrap_or_default();
1761
1762    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1763        AppError::Internal(format!(
1764            "dataset '{}': build S3 store for '{bucket}': {e}",
1765            d.name
1766        ))
1767    })?;
1768
1769    let url = Url::parse(&format!("s3://{bucket}"))
1770        .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1771    ctx.register_object_store(&url, Arc::new(store));
1772    Ok(())
1773}
1774
/// Heuristically decide whether an error message describes an S3
/// authorization failure (HTTP 403).
///
/// object_store, the AWS SDK, deltalake, and MinIO all render a denied read
/// as some mix of "Access Denied" / "AccessDenied" (the S3 error code),
/// "Forbidden", or a bare "403" somewhere in the error chain, so the message
/// is lowercased and searched for each marker. Intended only for sources
/// already known to be S3, so a stray "forbidden" in unrelated text can't
/// misfire.
fn is_s3_access_denied(msg: &str) -> bool {
    const MARKERS: [&str; 4] = ["access denied", "accessdenied", "forbidden", "403"];
    let lowered = msg.to_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
1788
1789
1790///
1791/// Local sources are sized with a cheap filesystem stat (`estimate_local_bytes`);
1792/// S3 sources are sized by listing the object store under their prefix. S3
1793/// sizing is best-effort: a listing failure logs a warning and returns `None`
1794/// (don't force) so a transient S3 error never blocks startup.
1795async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1796    if d.lazy || server.force_lazy_above_mb == 0 {
1797        return None;
1798    }
1799    let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1800
1801    let bytes = if d.source.is_s3() {
1802        match estimate_s3_bytes(d).await {
1803            Ok(b) => b,
1804            Err(e) => {
1805                log::warn!(
1806                    "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1807                    d.name
1808                );
1809                return None;
1810            }
1811        }
1812    } else {
1813        d.estimate_local_bytes()?
1814    };
1815
1816    (bytes > threshold).then_some(bytes)
1817}
1818
1819/// Sum the byte size of a dataset's S3 backing data by listing the object
1820/// store under the source prefix and adding up every `.parquet` object.
1821///
1822/// Works for both parquet and delta sources: delta data files are parquet
1823/// objects under the table root, so listing the root and counting parquet
1824/// gives a coarse (possibly over-counting un-vacuumed files) but cheap size
1825/// suitable for the force-lazy gate. Any glob wildcard tail is stripped so the
1826/// listing starts at the non-wildcard base prefix.
1827async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1828    use futures_util::StreamExt;
1829    use object_store::ObjectStore;
1830
1831    let (bucket, _key) = d.source.s3_bucket()?;
1832    let creds = d.resolved_creds();
1833    let region = d.resolved_region();
1834    let s3 = d.s3.clone().unwrap_or_default();
1835    let store = build_s3(bucket, &region, &s3, &creds).map_err(|e| {
1836        AppError::Internal(format!(
1837            "dataset '{}': build S3 store for '{bucket}': {e}",
1838            d.name
1839        ))
1840    })?;
1841
1842    // Strip any glob wildcard tail, then reduce the base to a bucket-relative
1843    // key for object_store's `list` (which takes a key prefix, not a URL).
1844    let (base, _keys) = split_glob_base_keys(&d.source.location);
1845    let prefix_key = base
1846        .strip_prefix("s3://")
1847        .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1848        .unwrap_or("")
1849        .trim_end_matches('/');
1850    let prefix =
1851        (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1852
1853    let mut total: u64 = 0;
1854    let mut stream = store.list(prefix.as_ref());
1855    while let Some(meta) = stream.next().await {
1856        let meta = meta.map_err(|e| {
1857            AppError::Internal(format!(
1858                "dataset '{}': s3 list under '{prefix_key}': {e}",
1859                d.name
1860            ))
1861        })?;
1862        if meta.location.as_ref().ends_with(".parquet") {
1863            total = total.saturating_add(meta.size);
1864        }
1865    }
1866    Ok(total)
1867}
1868
1869fn build_s3(
1870    bucket: &str,
1871    region: &str,
1872    s3: &S3Config,
1873    creds: &ResolvedCreds,
1874) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1875    let mut b = AmazonS3Builder::new()
1876        .with_bucket_name(bucket)
1877        .with_region(region)
1878        .with_allow_http(s3.allow_http)
1879        .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1880    if let Some(ep) = s3.effective_endpoint(bucket) {
1881        b = b.with_endpoint(ep);
1882    }
1883    if let Some(k) = creds.access_key_id.as_deref() {
1884        b = b.with_access_key_id(k);
1885    }
1886    if let Some(s) = creds.secret_access_key.as_deref() {
1887        b = b.with_secret_access_key(s);
1888    }
1889    if let Some(t) = creds.session_token.as_deref() {
1890        b = b.with_token(t);
1891    }
1892    b.build()
1893}
1894
1895fn arrow_to_logical(dt: &DataType) -> LogicalType {
1896    match dt {
1897        DataType::Boolean => LogicalType::Bool,
1898        DataType::Int8
1899        | DataType::Int16
1900        | DataType::Int32
1901        | DataType::Int64
1902        | DataType::UInt8
1903        | DataType::UInt16
1904        | DataType::UInt32
1905        | DataType::UInt64 => LogicalType::Int,
1906        DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1907        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1908        // Dictionary-encoded strings are reported as plain strings — clients
1909        // (and the rest of the backend) shouldn't have to care that we keep
1910        // a compressed representation in memory.
1911        DataType::Dictionary(_, v)
1912            if matches!(
1913                v.as_ref(),
1914                DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1915            ) =>
1916        {
1917            LogicalType::Utf8
1918        }
1919        DataType::Date32
1920        | DataType::Date64
1921        | DataType::Time32(_)
1922        | DataType::Time64(_)
1923        | DataType::Timestamp(_, _)
1924        | DataType::Duration(_)
1925        | DataType::Interval(_) => LogicalType::Temporal,
1926        _ => LogicalType::Other,
1927    }
1928}
1929
1930// ---------------------------------------------------------------------------
1931// Per-batch projection
1932// ---------------------------------------------------------------------------
1933
1934fn project(
1935    schema: &DatasetSchema,
1936    batch: RecordBatch,
1937    columns: &[String],
1938) -> Result<RecordBatch, AppError> {
1939    if columns.is_empty() {
1940        return Ok(batch);
1941    }
1942    let indices: Vec<usize> = columns
1943        .iter()
1944        .map(|c| {
1945            schema
1946                .find(c)
1947                .map(|info| schema.by_name[&info.name.to_lowercase()])
1948        })
1949        .collect::<Result<_, _>>()?;
1950    let fields: Vec<Field> = indices
1951        .iter()
1952        .map(|&i| batch.schema().field(i).clone())
1953        .collect();
1954    let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1955    Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1956}
1957
1958// ---------------------------------------------------------------------------
1959// SQL builder
1960// ---------------------------------------------------------------------------
1961
1962/// Accumulates the typed literal values for a parameterised query.
1963///
1964/// Predicate values are never interpolated into the SQL text. Instead each
1965/// value is pushed here and the builder emits a positional placeholder
1966/// (`$1`, `$2`, …) referencing it. The collected [`ScalarValue`]s are bound
1967/// to the logical plan via [`DataFrame::with_param_values`], so user input
1968/// reaches the engine as typed scalars and can never alter the query
1969/// structure (no SQL injection surface, no escaping to get wrong).
1970#[derive(Default)]
1971struct Params {
1972    values: Vec<ScalarValue>,
1973}
1974
1975impl Params {
1976    fn new() -> Self {
1977        Self::default()
1978    }
1979
1980    /// Bind `v` and return its `$N` placeholder token.
1981    fn bind(&mut self, v: ScalarValue) -> String {
1982        self.values.push(v);
1983        format!("${}", self.values.len())
1984    }
1985
1986    fn into_values(self) -> Vec<ScalarValue> {
1987        self.values
1988    }
1989}
1990
1991fn build_query_sql(
1992    schema: &DatasetSchema,
1993    req: &QueryRequest,
1994    max_page_size: u64,
1995) -> Result<(String, Vec<ScalarValue>), AppError> {
1996    let (limit, offset) = req.effective_limit_offset(max_page_size);
1997    build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1998}
1999
2000fn build_query_stream_sql(
2001    schema: &DatasetSchema,
2002    req: &QueryRequest,
2003) -> Result<(String, Vec<ScalarValue>), AppError> {
2004    let suffix = req
2005        .limit
2006        .map(|limit| format!(" LIMIT {limit}"))
2007        .unwrap_or_default();
2008    build_query_sql_with_suffix(schema, req, &suffix)
2009}
2010
2011fn build_query_sql_with_suffix(
2012    schema: &DatasetSchema,
2013    req: &QueryRequest,
2014    suffix: &str,
2015) -> Result<(String, Vec<ScalarValue>), AppError> {
2016    let agg_plan = req.agg_plan(schema)?;
2017
2018    let cols = if let Some(plan) = &agg_plan {
2019        // Group cols, then aggregations, each aliased to the JSON output key.
2020        let mut parts: Vec<String> = plan
2021            .group_cols
2022            .iter()
2023            .map(|c| DatasetSchema::quote_ident(c))
2024            .collect();
2025        for a in &plan.aggs {
2026            let expr = a.sql_expr()?;
2027            parts.push(format!(
2028                "{expr} AS {}",
2029                DatasetSchema::quote_ident(&a.alias)
2030            ));
2031        }
2032        parts.join(", ")
2033    } else if req.columns.is_empty() {
2034        if req.distinct {
2035            "DISTINCT *".to_string()
2036        } else {
2037            "*".to_string()
2038        }
2039    } else {
2040        let list = req
2041            .columns
2042            .iter()
2043            .map(|c| {
2044                schema
2045                    .find(c)
2046                    .map(|info| DatasetSchema::quote_ident(&info.name))
2047            })
2048            .collect::<Result<Vec<_>, _>>()?
2049            .join(", ");
2050        if req.distinct {
2051            format!("DISTINCT {list}")
2052        } else {
2053            list
2054        }
2055    };
2056
2057    let mut params = Params::new();
2058    let clauses: Vec<String> = req
2059        .predicates
2060        .iter()
2061        .map(|p| pred_to_sql(schema, p, &mut params))
2062        .collect::<Result<_, _>>()?;
2063
2064    let table = DatasetSchema::quote_ident(&schema.name);
2065    let where_clause = if clauses.is_empty() {
2066        String::new()
2067    } else {
2068        format!(" WHERE {}", clauses.join(" AND "))
2069    };
2070    let group_clause = match &agg_plan {
2071        Some(p) => format!(
2072            " GROUP BY {}",
2073            p.group_cols
2074                .iter()
2075                .map(|c| DatasetSchema::quote_ident(c))
2076                .collect::<Vec<_>>()
2077                .join(", "),
2078        ),
2079        None => String::new(),
2080    };
2081    let having_clause = {
2082        let resolved = req.having_plan(agg_plan.as_ref())?;
2083        if resolved.is_empty() {
2084            String::new()
2085        } else {
2086            let clauses: Vec<String> = resolved
2087                .iter()
2088                .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2089                .collect::<Result<_, _>>()?;
2090            format!(" HAVING {}", clauses.join(" AND "))
2091        }
2092    };
2093    let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2094        Some(s) => format!(" ORDER BY {s}"),
2095        None => String::new(),
2096    };
2097    let sql =
2098        format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2099    Ok((sql, params.into_values()))
2100}
2101
2102fn build_count_sql(
2103    schema: &DatasetSchema,
2104    predicates: &[Predicate],
2105) -> Result<(String, Vec<ScalarValue>), AppError> {
2106    let mut params = Params::new();
2107    let clauses: Vec<String> = predicates
2108        .iter()
2109        .map(|p| pred_to_sql(schema, p, &mut params))
2110        .collect::<Result<_, _>>()?;
2111    let table = DatasetSchema::quote_ident(&schema.name);
2112    let where_clause = if clauses.is_empty() {
2113        String::new()
2114    } else {
2115        format!(" WHERE {}", clauses.join(" AND "))
2116    };
2117    let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2118    Ok((sql, params.into_values()))
2119}
2120
2121fn pred_to_sql(
2122    schema: &DatasetSchema,
2123    pred: &Predicate,
2124    params: &mut Params,
2125) -> Result<String, AppError> {
2126    let info = schema.find(&pred.col)?;
2127    let col = DatasetSchema::quote_ident(&info.name);
2128    pred_to_sql_with_lhs(&col, pred, params)
2129}
2130
2131/// Render a predicate against a pre-resolved left-hand-side SQL
2132/// expression. The dataset-`WHERE` path resolves a quoted column name as
2133/// the LHS (see [`pred_to_sql`]); the `HAVING` path passes an aggregate
2134/// expression such as `COUNT(*)` instead. Both share the operator and
2135/// value-binding logic here.
2136fn pred_to_sql_with_lhs(
2137    col: &str,
2138    pred: &Predicate,
2139    params: &mut Params,
2140) -> Result<String, AppError> {
2141    match pred.op.as_str() {
2142        "is_null" => return Ok(format!("{col} IS NULL")),
2143        "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2144        _ => {}
2145    }
2146
2147    let val = pred
2148        .val
2149        .as_ref()
2150        .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2151
2152    if pred.op == "in" {
2153        let items = val
2154            .as_array()
2155            .filter(|a| !a.is_empty())
2156            .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2157        let placeholders: Vec<String> = items
2158            .iter()
2159            .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2160            .collect::<Result<_, AppError>>()?;
2161        return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2162    }
2163
2164    let sql_op = match pred.op.as_str() {
2165        "eq" => "=",
2166        "neq" => "!=",
2167        "gt" => ">",
2168        "gte" => ">=",
2169        "lt" => "<",
2170        "lte" => "<=",
2171        "like" => "LIKE",
2172        "ilike" => "ILIKE",
2173        other => return Err(AppError::UnknownOperator(other.into())),
2174    };
2175    let placeholder = params.bind(json_to_scalar(val)?);
2176    Ok(format!("{col} {sql_op} {placeholder}"))
2177}
2178
2179/// Convert a JSON predicate value into a typed Arrow [`ScalarValue`] for
2180/// binding as a query parameter. The engine applies the usual numeric
2181/// widening / comparison coercion against the target column type.
2182fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2183    match val {
2184        JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2185        JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2186        JsonValue::Null => Ok(ScalarValue::Null),
2187        JsonValue::Number(n) => {
2188            if let Some(i) = n.as_i64() {
2189                Ok(ScalarValue::Int64(Some(i)))
2190            } else if let Some(u) = n.as_u64() {
2191                Ok(ScalarValue::UInt64(Some(u)))
2192            } else if let Some(f) = n.as_f64() {
2193                Ok(ScalarValue::Float64(Some(f)))
2194            } else {
2195                Err(AppError::InvalidValue(
2196                    "unsupported numeric literal in predicate".into(),
2197                ))
2198            }
2199        }
2200        _ => Err(AppError::InvalidValue(
2201            "unsupported literal type in predicate".into(),
2202        )),
2203    }
2204}
2205
2206// ---------------------------------------------------------------------------
2207// Equality index — built once at startup, queried on every predicate request
2208// ---------------------------------------------------------------------------
2209
2210fn json_index_key(val: &JsonValue) -> Option<String> {
2211    match val {
2212        JsonValue::String(s) => Some(s.clone()),
2213        JsonValue::Number(n) => Some(n.to_string()),
2214        JsonValue::Bool(b) => Some(b.to_string()),
2215        _ => None,
2216    }
2217}
2218
/// Two-pointer intersection of two ascending row-id slices: the result holds
/// each value found in both inputs, in ascending order. Runs in a single
/// pass over the inputs; they are expected to be sorted.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    // Peel the smaller head off whichever side lags; keep heads that match.
    while let (Some((&x, left_tail)), Some((&y, right_tail))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => left = left_tail,
            Ordering::Greater => right = right_tail,
            Ordering::Equal => {
                common.push(x);
                left = left_tail;
                right = right_tail;
            }
        }
    }
    common
}
2235
/// Two-pointer merge of two ascending row-id slices: the result holds every
/// value from either input in ascending order, with a value whose copies meet
/// at the heads of both inputs emitted once. The inputs are expected to be
/// sorted.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_tail)), Some((&y, right_tail))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = left_tail;
            }
            Ordering::Greater => {
                merged.push(y);
                right = right_tail;
            }
            Ordering::Equal => {
                merged.push(x);
                left = left_tail;
                right = right_tail;
            }
        }
    }
    // At most one side still has elements; append whatever is left.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
2260
2261fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2262    if predicates.is_empty() || index.is_empty() {
2263        return None;
2264    }
2265
2266    // Fast path: a single `eq` predicate resolves to exactly one index bucket,
2267    // so borrow it directly instead of cloning the row-id vector.
2268    if let [pred] = predicates
2269        && pred.op.as_str() == "eq"
2270    {
2271        let col_lower = pred.col.to_lowercase();
2272        let col_map = index.get(&col_lower)?;
2273        let key = json_index_key(pred.val.as_ref()?)?;
2274        return Some(match col_map.get(&key) {
2275            Some(rows) => Cow::Borrowed(rows.as_slice()),
2276            None => Cow::Owned(Vec::new()),
2277        });
2278    }
2279
2280    let mut result: Option<Vec<u32>> = None;
2281    for pred in predicates {
2282        let col_lower = pred.col.to_lowercase();
2283        let col_map = index.get(&col_lower)?;
2284
2285        let rows: Vec<u32> = match pred.op.as_str() {
2286            "eq" => {
2287                let key = json_index_key(pred.val.as_ref()?)?;
2288                col_map.get(&key).cloned().unwrap_or_default()
2289            }
2290            "in" => {
2291                let items = pred.val.as_ref()?.as_array()?;
2292                let mut merged: Vec<u32> = Vec::new();
2293                for item in items {
2294                    if let Some(r) = col_map.get(&json_index_key(item)?) {
2295                        merged = union_sorted(&merged, r);
2296                    }
2297                }
2298                merged
2299            }
2300            _ => return None,
2301        };
2302
2303        result = Some(match result {
2304            None => rows,
2305            Some(r) => intersect_sorted(&r, &rows),
2306        });
2307    }
2308    result.map(Cow::Owned)
2309}
2310
2311/// Benchmark-only hooks for the equality-index hot path. Hidden from docs and
2312/// not part of the stable API — exposed solely so `benches/index.rs` can build
2313/// an `EqIndex` and exercise `try_index` directly.
2314#[doc(hidden)]
2315pub mod bench {
2316    use super::{EqIndex, FastMap, json_index_key, try_index};
2317    use datapress_core::models::Predicate;
2318    use serde_json::Value as JsonValue;
2319    use std::borrow::Cow;
2320
2321    /// Opaque equality index wrapper for benches (the inner alias is private).
2322    pub struct BenchIndex(EqIndex);
2323
2324    /// Build an index with a single `col` whose `val` bucket holds `rows`.
2325    /// The bucket key is derived with the same `json_index_key` the query path
2326    /// uses, so a matching `eq` predicate is guaranteed to hit.
2327    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2328        let key = json_index_key(val).expect("benchable index key");
2329        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2330        col_map.insert(key, rows);
2331        let mut index: EqIndex = EqIndex::default();
2332        index.insert(col.to_string(), col_map);
2333        BenchIndex(index)
2334    }
2335
2336    /// Resolve `predicates` against the index — the timed operation.
2337    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2338        try_index(&idx.0, predicates)
2339    }
2340
2341    /// Reference implementation of the pre-`Cow` behaviour: always clones the
2342    /// matched bucket into an owned `Vec` (what `try_index` did before the
2343    /// single-`eq` borrow fast path). Used as the `clone-before` baseline so
2344    /// the borrow win is measured against the old allocation cost in-process.
2345    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2346        let [pred] = predicates else { return None };
2347        if pred.op.as_str() != "eq" {
2348            return None;
2349        }
2350        let col_lower = pred.col.to_lowercase();
2351        let col_map = idx.0.get(&col_lower)?;
2352        let key = json_index_key(pred.val.as_ref()?)?;
2353        Some(col_map.get(&key).cloned().unwrap_or_default())
2354    }
2355}
2356
2357/// Return rows `[offset, offset+limit)` from a chunked dataset by slicing
2358/// the underlying batches (zero-copy) and concatenating the (small) page.
2359fn slice_global(
2360    chunks: &[RecordBatch],
2361    schema: &Arc<Schema>,
2362    offset: usize,
2363    limit: usize,
2364) -> Result<RecordBatch, AppError> {
2365    if limit == 0 || chunks.is_empty() {
2366        return Ok(RecordBatch::new_empty(schema.clone()));
2367    }
2368    let mut out = Vec::new();
2369    let mut to_skip = offset;
2370    let mut remaining = limit;
2371    for b in chunks {
2372        if remaining == 0 {
2373            break;
2374        }
2375        let n = b.num_rows();
2376        if to_skip >= n {
2377            to_skip -= n;
2378            continue;
2379        }
2380        let take = remaining.min(n - to_skip);
2381        out.push(b.slice(to_skip, take));
2382        to_skip = 0;
2383        remaining -= take;
2384    }
2385    if out.is_empty() {
2386        return Ok(RecordBatch::new_empty(schema.clone()));
2387    }
2388    compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2389}
2390
2391/// Materialise the page `rows[offset..offset+limit]` from a chunked dataset.
2392/// Row ids are global (across the concatenation of all chunks). We map each
2393/// requested row to its (chunk, local-index), `take` per chunk, then stitch
2394/// the per-chunk results back together preserving the original row order.
2395fn take_page(
2396    chunks: &[RecordBatch],
2397    schema: &Arc<Schema>,
2398    rows: &[u32],
2399    offset: usize,
2400    limit: usize,
2401) -> Result<RecordBatch, AppError> {
2402    let start = offset.min(rows.len());
2403    let len = limit.min(rows.len() - start);
2404    if len == 0 || chunks.is_empty() {
2405        return Ok(RecordBatch::new_empty(schema.clone()));
2406    }
2407
2408    // Prefix-sum table: `offsets[i]` is the first global row id of chunk `i`,
2409    // and `offsets.last()` is the total row count.
2410    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2411    let mut acc: u32 = 0;
2412    offsets.push(0);
2413    for b in chunks {
2414        acc = acc
2415            .checked_add(b.num_rows() as u32)
2416            .expect("row count exceeds u32::MAX");
2417        offsets.push(acc);
2418    }
2419
2420    // Bucket each global row id into the chunk that contains it, remembering
2421    // the original output position so we can restore page order at the end.
2422    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2423    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2424        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2425        let local = gid - offsets[bi];
2426        buckets[bi].push((out_pos as u32, local));
2427    }
2428
2429    // Per-chunk take, recording the destination index for each emitted row.
2430    let mut takens: Vec<RecordBatch> = Vec::new();
2431    let mut dest: Vec<u32> = Vec::with_capacity(len);
2432    for (bi, bucket) in buckets.iter().enumerate() {
2433        if bucket.is_empty() {
2434            continue;
2435        }
2436        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2437        let cols: Vec<ArrayRef> = chunks[bi]
2438            .columns()
2439            .iter()
2440            .map(|c| {
2441                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2442                    .map_err(AppError::from)
2443            })
2444            .collect::<Result<_, _>>()?;
2445        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2446        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2447    }
2448
2449    // Stitch per-chunk results then permute to restore the requested order.
2450    let stitched = compute::concat_batches(schema, takens.iter())?;
2451    let mut inv = vec![0u32; len];
2452    for (i, &d) in dest.iter().enumerate() {
2453        inv[d as usize] = i as u32;
2454    }
2455    let perm = UInt32Array::from(inv);
2456    let cols: Vec<ArrayRef> = stitched
2457        .columns()
2458        .iter()
2459        .map(|c| {
2460            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2461                .map_err(AppError::from)
2462        })
2463        .collect::<Result<_, _>>()?;
2464    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2465}
2466
2467/// Build the equality index per the dataset's policy, against the chunked
2468/// representation. Row ids are global across the concatenation of all
2469/// chunks (so they remain compatible with `take_page` / `slice_global`).
2470fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2471    use rayon::prelude::*;
2472
2473    if cfg.mode == IndexMode::None || chunks.is_empty() {
2474        return EqIndex::default();
2475    }
2476
2477    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2478        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2479    } else {
2480        None
2481    };
2482
2483    let max_card = if cfg.mode == IndexMode::Auto {
2484        Some(cfg.max_cardinality)
2485    } else {
2486        None
2487    };
2488
2489    // Per-chunk starting global row id.
2490    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2491    let mut acc: u32 = 0;
2492    for b in chunks {
2493        batch_offsets.push(acc);
2494        acc = acc
2495            .checked_add(b.num_rows() as u32)
2496            .expect("row count exceeds u32::MAX");
2497    }
2498
2499    let schema = chunks[0].schema();
2500
2501    schema
2502        .fields()
2503        .par_iter()
2504        .enumerate()
2505        .filter_map(|(ci, field)| {
2506            let col_lower = field.name().to_lowercase();
2507            if let Some(a) = &allow
2508                && !a.contains_key(&col_lower)
2509            {
2510                return None;
2511            }
2512
2513            // Only build for index-friendly types; skip everything else
2514            // up-front so we don't pay the per-chunk dispatch cost.
2515            let dtype = field.data_type();
2516            let dict_utf8 = matches!(dtype,
2517                DataType::Dictionary(k, v)
2518                    if matches!(k.as_ref(), DataType::Int32)
2519                    && matches!(v.as_ref(), DataType::Utf8));
2520            match dtype {
2521                DataType::Utf8
2522                | DataType::Utf8View
2523                | DataType::Boolean
2524                | DataType::Int8
2525                | DataType::Int16
2526                | DataType::Int32
2527                | DataType::Int64 => {}
2528                _ if dict_utf8 => {}
2529                _ => return None,
2530            }
2531
2532            let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2533
2534            for (bi, batch) in chunks.iter().enumerate() {
2535                let base = batch_offsets[bi];
2536                let col = batch.column(ci);
2537
2538                macro_rules! index_col {
2539                    ($arr_ty:ty) => {{
2540                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2541                        for row in 0..arr.len() {
2542                            if arr.is_null(row) {
2543                                continue;
2544                            }
2545                            let key = arr.value(row).to_string();
2546                            let gid = base + row as u32;
2547                            if let Some(v) = map.get_mut(&key) {
2548                                v.push(gid);
2549                            } else {
2550                                if let Some(mc) = max_card {
2551                                    if map.len() >= mc {
2552                                        return None;
2553                                    }
2554                                }
2555                                map.insert(key, vec![gid]);
2556                            }
2557                        }
2558                    }};
2559                }
2560
2561                if dict_utf8 {
2562                    // Dictionary(Int32, Utf8): iterate keys + look up the
2563                    // string value from the (small) dictionary. We allocate
2564                    // the key string only when the value is new — repeated
2565                    // values reuse the existing HashMap entry by hash, but
2566                    // `HashMap::get_mut` still needs the key, so we use a
2567                    // borrowed lookup via `get` first to avoid the alloc.
2568                    let arr = col
2569                        .as_any()
2570                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2571                        )?;
2572                    let keys = arr.keys();
2573                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2574                    for row in 0..arr.len() {
2575                        if arr.is_null(row) {
2576                            continue;
2577                        }
2578                        let k = keys.value(row) as usize;
2579                        let s = values.value(k);
2580                        let gid = base + row as u32;
2581                        if let Some(v) = map.get_mut(s) {
2582                            v.push(gid);
2583                        } else {
2584                            if let Some(mc) = max_card
2585                                && map.len() >= mc
2586                            {
2587                                return None;
2588                            }
2589                            map.insert(s.to_string(), vec![gid]);
2590                        }
2591                    }
2592                } else {
2593                    match dtype {
2594                        DataType::Utf8 => index_col!(StringArray),
2595                        DataType::Utf8View => index_col!(StringViewArray),
2596                        DataType::Boolean => index_col!(BooleanArray),
2597                        DataType::Int8 => index_col!(Int8Array),
2598                        DataType::Int16 => index_col!(Int16Array),
2599                        DataType::Int32 => index_col!(Int32Array),
2600                        DataType::Int64 => index_col!(Int64Array),
2601                        _ => unreachable!(),
2602                    }
2603                }
2604            }
2605
2606            Some((col_lower, map))
2607        })
2608        .collect()
2609}
2610
2611// ---------------------------------------------------------------------------
2612// Serialise-time temporal cast: convert Timestamp/Date/Time columns to Utf8
2613// on the page batch right before JSON encoding. We deliberately do **not**
2614// pay this cost at load time — a `Date32` is 4 bytes per row, its ISO-8601
2615// rendering is ~10–24 bytes per row, and a wide dataset full of temporal
2616// columns would balloon resident RAM. The cast is applied per returned page
2617// after pagination, so the cost is paid only for rows the caller requested.
2618// ---------------------------------------------------------------------------
2619
2620/// Returns true for Arrow types that `write_value` can render directly. Any
2621/// type returning false is pre-cast to Utf8 in [`cast_for_serialize`] so the
2622/// JSON output is faithful rather than silently `null`.
2623fn writable_inline(dt: &DataType) -> bool {
2624    match dt {
2625        DataType::Utf8
2626        | DataType::LargeUtf8
2627        | DataType::Utf8View
2628        | DataType::Boolean
2629        | DataType::Int8
2630        | DataType::Int16
2631        | DataType::Int32
2632        | DataType::Int64
2633        | DataType::UInt8
2634        | DataType::UInt16
2635        | DataType::UInt32
2636        | DataType::UInt64
2637        | DataType::Float32
2638        | DataType::Float64
2639        | DataType::Decimal128(_, _)
2640        | DataType::Decimal256(_, _) => true,
2641        DataType::Dictionary(k, v)
2642            if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2643        {
2644            true
2645        }
2646        _ => false,
2647    }
2648}
2649
2650/// Cast any column whose dtype isn't directly writable by `write_value` to
2651/// `Utf8`, on the bounded page batch. Covers temporals (Timestamp/Date/Time)
2652/// — kept native in resident memory to save RAM — and also any exotic dtype
2653/// (Float16, Binary, List, Struct, Decimal-with-unsupported-precision, …)
2654/// so the JSON serializer never falls back to writing literal `null`.
2655fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2656    let schema = batch.schema();
2657    let to_cast: Vec<usize> = schema
2658        .fields()
2659        .iter()
2660        .enumerate()
2661        .filter_map(|(i, f)| {
2662            if writable_inline(f.data_type()) {
2663                None
2664            } else {
2665                Some(i)
2666            }
2667        })
2668        .collect();
2669    if to_cast.is_empty() {
2670        return Ok(batch.clone());
2671    }
2672    let new_fields: Vec<Field> = schema
2673        .fields()
2674        .iter()
2675        .enumerate()
2676        .map(|(i, f)| {
2677            if to_cast.contains(&i) {
2678                Field::new(f.name(), DataType::Utf8, f.is_nullable())
2679            } else {
2680                f.as_ref().clone()
2681            }
2682        })
2683        .collect();
2684    let new_schema = Arc::new(Schema::new(new_fields));
2685    let cols: Vec<ArrayRef> = batch
2686        .columns()
2687        .iter()
2688        .enumerate()
2689        .map(|(i, c)| {
2690            if to_cast.contains(&i) {
2691                compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2692            } else {
2693                Ok(c.clone())
2694            }
2695        })
2696        .collect::<Result<_, _>>()?;
2697    RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2698}
2699
2700// ---------------------------------------------------------------------------
2701// Compute helpers — retained for symmetry; reserved for future inline scan
2702// path. Currently the engine fallback handles all non-index queries.
2703// ---------------------------------------------------------------------------
2704
/// Comparison operator accepted by [`cmp_scalar`].
///
/// Unused for now (hence `dead_code`): kept for a possible inline scan path.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equal to the literal.
    Eq,
    /// Not equal to the literal.
    Neq,
    /// Strictly greater than the literal.
    Gt,
    /// Greater than or equal to the literal.
    Gte,
    /// Strictly less than the literal.
    Lt,
    /// Less than or equal to the literal.
    Lte,
    /// `LIKE` pattern match; only valid on string columns.
    Like,
    /// `ILIKE` pattern match; only valid on string columns.
    ILike,
}
2717
2718#[allow(dead_code)]
2719fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2720    let arr = col
2721        .as_any()
2722        .downcast_ref::<StringArray>()
2723        .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2724    let s = Scalar::new(StringArray::from(vec![val]));
2725    Ok(eq(arr, &s)?)
2726}
2727
2728#[allow(dead_code)]
2729fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2730    macro_rules! num_cmp {
2731        ($arr_type:ty, $cast:ty) => {{
2732            let n = val
2733                .as_f64()
2734                .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2735                as $cast;
2736            let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2737            let s = Scalar::new(<$arr_type>::from(vec![n]));
2738            Ok(match op {
2739                CmpOp::Eq => eq(arr, &s)?,
2740                CmpOp::Neq => neq(arr, &s)?,
2741                CmpOp::Gt => gt(arr, &s)?,
2742                CmpOp::Gte => gt_eq(arr, &s)?,
2743                CmpOp::Lt => lt(arr, &s)?,
2744                CmpOp::Lte => lt_eq(arr, &s)?,
2745                CmpOp::Like | CmpOp::ILike => {
2746                    return Err(AppError::InvalidValue(
2747                        "LIKE requires a string column".into(),
2748                    ));
2749                }
2750            })
2751        }};
2752    }
2753    match col.data_type() {
2754        DataType::Utf8 => {
2755            let s = val
2756                .as_str()
2757                .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2758            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2759            let sc = Scalar::new(StringArray::from(vec![s]));
2760            Ok(match op {
2761                CmpOp::Eq => eq(arr, &sc)?,
2762                CmpOp::Neq => neq(arr, &sc)?,
2763                CmpOp::Gt => gt(arr, &sc)?,
2764                CmpOp::Gte => gt_eq(arr, &sc)?,
2765                CmpOp::Lt => lt(arr, &sc)?,
2766                CmpOp::Lte => lt_eq(arr, &sc)?,
2767                CmpOp::Like => compute::like(arr, &sc)?,
2768                CmpOp::ILike => compute::ilike(arr, &sc)?,
2769            })
2770        }
2771        DataType::Int8 => num_cmp!(Int8Array, i8),
2772        DataType::Int16 => num_cmp!(Int16Array, i16),
2773        DataType::Int32 => num_cmp!(Int32Array, i32),
2774        DataType::Int64 => num_cmp!(Int64Array, i64),
2775        DataType::Float32 => num_cmp!(Float32Array, f32),
2776        DataType::Float64 => num_cmp!(Float64Array, f64),
2777        dt => Err(AppError::InvalidValue(format!(
2778            "unsupported type for comparison: {dt:?}"
2779        ))),
2780    }
2781}
2782
2783// ---------------------------------------------------------------------------
2784// Serialisation
2785// ---------------------------------------------------------------------------
2786
2787pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2788    // Temporal columns are kept native in resident memory (compact). Cast
2789    // them — plus any other dtype `write_value` can't render directly — to
2790    // Utf8 here, on the bounded page batch, so the JSON output is faithful
2791    // without paying the load-time RAM cost.
2792    let batch = cast_for_serialize(batch)?;
2793    let schema = batch.schema();
2794    let n_rows = batch.num_rows();
2795
2796    let keys: Vec<Vec<u8>> = schema
2797        .fields()
2798        .iter()
2799        .map(|f| {
2800            let mut k = Vec::with_capacity(f.name().len() + 3);
2801            k.push(b'"');
2802            k.extend_from_slice(f.name().as_bytes());
2803            k.extend_from_slice(b"\":");
2804            k
2805        })
2806        .collect();
2807
2808    // Resolve every column's concrete Arrow array type exactly ONCE here,
2809    // instead of paying a `data_type()` match + `downcast_ref` for every
2810    // single cell. The inner row loop then dispatches over a small enum,
2811    // which the compiler turns into a cheap jump table.
2812    let encoders: Vec<ColEnc> = batch
2813        .columns()
2814        .iter()
2815        .map(|c| ColEnc::new(c.as_ref()))
2816        .collect();
2817
2818    let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2819    let mut itoa_buf = itoa::Buffer::new();
2820    let mut ryu_buf = ryu::Buffer::new();
2821    buf.push(b'[');
2822
2823    for row in 0..n_rows {
2824        if row > 0 {
2825            buf.push(b',');
2826        }
2827        buf.push(b'{');
2828        for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2829            if i > 0 {
2830                buf.push(b',');
2831            }
2832            buf.extend_from_slice(key);
2833            enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2834        }
2835        buf.push(b'}');
2836    }
2837
2838    buf.push(b']');
2839    Ok(unsafe { String::from_utf8_unchecked(buf) })
2840}
2841
2842/// Per-column JSON encoder. The concrete Arrow array type is resolved once
2843/// (in [`ColEnc::new`]) so the hot row loop dispatches over this small enum
2844/// instead of repeating `data_type()` matching + `downcast_ref` for every
2845/// cell. Each variant borrows the already-downcast typed array.
2846enum ColEnc<'a> {
2847    Utf8(&'a StringArray),
2848    LargeUtf8(&'a LargeStringArray),
2849    Utf8View(&'a StringViewArray),
2850    /// Dictionary-encoded UTF-8 (Int32 keys). Holds the keys array and the
2851    /// pre-downcast string values so neither is re-resolved per row.
2852    DictI32Utf8(
2853        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2854        &'a StringArray,
2855    ),
2856    Bool(&'a BooleanArray),
2857    I8(&'a Int8Array),
2858    I16(&'a Int16Array),
2859    I32(&'a Int32Array),
2860    I64(&'a Int64Array),
2861    U8(&'a UInt8Array),
2862    U16(&'a UInt16Array),
2863    U32(&'a UInt32Array),
2864    U64(&'a UInt64Array),
2865    Dec128(&'a Decimal128Array),
2866    Dec256(&'a Decimal256Array),
2867    F32(&'a Float32Array),
2868    F64(&'a Float64Array),
2869    /// Anything else: fall back to the generic `write_value` dispatch.
2870    Other(&'a dyn Array),
2871}
2872
2873impl<'a> ColEnc<'a> {
2874    fn new(col: &'a dyn Array) -> ColEnc<'a> {
2875        macro_rules! dc {
2876            ($t:ty) => {
2877                col.as_any().downcast_ref::<$t>().unwrap()
2878            };
2879        }
2880        match col.data_type() {
2881            DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2882            DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2883            DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2884            DataType::Dictionary(key, value)
2885                if matches!(key.as_ref(), DataType::Int32)
2886                    && matches!(value.as_ref(), DataType::Utf8) =>
2887            {
2888                let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2889                let values = dict
2890                    .values()
2891                    .as_any()
2892                    .downcast_ref::<StringArray>()
2893                    .unwrap();
2894                ColEnc::DictI32Utf8(dict, values)
2895            }
2896            DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2897            DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2898            DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2899            DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2900            DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2901            DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2902            DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2903            DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2904            DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2905            DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2906            DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2907            DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2908            DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2909            _ => ColEnc::Other(col),
2910        }
2911    }
2912
2913    #[inline]
2914    fn write(
2915        &self,
2916        buf: &mut Vec<u8>,
2917        row: usize,
2918        itoa_buf: &mut itoa::Buffer,
2919        ryu_buf: &mut ryu::Buffer,
2920    ) {
2921        macro_rules! int {
2922            ($arr:expr) => {{
2923                if $arr.is_null(row) {
2924                    buf.extend_from_slice(b"null");
2925                } else {
2926                    buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2927                }
2928            }};
2929        }
2930        match self {
2931            ColEnc::Utf8(a) => {
2932                if a.is_null(row) {
2933                    buf.extend_from_slice(b"null");
2934                } else {
2935                    write_str(buf, a.value(row));
2936                }
2937            }
2938            ColEnc::LargeUtf8(a) => {
2939                if a.is_null(row) {
2940                    buf.extend_from_slice(b"null");
2941                } else {
2942                    write_str(buf, a.value(row));
2943                }
2944            }
2945            ColEnc::Utf8View(a) => {
2946                if a.is_null(row) {
2947                    buf.extend_from_slice(b"null");
2948                } else {
2949                    write_str(buf, a.value(row));
2950                }
2951            }
2952            ColEnc::DictI32Utf8(keys, values) => {
2953                if keys.is_null(row) {
2954                    buf.extend_from_slice(b"null");
2955                } else {
2956                    let k = keys.keys().value(row) as usize;
2957                    write_str(buf, values.value(k));
2958                }
2959            }
2960            ColEnc::Bool(a) => {
2961                if a.is_null(row) {
2962                    buf.extend_from_slice(b"null");
2963                } else {
2964                    buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2965                }
2966            }
2967            ColEnc::I8(a) => int!(a),
2968            ColEnc::I16(a) => int!(a),
2969            ColEnc::I32(a) => int!(a),
2970            ColEnc::I64(a) => int!(a),
2971            ColEnc::U8(a) => int!(a),
2972            ColEnc::U16(a) => int!(a),
2973            ColEnc::U32(a) => int!(a),
2974            ColEnc::U64(a) => int!(a),
2975            ColEnc::Dec128(a) => {
2976                if a.is_null(row) {
2977                    buf.extend_from_slice(b"null");
2978                } else {
2979                    write_str(buf, &a.value_as_string(row));
2980                }
2981            }
2982            ColEnc::Dec256(a) => {
2983                if a.is_null(row) {
2984                    buf.extend_from_slice(b"null");
2985                } else {
2986                    write_str(buf, &a.value_as_string(row));
2987                }
2988            }
2989            ColEnc::F32(a) => {
2990                if a.is_null(row) {
2991                    buf.extend_from_slice(b"null");
2992                } else {
2993                    let v = a.value(row);
2994                    if v.is_finite() {
2995                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2996                    } else {
2997                        buf.extend_from_slice(b"null");
2998                    }
2999                }
3000            }
3001            ColEnc::F64(a) => {
3002                if a.is_null(row) {
3003                    buf.extend_from_slice(b"null");
3004                } else {
3005                    let v = a.value(row);
3006                    if v.is_finite() {
3007                        buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3008                    } else {
3009                        buf.extend_from_slice(b"null");
3010                    }
3011                }
3012            }
3013            ColEnc::Other(col) => {
3014                if col.is_null(row) {
3015                    buf.extend_from_slice(b"null");
3016                } else {
3017                    write_value(buf, *col, row);
3018                }
3019            }
3020        }
3021    }
3022}
3023
3024#[inline]
3025fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3026    match col.data_type() {
3027        DataType::Utf8 => write_str(
3028            buf,
3029            col.as_any()
3030                .downcast_ref::<StringArray>()
3031                .unwrap()
3032                .value(row),
3033        ),
3034        DataType::LargeUtf8 => write_str(
3035            buf,
3036            col.as_any()
3037                .downcast_ref::<LargeStringArray>()
3038                .unwrap()
3039                .value(row),
3040        ),
3041        DataType::Utf8View => write_str(
3042            buf,
3043            col.as_any()
3044                .downcast_ref::<StringViewArray>()
3045                .unwrap()
3046                .value(row),
3047        ),
3048        DataType::Dictionary(key, value)
3049            if matches!(key.as_ref(), DataType::Int32)
3050                && matches!(value.as_ref(), DataType::Utf8) =>
3051        {
3052            let dict = col
3053                .as_any()
3054                .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3055                .unwrap();
3056            let keys = dict.keys();
3057            let values = dict
3058                .values()
3059                .as_any()
3060                .downcast_ref::<StringArray>()
3061                .unwrap();
3062            let k = keys.value(row) as usize;
3063            write_str(buf, values.value(k));
3064        }
3065        DataType::Boolean => {
3066            let v = col
3067                .as_any()
3068                .downcast_ref::<BooleanArray>()
3069                .unwrap()
3070                .value(row);
3071            buf.extend_from_slice(if v { b"true" } else { b"false" });
3072        }
3073        DataType::Int8 => {
3074            let mut b = itoa::Buffer::new();
3075            buf.extend_from_slice(
3076                b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3077                    .as_bytes(),
3078            );
3079        }
3080        DataType::Int16 => {
3081            let mut b = itoa::Buffer::new();
3082            buf.extend_from_slice(
3083                b.format(
3084                    col.as_any()
3085                        .downcast_ref::<Int16Array>()
3086                        .unwrap()
3087                        .value(row),
3088                )
3089                .as_bytes(),
3090            );
3091        }
3092        DataType::Int32 => {
3093            let mut b = itoa::Buffer::new();
3094            buf.extend_from_slice(
3095                b.format(
3096                    col.as_any()
3097                        .downcast_ref::<Int32Array>()
3098                        .unwrap()
3099                        .value(row),
3100                )
3101                .as_bytes(),
3102            );
3103        }
3104        DataType::Int64 => {
3105            let mut b = itoa::Buffer::new();
3106            buf.extend_from_slice(
3107                b.format(
3108                    col.as_any()
3109                        .downcast_ref::<Int64Array>()
3110                        .unwrap()
3111                        .value(row),
3112                )
3113                .as_bytes(),
3114            );
3115        }
3116        DataType::UInt8 => {
3117            let mut b = itoa::Buffer::new();
3118            buf.extend_from_slice(
3119                b.format(
3120                    col.as_any()
3121                        .downcast_ref::<UInt8Array>()
3122                        .unwrap()
3123                        .value(row),
3124                )
3125                .as_bytes(),
3126            );
3127        }
3128        DataType::UInt16 => {
3129            let mut b = itoa::Buffer::new();
3130            buf.extend_from_slice(
3131                b.format(
3132                    col.as_any()
3133                        .downcast_ref::<UInt16Array>()
3134                        .unwrap()
3135                        .value(row),
3136                )
3137                .as_bytes(),
3138            );
3139        }
3140        DataType::UInt32 => {
3141            let mut b = itoa::Buffer::new();
3142            buf.extend_from_slice(
3143                b.format(
3144                    col.as_any()
3145                        .downcast_ref::<UInt32Array>()
3146                        .unwrap()
3147                        .value(row),
3148                )
3149                .as_bytes(),
3150            );
3151        }
3152        DataType::UInt64 => {
3153            let mut b = itoa::Buffer::new();
3154            buf.extend_from_slice(
3155                b.format(
3156                    col.as_any()
3157                        .downcast_ref::<UInt64Array>()
3158                        .unwrap()
3159                        .value(row),
3160                )
3161                .as_bytes(),
3162            );
3163        }
3164        DataType::Decimal128(_, _) => {
3165            let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3166            write_str(buf, &arr.value_as_string(row));
3167        }
3168        DataType::Decimal256(_, _) => {
3169            let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3170            write_str(buf, &arr.value_as_string(row));
3171        }
3172        DataType::Float32 => {
3173            let v = col
3174                .as_any()
3175                .downcast_ref::<Float32Array>()
3176                .unwrap()
3177                .value(row);
3178            if v.is_finite() {
3179                let mut b = ryu::Buffer::new();
3180                buf.extend_from_slice(b.format_finite(v).as_bytes());
3181            } else {
3182                buf.extend_from_slice(b"null");
3183            }
3184        }
3185        DataType::Float64 => {
3186            let v = col
3187                .as_any()
3188                .downcast_ref::<Float64Array>()
3189                .unwrap()
3190                .value(row);
3191            if v.is_finite() {
3192                let mut b = ryu::Buffer::new();
3193                buf.extend_from_slice(b.format_finite(v).as_bytes());
3194            } else {
3195                buf.extend_from_slice(b"null");
3196            }
3197        }
3198        // Any dtype not handled above must have been pre-cast to Utf8 by
3199        // `cast_for_serialize`. Hitting this arm is a bug — surface it as a
3200        // visible JSON string rather than a silent null so it can't be
3201        // mistaken for a real NULL value.
3202        other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3203    }
3204}
3205
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, `\n` / `\r` / `\t` use their short
/// escapes, and every other byte below `0x20` is written as `\u00XX` with
/// lowercase hex digits. All remaining bytes (including multi-byte UTF-8
/// sequences) are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let raw = s.as_bytes();
    // Start of the run of bytes that need no escaping and have not been
    // copied into `buf` yet.
    let mut pending = 0usize;

    buf.push(b'"');
    for (at, &ch) in raw.iter().enumerate() {
        let needs_escape = ch < 0x20 || ch == b'"' || ch == b'\\';
        if !needs_escape {
            continue;
        }
        // Flush the clean run that precedes this byte, then emit its escape.
        buf.extend_from_slice(&raw[pending..at]);
        pending = at + 1;
        match ch {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            _ => buf.extend_from_slice(&[
                b'\\',
                b'u',
                b'0',
                b'0',
                HEX_DIGITS[usize::from(ch >> 4)],
                HEX_DIGITS[usize::from(ch & 0x0f)],
            ]),
        }
    }
    buf.extend_from_slice(&raw[pending..]);
    buf.push(b'"');
}
3227
3228// ---------------------------------------------------------------------------
3229// Backend trait impl — wires the store into the generic core handlers.
3230// ---------------------------------------------------------------------------
3231
3232#[async_trait]
3233impl Backend for Store {
3234    fn names(&self) -> Vec<String> {
3235        Store::names(self)
3236    }
3237
3238    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3239        let st = self.dataset(name)?;
3240        Ok(DatasetSummary {
3241            name: st.schema.name.clone(),
3242            columns: st.schema.columns.len(),
3243            rows: st.num_rows(),
3244        })
3245    }
3246
3247    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3248        let st = self.dataset(name)?;
3249        Ok(Arc::new(st.schema.clone()))
3250    }
3251
3252    fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3253        let st = self.dataset(name)?;
3254        // Report indexed columns in the dataset's declared schema order
3255        // so the `/schema` response is deterministic.
3256        let mut cols: Vec<String> = st
3257            .schema
3258            .columns
3259            .iter()
3260            .map(|c| c.name.clone())
3261            .filter(|n| st.index.contains_key(n))
3262            .collect();
3263        // Any indexed columns not in `schema.columns` (shouldn't happen,
3264        // but be defensive) get appended sorted.
3265        let mut extras: Vec<String> = st
3266            .index
3267            .keys()
3268            .filter(|n| !cols.iter().any(|c| c == *n))
3269            .cloned()
3270            .collect();
3271        extras.sort();
3272        cols.extend(extras);
3273        Ok(cols)
3274    }
3275
3276    async fn sample(&self, name: &str) -> Result<String, AppError> {
3277        Store::sample(self, name).await
3278    }
3279
3280    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3281        Store::query(self, name, req).await
3282    }
3283
3284    async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3285        Store::query_arrow(self, name, req).await
3286    }
3287
3288    async fn query_arrow_stream(
3289        &self,
3290        name: &str,
3291        req: &QueryRequest,
3292    ) -> Result<ArrowIpcStream, AppError> {
3293        Store::query_arrow_stream(self, name, req).await
3294    }
3295
3296    async fn query_arrow_stream_all(
3297        &self,
3298        name: &str,
3299        req: &QueryRequest,
3300    ) -> Result<ArrowIpcStream, AppError> {
3301        Store::query_arrow_stream_all(self, name, req).await
3302    }
3303
3304    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3305        Store::count(self, name, req).await
3306    }
3307
3308    async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3309        Store::query_sql(self, sql, max_rows).await
3310    }
3311
3312    async fn query_sql_arrow_stream(
3313        &self,
3314        sql: &str,
3315        max_rows: u64,
3316    ) -> Result<ArrowIpcStream, AppError> {
3317        Store::query_sql_arrow_stream(self, sql, max_rows).await
3318    }
3319
3320    async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3321        Store::parquet(self, name).await
3322    }
3323
3324    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3325        Store::reload(self, name).await
3326    }
3327
3328    async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3329        Store::register(self, cfg).await
3330    }
3331}
3332
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    // Representative messages from object_store / AWS / deltalake / MinIO
    // that must be classified as an S3 access-denied failure.
    const DENIED_MESSAGES: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    // Failures that are not permission problems and must not be classified
    // as access-denied.
    const UNRELATED_MESSAGES: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    /// Every known access-denied phrasing is recognised.
    #[test]
    fn detects_s3_access_denied_variants() {
        for msg in DENIED_MESSAGES {
            let flagged = is_s3_access_denied(msg);
            assert!(flagged, "should flag: {msg}");
        }
    }

    /// Errors unrelated to permissions are left alone.
    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED_MESSAGES {
            let flagged = is_s3_access_denied(msg);
            assert!(!flagged, "should not flag: {msg}");
        }
    }
}
3361