Skip to main content

datapress_core/
config.rs

1//! Runtime configuration loaded from `datasets.toml`.
2//!
3//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
4//! block selects the format (`parquet` or `delta`) and the location (a
5//! local path or an `s3://bucket/key` URL). When the location is on S3,
6//! an optional `[dataset.s3]` block carries non-secret connection details
7//! (region, endpoint, addressing style, …).
8//!
9//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
10//! in this precedence order:
11//!
12//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
13//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
14//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
15//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
16//!    `sales.eu-1` → `SALES_EU_1`).
17//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
18//!    `[dataset.s3]` block.
19//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
20//!    `AWS_SESSION_TOKEN`.
21//! 4. None — fall back to the engine's own provider chain
22//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::{Deserialize, Serialize};
29
30use crate::errors::AppError;
31
/// Absolute path of the `datasets.toml` backing this process. Written once
/// by [`AppConfig::load`]; stays unset when the configuration was assembled
/// in-process (e.g. by the Python bindings) instead of being read from a
/// file — in that case the explorer's "append to server config" export is
/// unavailable.
static SOURCE_CONFIG_PATH: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();

/// Returns the config file this process was loaded from, or `None` when no
/// file-backed configuration has been recorded.
pub fn source_config_path() -> Option<&'static Path> {
    let recorded = SOURCE_CONFIG_PATH.get()?;
    Some(recorded.as_path())
}
42
/// Mount paths the user MUST NOT pick for `[docs].path` or
/// `[swagger].path` — they would shadow first-party routes (probes,
/// API scopes, root).
///
/// Every entry is written with a leading `/` and no trailing slash (the
/// bare root `/` aside). NOTE(review): the code that compares against this
/// list is outside this section — confirm whether it matches exactly or by
/// prefix before relying on it to protect sub-paths.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
49
50// ---------------------------------------------------------------------------
51// Public types
52// ---------------------------------------------------------------------------
53
/// Root of the configuration tree deserialized from `datasets.toml`.
///
/// Every field carries `#[serde(default)]`, so a section that is absent
/// from the file falls back to that section's `Default` implementation, and
/// a file without any `dataset` entries yields an empty dataset list.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `server` section — listener, request limits and backend selection.
    #[serde(default)]
    pub server: ServerConfig,
    /// `docs` section — embedded documentation site.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `swagger` section — Swagger UI and OpenAPI spec.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `metrics` section — Prometheus metrics endpoint.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `explorer` section — embedded dataset explorer UI.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `sql` section — raw-SQL query endpoint.
    #[serde(default)]
    pub sql: SqlConfig,
    /// `datafusion` section — DataFusion backend tuning.
    #[serde(default)]
    pub datafusion: DataFusionConfig,
    /// `auth` section — OIDC bearer-token enforcement.
    #[serde(default)]
    pub auth: AuthConfig,
    /// Datasets bound to this instance. The key in the file is the singular
    /// `dataset`, not the field name.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
75
/// HTTP server settings (the `server` field of [`AppConfig`]).
///
/// The container-level `#[serde(default)]` means any key left out of the
/// file takes its value from this type's `Default` implementation.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Which engine to run. Must match the binary's compile-time feature.
    pub backend: Backend,
    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
    /// to 0.0.0.0 if you want to expose the port.
    pub listen: IpAddr,
    /// TCP port. Default `8080`.
    pub port: u16,
    /// Number of actix worker threads. `None` (= unset) → one per CPU.
    pub workers: Option<usize>,
    /// Optional URL path prefix — useful when sitting behind a reverse
    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
    /// route is mounted under this prefix (so the proxy can pass the URL
    /// through unchanged). Must start with `/` and not end with `/`; the
    /// empty string (default) means no prefix.
    pub prefix: String,
    /// Negotiate response compression (gzip / brotli / zstd) via the
    /// `Accept-Encoding` request header. Enabled by default. Disable when
    /// running behind a proxy that already compresses, or when the extra
    /// CPU is not worth the bandwidth saving.
    pub compress: bool,
    /// Maximum accepted JSON request body size, in bytes. Larger bodies
    /// are rejected with `413 Payload Too Large` before any handler runs.
    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
    /// a DoS guard, not a tuning knob.
    pub max_body_bytes: usize,
    /// Maximum rows returned by a single `/query` page. Larger
    /// `page_size` values are clamped before the backend runs.
    /// Default `100_000`.
    pub max_page_size: u64,
    /// When > 0, any dataset whose backing files exceed this many
    /// megabytes is forced into `lazy` mode at startup (streamed from
    /// disk instead of materialised into RAM), even if `lazy` was not set
    /// on the dataset. `0` (default) disables the size check. Local
    /// sources are sized with a filesystem stat; on the DataFusion backend
    /// S3 sources are sized by listing the object store under their prefix
    /// (the DuckDB backend only sizes local sources — S3 datasets there
    /// must opt in with an explicit `lazy = true`). Delta tables are
    /// measured by summing their parquet data files.
    pub force_lazy_above_mb: u64,
    /// Per-request handler timeout, in milliseconds. If a handler hasn't
    /// produced a response within this budget the request is aborted with
    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
    pub request_timeout_ms: u64,
    /// Grace period for in-flight requests after the server has received
    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
    /// immediately; existing connections then have up to this many
    /// seconds to finish before workers are force-stopped. Default `30`.
    pub shutdown_timeout_secs: u64,
    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
    /// backend; ignored by DataFusion.
    pub quack: QuackConfig,
}
131
132impl Default for ServerConfig {
133    fn default() -> Self {
134        Self {
135            backend: Backend::default(),
136            listen: IpAddr::from([127, 0, 0, 1]),
137            port: 8080,
138            workers: None,
139            prefix: String::new(),
140            compress: true,
141            max_body_bytes: 1024 * 1024,
142            max_page_size: 100_000,
143            force_lazy_above_mb: 0,
144            request_timeout_ms: 30_000,
145            shutdown_timeout_secs: 30,
146            quack: QuackConfig::default(),
147        }
148    }
149}
150
/// Experimental DuckDB Quack remote protocol server.
///
/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
/// disabled unless you intentionally want DuckDB clients to attach/query this
/// process directly.
///
/// Keys omitted from the file take their value from this type's `Default`
/// implementation (container-level `#[serde(default)]`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Install/load the Quack extension and start `quack_serve` after
    /// datasets are registered. Default `false`.
    pub enabled: bool,
    /// Quack URI to listen on. `quack:localhost` uses DuckDB's default
    /// port 9494.
    pub uri: String,
    /// Optional explicit authentication token. If omitted, Quack generates
    /// one at startup and DataPress logs it once.
    pub token: Option<String>,
    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
    /// For external exposure, put a TLS-terminating reverse proxy in front.
    /// Default `false`.
    pub allow_other_hostname: bool,
    /// Install a read-only authorization macro for remote queries. Enabled
    /// by default to match DataPress' read-oriented HTTP API.
    pub read_only: bool,
}
175
176impl Default for QuackConfig {
177    fn default() -> Self {
178        Self {
179            enabled: false,
180            uri: "quack:localhost".into(),
181            token: None,
182            allow_other_hostname: false,
183            read_only: true,
184        }
185    }
186}
187
188impl QuackConfig {
189    /// Validate the enabled Quack configuration against DuckDB's current
190    /// safety rules. The extension treats only the literal `localhost` as
191    /// local unless `allow_other_hostname` is set.
192    pub fn validate_enabled(&self) -> Result<(), AppError> {
193        if self.uri.trim().is_empty() {
194            return Err(AppError::Internal(
195                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
196            ));
197        }
198        if !self.uri.starts_with("quack:") {
199            return Err(AppError::Internal(format!(
200                "server.quack.uri must start with 'quack:' (got '{}')",
201                self.uri
202            )));
203        }
204        if !self.allow_other_hostname {
205            let host = self.hostname().unwrap_or_default();
206            if host != "localhost" {
207                return Err(AppError::Internal(format!(
208                    "server.quack.uri host must be 'localhost' unless \
209                     server.quack.allow_other_hostname = true (got '{}')",
210                    self.uri
211                )));
212            }
213        }
214        if let Some(token) = self.token.as_deref()
215            && token.len() < 4
216        {
217            return Err(AppError::Internal(
218                "server.quack.token must be at least 4 characters".into(),
219            ));
220        }
221        Ok(())
222    }
223
224    fn hostname(&self) -> Option<&str> {
225        let rest = self.uri.strip_prefix("quack:")?;
226        let rest = rest.strip_prefix("//").unwrap_or(rest);
227        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
228        (!host.is_empty()).then_some(host)
229    }
230}
231
/// Query engine selected by the `backend` key of [`ServerConfig`].
///
/// Variants are spelled in lowercase in the config file (`datafusion`,
/// `duckdb`) because of `rename_all = "lowercase"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// DataFusion engine; used when no backend is specified.
    #[default]
    Datafusion,
    /// DuckDB engine.
    Duckdb,
}
239
/// Embedded MkDocs documentation site (`[docs]` block).
///
/// Enabled by default — when the binary was built with the `docs`
/// cargo feature, the site is served at [`DocsConfig::path`] out of
/// the box. Set `enabled = false` in `datasets.toml` to suppress it
/// (e.g. in prod). When the binary was built without the feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the mount. The mount path must be a non-trivial sub-path;
/// reserved API and probe roots are rejected at startup.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Serve the documentation site. Default `true`.
    pub enabled: bool,
    /// Mount path of the site. Default `/mkdocs`.
    pub path: String,
}
255
256impl Default for DocsConfig {
257    fn default() -> Self {
258        Self {
259            enabled: true,
260            path: "/mkdocs".into(),
261        }
262    }
263}
264
/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
///
/// Enabled by default — when the binary was built with the `swagger`
/// cargo feature, an interactive Swagger UI is served at
/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
/// to suppress it (e.g. in prod). When the binary was built without
/// the feature, `enabled = true` is harmless: the server logs a
/// warning at startup and skips the mount.
///
/// To let users sign in to the UI itself (Authorization Code + PKCE
/// against any OIDC provider), populate the optional `[swagger.oauth2]`
/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
/// to every "Try it out" request — useful for exercising auth-protected
/// endpoints from the docs page. This drives the UI only; it does not
/// turn on server-side token validation.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Serve the Swagger UI. Default `true`.
    pub enabled: bool,
    /// Mount path of the UI. Default `/docs`.
    pub path: String,
    /// UI sign-in settings from `[swagger.oauth2]`; `None` (default) when
    /// the sub-block is absent.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
288
289impl Default for SwaggerConfig {
290    fn default() -> Self {
291        Self {
292            enabled: true,
293            path: "/docs".into(),
294            oauth2: None,
295        }
296    }
297}
298
/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
///
/// Configures the UI to drive an Authorization Code + PKCE flow against
/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
/// token endpoints from `<issuer>/.well-known/openid-configuration`,
/// so we don't need to pin them here.
///
/// `issuer` and `client_id` are required when the block is present — there
/// is no sensible default for either. `scopes` and `pkce` are optional and
/// fall back to an empty list and `true` respectively. Unknown keys are a
/// parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// OIDC issuer URL, e.g.
    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
    /// `https://accounts.google.com`. Must not end in `/`.
    pub issuer: String,
    /// Public OAuth2 client identifier registered with the IdP. The
    /// client must be a SPA / public client (no secret) with
    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
    /// as an allowed redirect URI.
    pub client_id: String,
    /// Scopes to request by default. Will be pre-checked in the Swagger
    /// UI authorize dialog; users can edit them before signing in.
    /// `openid` is always added if missing.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Use PKCE for the authorization code flow. Defaults to `true`;
    /// disable only if your IdP doesn't support PKCE for public clients.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
330
/// Prometheus metrics endpoint (`[metrics]` block).
///
/// Disabled by default. When `enabled = true` (and the binary was built
/// with the `metrics` cargo feature), the server installs a middleware
/// that records per-request HTTP counters and latency histograms, and
/// exposes them in the Prometheus text exposition format at
/// [`MetricsConfig::path`] (default `/metrics`).
///
/// The endpoint is mounted at a fixed, *unprefixed* path — like the
/// health probes — so a scrape config doesn't need to know about any
/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
/// layer: Prometheus scrapers rarely carry bearer tokens, and the
/// endpoint exposes only aggregate request metrics (no row data). Keep
/// it on a network the scraper can reach but the public cannot, e.g. by
/// binding `server.listen` to a private interface.
///
/// When the binary was built without the `metrics` feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the endpoint.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Expose the metrics endpoint. Default `false`.
    pub enabled: bool,
    /// Path the endpoint is served at. Default `/metrics`.
    pub path: String,
}
356
357impl Default for MetricsConfig {
358    fn default() -> Self {
359        Self {
360            enabled: false,
361            path: "/metrics".into(),
362        }
363    }
364}
365
/// Embedded dataset explorer UI (`[explorer]` block).
///
/// A server-rendered web app (Actix + Askama templates + htmx +
/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
/// It offers a *discovery* view — per-dataset stats, schema, index and
/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
/// that queries each dataset's Parquet export directly.
///
/// Enabled by default. When the binary was built without the `explorer`
/// cargo feature, `enabled = true` is harmless: the server logs a warning
/// at startup and skips the mount. Set `enabled = false` to suppress it at
/// runtime even when the feature is compiled in.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Serve the explorer UI. Default `true`.
    pub enabled: bool,
    /// Mount path of the UI. Default `/explore`.
    pub path: String,
}
384
385impl Default for ExplorerConfig {
386    fn default() -> Self {
387        Self {
388            enabled: true,
389            path: "/explore".into(),
390        }
391    }
392}
393
/// Raw-SQL query endpoint (`[sql]` block).
///
/// Exposes `POST /api/v1/sql`, which accepts an arbitrary read-only
/// `SELECT` in the request body and runs it against the engine. **Off by
/// default** — raw SQL is a larger attack surface than the structured
/// `/query` endpoint, so it must be opted into explicitly.
///
/// Phase 1 is scoped to a *single* dataset per query: the statement may
/// reference at most one registered dataset (and no others / no files),
/// enforced by a parse-time table allowlist. Cross-dataset joins are a
/// future extension.
///
/// Safety rails applied to every accepted statement:
/// - exactly one statement, and it must be a read-only `SELECT` / `WITH`,
/// - every referenced table must be a registered dataset (no file
///   functions, no `ATTACH`/`COPY`/`PRAGMA`/DDL/DML),
/// - the result is hard-capped at [`SqlConfig::max_rows`] rows.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`);
/// omitted keys take their value from the `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SqlConfig {
    /// Enable the `POST /api/v1/sql` endpoint. Default `false`.
    pub enabled: bool,
    /// Hard cap on the number of rows a single SQL query may return.
    /// The query result is wrapped in an outer `LIMIT` so this bound is
    /// enforced regardless of the user's own `LIMIT`. Default `100_000`.
    pub max_rows: u64,
}
421
422impl Default for SqlConfig {
423    fn default() -> Self {
424        Self {
425            enabled: false,
426            max_rows: 100_000,
427        }
428    }
429}
430
/// DataFusion backend performance tuning (`[datafusion]` block).
///
/// Every knob is **off / stock by default**, so the backend behaves exactly
/// like DataFusion out of the box unless you opt in. These mainly help lazy
/// (`lazy = true`) parquet datasets, especially on object storage. Ignored by
/// the DuckDB backend.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`);
/// omitted keys take their value from the `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataFusionConfig {
    /// Push row-level filters down into the parquet decoder so rows that fail
    /// a predicate are never materialised (in addition to the row-group /
    /// page-index pruning that always happens). DataFusion default is `false`
    /// because for some workloads the extra per-row evaluation is not worth
    /// it; turn it on for selective filters over large row groups.
    pub pushdown_filters: bool,
    /// Let the parquet scan reorder pushed-down predicates by estimated
    /// selectivity. Only has an effect together with `pushdown_filters`.
    /// DataFusion default is `false`.
    pub reorder_filters: bool,
    /// Cache object-store file listings on the shared runtime so repeated
    /// lazy queries reuse `LIST` results instead of re-listing the source
    /// prefix every time — the dominant per-query cost on S3. Default `false`.
    pub list_files_cache: bool,
    /// Memory budget for the file-listing cache, in MiB. Only used when
    /// `list_files_cache = true`. Default `64`.
    pub list_files_cache_mb: usize,
    /// How long a cached listing stays valid, in seconds. Bounds how long it
    /// takes for newly written files to become visible without an explicit
    /// reload. `0` means no expiry (infinite). Default `60`.
    pub list_files_cache_ttl_secs: u64,
}
462
463impl Default for DataFusionConfig {
464    fn default() -> Self {
465        Self {
466            pushdown_filters: false,
467            reorder_filters: false,
468            list_files_cache: false,
469            list_files_cache_mb: 64,
470            list_files_cache_ttl_secs: 60,
471        }
472    }
473}
474
/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
///
/// Disabled by default. When `enabled = true`, the server validates
/// every request's `Authorization: Bearer …` JWT against the JWKS
/// discovered from the issuer's OIDC metadata
/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
/// enforces the configured scope requirements per route.
///
/// Only compiled in when the binary was built with the `auth` cargo
/// feature. Without the feature, `enabled = true` is rejected at
/// startup so a misconfigured production deployment can't silently
/// fall back to "no auth".
///
/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
/// of this block — `[swagger.oauth2]` only drives the UI's login
/// dialog; `[auth]` is what enforces tokens on the API.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`);
/// omitted keys take their value from the `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch. `false` (default) skips all auth processing.
    pub enabled: bool,
    /// OIDC issuer URL — must match the `iss` claim of every accepted
    /// token. Required when `enabled = true`.
    pub issuer: String,
    /// Expected `aud` claim. When empty, audience validation is
    /// skipped (not recommended in production).
    pub audience: String,
    /// Scopes a caller must hold to read datasets (GET endpoints +
    /// POST `…/query` and `…/count`). Empty list means "no scope check,
    /// just a valid token is enough".
    pub read_scopes: Vec<String>,
    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
    /// Empty list means "no scope check, just a valid token is enough".
    pub reload_scopes: Vec<String>,
    /// Allow unauthenticated GETs through. Useful for public datasets
    /// and demo deployments. Defaults to `false`.
    pub anonymous_read: bool,
    /// Continue serving even if the JWKS fetch fails at startup.
    /// When `true` (default), the server starts in a degraded mode that
    /// rejects every auth'd request with 503 until JWKS becomes
    /// reachable. When `false`, startup fails outright.
    pub start_degraded: bool,
    /// Allowed signing algorithms. Pinned to RS256 by default; never
    /// include `HS*` or `none` here unless you really know what you're
    /// doing.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds. Default `60`.
    pub leeway_secs: u64,
    /// How often (in seconds) the background refresher re-fetches the
    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
    /// out-of-band. Default `3600`.
    pub jwks_refresh_secs: u64,
    /// Optional JSON-pointer into the JWT claims that extracts a
    /// tenant identifier — attached to the principal and logged on
    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
    /// When empty, no tenant is extracted.
    pub tenant_claim: String,
    /// If non-empty, requests whose extracted tenant ID is not in this
    /// list are rejected with 403. Has no effect when `tenant_claim`
    /// is empty.
    pub allowed_tenants: Vec<String>,
    /// If `true`, `POST …/reload` accepts *either* a valid token with
    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
    /// to `true` for one-release backwards compatibility — flip to
    /// `false` once your automation has migrated to OIDC.
    pub admin_token_fallback: bool,
}
542
543impl Default for AuthConfig {
544    fn default() -> Self {
545        Self {
546            enabled: false,
547            issuer: String::new(),
548            audience: String::new(),
549            read_scopes: Vec::new(),
550            reload_scopes: Vec::new(),
551            anonymous_read: false,
552            start_degraded: true,
553            algorithms: vec!["RS256".into()],
554            leeway_secs: 60,
555            jwks_refresh_secs: 3600,
556            tenant_claim: String::new(),
557            allowed_tenants: Vec::new(),
558            admin_token_fallback: true,
559        }
560    }
561}
562
563impl Backend {
564    pub fn as_str(self) -> &'static str {
565        match self {
566            Backend::Datafusion => "datafusion",
567            Backend::Duckdb => "duckdb",
568        }
569    }
570}
571
/// One dataset served by this instance (an entry under the `dataset` key).
///
/// `name` and `source` are required; every other field is optional and
/// falls back to the default noted on it.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatasetConfig {
    /// Dataset name. Also the basis of the per-dataset credential env-var
    /// prefix described in the module docs.
    pub name: String,
    /// Format and location of the backing data.
    pub source: SourceConfig,
    /// Optional S3 connection settings, relevant when the source location
    /// is an `s3://` URL. `None` when the block is absent.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; see [`IndexConfig`]. Defaults to
    /// `IndexConfig::default()`.
    #[serde(default)]
    pub index: IndexConfig,
    /// Optional column projection applied at load time. When non-empty,
    /// only the listed columns are read from the parquet/delta source —
    /// every other column is skipped entirely (no decode, no allocation,
    /// no resident memory). Empty (default) = read all columns. Names are
    /// matched case-insensitively against the source schema.
    #[serde(default)]
    pub columns: Vec<String>,
    /// When `true` (default), Utf8 columns that are dictionary-encoded in
    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
    /// for low-cardinality columns. Set to `false` to bypass the override
    /// — useful as a workaround if you observe null-handling oddities on
    /// a particular parquet file.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, the backend should keep the dataset on disk and stream
    /// it at query time instead of materialising it into RAM at startup.
    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
    /// for bounded memory use on large / multi-file sources. Honoured by
    /// the DataFusion backend (local + S3 parquet) and by the DuckDB
    /// backend, which registers the dataset as a view over the source scan
    /// (local + S3 parquet, and delta) rather than materialising a table.
    /// Default `false`.
    #[serde(default)]
    pub lazy: bool,
    /// Column-level access control for query **predicates** — which columns
    /// a caller may filter on (structured `predicates` / `count`, and any
    /// reference on the raw-SQL endpoint). Mutually-exclusive `include`
    /// (allowlist) / `exclude` (denylist). Empty (default) = no restriction.
    #[serde(default)]
    pub predicate_filter: ColumnFilter,
    /// Column-level access control for **projection** — which columns a
    /// caller may see or return (the `columns` projection, `group_by`,
    /// aggregations, `order_by`, the `/schema` response and row sample, and
    /// any reference on the raw-SQL endpoint). Columns denied here are
    /// hidden as if they did not exist. Mutually-exclusive `include`
    /// (allowlist) / `exclude` (denylist). Empty (default) = no restriction.
    #[serde(default)]
    pub projection_filter: ColumnFilter,
}
619
/// Serde `default = "default_true"` helper for boolean fields that are on
/// unless the config explicitly turns them off.
fn default_true() -> bool {
    true
}
623
/// A mutually-exclusive column allow/deny list.
///
/// Set `include` to turn it into an **allowlist** — only the listed columns
/// pass. Set `exclude` to turn it into a **denylist** — every column except
/// the listed ones passes. Setting *both* is a configuration error (caught
/// by [`ColumnFilter::validate`]). Leaving both empty (the default) imposes
/// no restriction at all. Names are matched case-insensitively against the
/// dataset's canonical column names.
///
/// Unknown keys inside the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct ColumnFilter {
    /// Allowlist: when non-empty, only these columns pass.
    #[serde(default)]
    pub include: Vec<String>,
    /// Denylist: when non-empty (and `include` is empty), every column
    /// except these passes.
    #[serde(default)]
    pub exclude: Vec<String>,
}
640
641impl ColumnFilter {
642    /// Whether this filter restricts anything (either list is non-empty).
643    pub fn is_active(&self) -> bool {
644        !self.include.is_empty() || !self.exclude.is_empty()
645    }
646
647    /// Whether `col` passes the filter. Case-insensitive. An empty filter
648    /// (neither list set) admits every column.
649    pub fn allows(&self, col: &str) -> bool {
650        let lc = col.to_lowercase();
651        if !self.include.is_empty() {
652            return self.include.iter().any(|c| c.to_lowercase() == lc);
653        }
654        if !self.exclude.is_empty() {
655            return !self.exclude.iter().any(|c| c.to_lowercase() == lc);
656        }
657        true
658    }
659
660    /// Reject a filter that sets both `include` and `exclude`. `ctx`
661    /// identifies the filter in the error message (e.g. `"predicate_filter"`).
662    pub fn validate(&self, dataset: &str, ctx: &str) -> Result<(), AppError> {
663        if !self.include.is_empty() && !self.exclude.is_empty() {
664            return Err(AppError::InvalidValue(format!(
665                "dataset '{dataset}': {ctx} may set 'include' or 'exclude', not both"
666            )));
667        }
668        Ok(())
669    }
670
671    /// The names listed by whichever side is active, for cross-checking
672    /// against the real schema at registration time (typos in a denylist
673    /// would otherwise silently expose a column).
674    pub fn listed(&self) -> &[String] {
675        if !self.include.is_empty() {
676            &self.include
677        } else {
678            &self.exclude
679        }
680    }
681}
682
/// Where a dataset's data lives and in which format (the
/// `[dataset.source]` block). Both fields are required.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SourceConfig {
    /// Storage format of the source.
    pub kind: SourceKind,
    /// Either a local filesystem path or an `s3://bucket/key` URL.
    pub location: String,
}
689
/// Storage format of a dataset source.
///
/// Spelled in lowercase in the config file (`parquet`, `delta`) because of
/// `rename_all = "lowercase"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Parquet source; the value of `SourceKind::default()`.
    #[default]
    Parquet,
    /// Delta table.
    Delta,
}
697
698impl SourceKind {
699    pub fn as_str(self) -> &'static str {
700        match self {
701            SourceKind::Parquet => "parquet",
702            SourceKind::Delta => "delta",
703        }
704    }
705}
706
707/// Non-secret S3 connection settings. Credentials are pulled from env / the
708/// AWS credential chain — see [`DatasetConfig::resolved_creds`].
709#[derive(Debug, Clone, Deserialize, Serialize)]
710#[serde(default)]
711pub struct S3Config {
712    pub region: Option<String>,
713    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
714    pub endpoint: Option<String>,
715    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
716    /// MinIO and most non-AWS providers require `path`.
717    pub addressing_style: AddressingStyle,
718    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
719    pub allow_http: bool,
720    /// Inline credentials. Strongly discouraged in production — prefer env
721    /// vars (see module docs).
722    pub access_key_id: Option<String>,
723    pub secret_access_key: Option<String>,
724    pub session_token: Option<String>,
725    /// Hive partition-column handling for parquet sources. Defaults to
726    /// `auto` (detect from the path). See [`Partitioning`].
727    pub partitioning: Partitioning,
728    /// Whether the bucket is folded into a custom `endpoint` host for
729    /// virtual-hosted-style requests. Defaults to `auto`. See
730    /// [`BucketInHost`].
731    pub endpoint_bucket_in_host: BucketInHost,
732}
733
734impl Default for S3Config {
735    fn default() -> Self {
736        Self {
737            region: None,
738            endpoint: None,
739            addressing_style: AddressingStyle::Virtual,
740            allow_http: false,
741            access_key_id: None,
742            secret_access_key: None,
743            session_token: None,
744            partitioning: Partitioning::Auto,
745            endpoint_bucket_in_host: BucketInHost::Auto,
746        }
747    }
748}
749
750impl S3Config {
751    /// Resolve the endpoint URL to hand to the object store, optionally
752    /// folding `bucket` into the host for virtual-hosted-style requests so a
753    /// plain `endpoint` works the same way it does on DuckDB.
754    ///
755    /// Returns `None` when no custom endpoint is configured (AWS default).
756    /// The bucket is only prepended when it isn't already the leading host
757    /// label, so re-running this (or a config that already embeds the
758    /// bucket) never produces `bucket.bucket.host`.
759    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
760        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
761
762        let fold = match self.endpoint_bucket_in_host {
763            BucketInHost::False => false,
764            BucketInHost::True => true,
765            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
766        };
767        if !fold {
768            return Some(ep.to_string());
769        }
770
771        let (scheme, host_and_path) = match ep.split_once("://") {
772            Some((s, rest)) => (Some(s), rest),
773            None => (None, ep),
774        };
775        // Split host from any trailing path so we prefix the host label only.
776        let (host, path) = match host_and_path.split_once('/') {
777            Some((h, p)) => (h, Some(p)),
778            None => (host_and_path, None),
779        };
780        // Guard against double-prefixing.
781        if host == bucket || host.starts_with(&format!("{bucket}.")) {
782            return Some(ep.to_string());
783        }
784        let new_host = format!("{bucket}.{host}");
785        let rebuilt = match (scheme, path) {
786            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
787            (Some(s), None) => format!("{s}://{new_host}"),
788            (None, Some(p)) => format!("{new_host}/{p}"),
789            (None, None) => new_host,
790        };
791        Some(rebuilt)
792    }
793}
794
795#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
796#[serde(rename_all = "lowercase")]
797pub enum AddressingStyle {
798    #[default]
799    Virtual,
800    Path,
801}
802
803impl AddressingStyle {
804    pub fn as_str(self) -> &'static str {
805        match self {
806            AddressingStyle::Virtual => "virtual",
807            AddressingStyle::Path => "path",
808        }
809    }
810}
811
812/// How hive-style partition columns (`key=value/` path segments) are handled
813/// for an S3 parquet source. Local parquet always auto-detects; this option
814/// brings S3 in line and lets you force or disable the behaviour.
815#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
816#[serde(rename_all = "lowercase")]
817pub enum Partitioning {
818    /// Detect `key=value` segments from the location glob or by listing the
819    /// prefix. No partition columns are added when none are found.
820    #[default]
821    Auto,
822    /// Force hive partitioning. Partition keys are taken from the location
823    /// glob, or discovered by listing the prefix.
824    Hive,
825    /// Treat the source as a flat set of parquet files — never add partition
826    /// columns even if the path looks hive-partitioned.
827    None,
828}
829
830impl Partitioning {
831    pub fn as_str(self) -> &'static str {
832        match self {
833            Partitioning::Auto => "auto",
834            Partitioning::Hive => "hive",
835            Partitioning::None => "none",
836        }
837    }
838}
839
840/// Whether the bucket name is folded into the endpoint hostname for
841/// virtual-hosted-style requests against a custom endpoint. This aligns the
842/// DataFusion object-store path with DuckDB, which builds the virtual host
843/// itself — so the same plain `endpoint` works on both backends.
844#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
845#[serde(rename_all = "lowercase")]
846pub enum BucketInHost {
847    /// Fold the bucket into the host when `addressing_style = "virtual"` and
848    /// a custom `endpoint` is set (guarded against double-prefixing).
849    #[default]
850    Auto,
851    /// Always fold the bucket into the endpoint host.
852    True,
853    /// Never rewrite the endpoint — pass it through verbatim.
854    False,
855}
856
857impl BucketInHost {
858    pub fn as_str(self) -> &'static str {
859        match self {
860            BucketInHost::Auto => "auto",
861            BucketInHost::True => "true",
862            BucketInHost::False => "false",
863        }
864    }
865}
866
867#[derive(Debug, Clone, Deserialize, Serialize)]
868#[serde(default)]
869pub struct IndexConfig {
870    pub mode: IndexMode,
871    pub columns: Vec<String>,
872    pub max_cardinality: usize,
873}
874
875impl Default for IndexConfig {
876    fn default() -> Self {
877        Self {
878            mode: IndexMode::Auto,
879            columns: Vec::new(),
880            max_cardinality: 100_000,
881        }
882    }
883}
884
885#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
886#[serde(rename_all = "lowercase")]
887pub enum IndexMode {
888    #[default]
889    Auto,
890    None,
891    List,
892}
893
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// `Debug` is implemented by hand: `secret_access_key` and `session_token`
/// are shown only as present/absent so the secrets cannot end up in logs.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
}

impl std::fmt::Debug for ResolvedCreds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Report only whether a secret is present, never its value.
        let redact = |v: &Option<String>| v.as_ref().map(|_| "<redacted>");
        f.debug_struct("ResolvedCreds")
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &redact(&self.secret_access_key))
            .field("session_token", &redact(&self.session_token))
            .finish()
    }
}

impl ResolvedCreds {
    /// True when both the access key id and the secret access key are set.
    /// The session token is not considered.
    pub fn has_keypair(&self) -> bool {
        self.access_key_id.is_some() && self.secret_access_key.is_some()
    }
}
908
909// ---------------------------------------------------------------------------
910// Loading + validation
911// ---------------------------------------------------------------------------
912
913impl AppConfig {
914    /// Read and validate a TOML config file.
915    pub fn load(path: &str) -> Result<Self, AppError> {
916        let raw = std::fs::read_to_string(path)
917            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
918        let mut cfg: AppConfig =
919            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
920        cfg.normalize();
921        cfg.validate()?;
922        // Remember where we loaded from so the explorer can optionally
923        // append newly-registered datasets back to this file. Ignore the
924        // error if it was already set (only the first load wins).
925        let _ = SOURCE_CONFIG_PATH.set(PathBuf::from(path));
926        Ok(cfg)
927    }
928
929    /// Canonicalise fields that are compared case-insensitively at runtime.
930    ///
931    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
932    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
933    /// once at load time. Without this an operator who writes
934    /// `"Datasets:Read"` would silently 403 every caller, since the token
935    /// side would have become `datasets:read`.
936    fn normalize(&mut self) {
937        for s in self
938            .auth
939            .read_scopes
940            .iter_mut()
941            .chain(self.auth.reload_scopes.iter_mut())
942        {
943            *s = s.to_ascii_lowercase();
944        }
945    }
946
947    fn validate(&self) -> Result<(), AppError> {
948        // Server prefix: empty, or must start with '/' and not end with '/'.
949        let p = &self.server.prefix;
950        if !p.is_empty() {
951            if !p.starts_with('/') {
952                return Err(AppError::Internal(format!(
953                    "server.prefix must start with '/' (got '{p}')"
954                )));
955            }
956            if p.ends_with('/') {
957                return Err(AppError::Internal(format!(
958                    "server.prefix must not end with '/' (got '{p}')"
959                )));
960            }
961        }
962
963        if self.datasets.is_empty() {
964            return Err(AppError::Internal(
965                "datasets.toml has no [[dataset]] entries".into(),
966            ));
967        }
968
969        if self.server.quack.enabled {
970            self.server.quack.validate_enabled()?;
971        }
972
973        // Validate the docs mount path even when the section is disabled,
974        // so an inactive config typo can't go unnoticed.
975        {
976            let dp = &self.docs.path;
977            if !dp.starts_with('/') {
978                return Err(AppError::Internal(format!(
979                    "docs.path must start with '/' (got '{dp}')"
980                )));
981            }
982            if dp.len() > 1 && dp.ends_with('/') {
983                return Err(AppError::Internal(format!(
984                    "docs.path must not end with '/' (got '{dp}')"
985                )));
986            }
987            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
988                return Err(AppError::Internal(format!(
989                    "docs.path '{dp}' collides with a reserved route"
990                )));
991            }
992        }
993
994        // Same for the swagger UI mount.
995        {
996            let sp = &self.swagger.path;
997            if !sp.starts_with('/') {
998                return Err(AppError::Internal(format!(
999                    "swagger.path must start with '/' (got '{sp}')"
1000                )));
1001            }
1002            if sp.len() > 1 && sp.ends_with('/') {
1003                return Err(AppError::Internal(format!(
1004                    "swagger.path must not end with '/' (got '{sp}')"
1005                )));
1006            }
1007            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
1008                return Err(AppError::Internal(format!(
1009                    "swagger.path '{sp}' collides with a reserved route"
1010                )));
1011            }
1012            if sp == &self.docs.path {
1013                return Err(AppError::Internal(format!(
1014                    "swagger.path and docs.path must differ (both '{sp}')"
1015                )));
1016            }
1017            if let Some(o) = &self.swagger.oauth2 {
1018                if o.issuer.trim().is_empty() {
1019                    return Err(AppError::Internal(
1020                        "swagger.oauth2.issuer must not be empty".into(),
1021                    ));
1022                }
1023                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
1024                    return Err(AppError::Internal(format!(
1025                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
1026                        o.issuer
1027                    )));
1028                }
1029                if o.client_id.trim().is_empty() {
1030                    return Err(AppError::Internal(
1031                        "swagger.oauth2.client_id must not be empty".into(),
1032                    ));
1033                }
1034            }
1035        }
1036
1037        // Metrics endpoint mount path. Validated even when disabled so an
1038        // inactive config typo can't go unnoticed. `/metrics` is itself a
1039        // reserved mount (so docs/swagger can't shadow it), so we check the
1040        // remaining reserved routes — and the docs/swagger paths — for
1041        // collisions rather than the whole list.
1042        {
1043            let mp = &self.metrics.path;
1044            if !mp.starts_with('/') {
1045                return Err(AppError::Internal(format!(
1046                    "metrics.path must start with '/' (got '{mp}')"
1047                )));
1048            }
1049            if mp.len() > 1 && mp.ends_with('/') {
1050                return Err(AppError::Internal(format!(
1051                    "metrics.path must not end with '/' (got '{mp}')"
1052                )));
1053            }
1054            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
1055                return Err(AppError::Internal(format!(
1056                    "metrics.path '{mp}' collides with a reserved route"
1057                )));
1058            }
1059            if mp == &self.docs.path {
1060                return Err(AppError::Internal(format!(
1061                    "metrics.path and docs.path must differ (both '{mp}')"
1062                )));
1063            }
1064            if mp == &self.swagger.path {
1065                return Err(AppError::Internal(format!(
1066                    "metrics.path and swagger.path must differ (both '{mp}')"
1067                )));
1068            }
1069        }
1070
1071        // Explorer UI mount path. Validated even when disabled so an
1072        // inactive config typo can't go unnoticed.
1073        {
1074            let ep = &self.explorer.path;
1075            if !ep.starts_with('/') {
1076                return Err(AppError::Internal(format!(
1077                    "explorer.path must start with '/' (got '{ep}')"
1078                )));
1079            }
1080            if ep.len() > 1 && ep.ends_with('/') {
1081                return Err(AppError::Internal(format!(
1082                    "explorer.path must not end with '/' (got '{ep}')"
1083                )));
1084            }
1085            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
1086                return Err(AppError::Internal(format!(
1087                    "explorer.path '{ep}' collides with a reserved route"
1088                )));
1089            }
1090            if ep == &self.docs.path {
1091                return Err(AppError::Internal(format!(
1092                    "explorer.path and docs.path must differ (both '{ep}')"
1093                )));
1094            }
1095            if ep == &self.swagger.path {
1096                return Err(AppError::Internal(format!(
1097                    "explorer.path and swagger.path must differ (both '{ep}')"
1098                )));
1099            }
1100            if ep == &self.metrics.path {
1101                return Err(AppError::Internal(format!(
1102                    "explorer.path and metrics.path must differ (both '{ep}')"
1103                )));
1104            }
1105        }
1106
1107        // Auth block — only meaningful when `enabled = true`. The cargo
1108        // feature gate is enforced separately in `server::serve` so a
1109        // binary built without `--features auth` and a config with
1110        // `auth.enabled = true` aborts with a clear error.
1111        if self.auth.enabled {
1112            let a = &self.auth;
1113            if a.issuer.trim().is_empty() {
1114                return Err(AppError::Internal(
1115                    "auth.issuer must not be empty when auth.enabled = true".into(),
1116                ));
1117            }
1118            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1119                return Err(AppError::Internal(format!(
1120                    "auth.issuer must be an absolute http(s) URL (got '{}')",
1121                    a.issuer
1122                )));
1123            }
1124            for alg in &a.algorithms {
1125                match alg.as_str() {
1126                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1127                    | "PS512" => {}
1128                    other => {
1129                        return Err(AppError::Internal(format!(
1130                            "auth.algorithms[{other}] is not allowed; pick one of \
1131                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1132                        )));
1133                    }
1134                }
1135            }
1136            if a.algorithms.is_empty() {
1137                return Err(AppError::Internal(
1138                    "auth.algorithms must not be empty".into(),
1139                ));
1140            }
1141            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1142                return Err(AppError::Internal(format!(
1143                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1144                    a.tenant_claim
1145                )));
1146            }
1147            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1148                return Err(AppError::Internal(
1149                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1150                     can't enforce a tenant allow-list without a claim to extract from"
1151                        .into(),
1152                ));
1153            }
1154        }
1155
1156        let mut seen = HashSet::new();
1157        for d in &self.datasets {
1158            if !seen.insert(d.name.as_str()) {
1159                return Err(AppError::Internal(format!(
1160                    "duplicate dataset name: {}",
1161                    d.name
1162                )));
1163            }
1164            if d.name.is_empty() {
1165                return Err(AppError::Internal("dataset name must not be empty".into()));
1166            }
1167            // URL-safe: alphanum + _ - .
1168            if !d
1169                .name
1170                .chars()
1171                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1172            {
1173                return Err(AppError::Internal(format!(
1174                    "dataset name '{}' must be alphanumeric (plus _ - .)",
1175                    d.name
1176                )));
1177            }
1178
1179            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1180                return Err(AppError::Internal(format!(
1181                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1182                    d.name
1183                )));
1184            }
1185
1186            // Location-specific checks.
1187            if d.source.is_s3() {
1188                d.source.s3_bucket()?;
1189                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1190                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1191                    && std::env::var("AWS_REGION").is_err()
1192                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1193                {
1194                    log::warn!(
1195                        "dataset '{}': S3 source without explicit region — \
1196                         relying on AWS_REGION env var",
1197                        d.name
1198                    );
1199                }
1200            } else {
1201                // Local path. For parquet we can fully resolve to a file
1202                // list up front; for delta we only check that the directory
1203                // exists (delta has its own layout — _delta_log/, …).
1204                match d.source.kind {
1205                    SourceKind::Parquet => {
1206                        d.resolve_local_parquet_files()?;
1207                    }
1208                    SourceKind::Delta => {
1209                        let p = Path::new(&d.source.location);
1210                        if !p.exists() {
1211                            return Err(AppError::Internal(format!(
1212                                "dataset '{}': delta location does not exist: {}",
1213                                d.name, d.source.location
1214                            )));
1215                        }
1216                    }
1217                }
1218            }
1219        }
1220        Ok(())
1221    }
1222}
1223
1224impl SourceConfig {
1225    pub fn is_s3(&self) -> bool {
1226        self.location.starts_with("s3://")
1227    }
1228
1229    /// True when the location already contains a glob metacharacter
1230    /// (`*`, `?`, or `[`).
1231    pub fn has_glob(&self) -> bool {
1232        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1233    }
1234
1235    /// Location to hand to a backend that needs an explicit parquet glob
1236    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1237    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1238    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1239    /// are returned unchanged.
1240    pub fn s3_recursive_parquet_glob(&self) -> String {
1241        if !self.is_s3() || self.has_glob() {
1242            return self.location.clone();
1243        }
1244        let trimmed = self.location.trim_end_matches('/');
1245        format!("{trimmed}/**/*.parquet")
1246    }
1247
1248    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1249    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1250        let rest = self
1251            .location
1252            .strip_prefix("s3://")
1253            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1254        let (bucket, key) = match rest.split_once('/') {
1255            Some((b, k)) => (b, k),
1256            None => (rest, ""),
1257        };
1258        if bucket.is_empty() {
1259            return Err(AppError::Internal(format!(
1260                "s3 URL missing bucket: {}",
1261                self.location
1262            )));
1263        }
1264        Ok((bucket, key))
1265    }
1266}
1267
1268impl DatasetConfig {
1269    /// Validate a dataset config supplied at runtime (e.g. registered live
1270    /// through the explorer) with the same rules the startup loader applies
1271    /// to each `[[dataset]]`: non-empty, URL-safe name and a coherent index
1272    /// configuration. Source reachability is left to the backend, which
1273    /// surfaces a clear error when it tries to open the source.
1274    pub fn validate_for_register(&self) -> Result<(), AppError> {
1275        if self.name.is_empty() {
1276            return Err(AppError::InvalidValue(
1277                "dataset name must not be empty".into(),
1278            ));
1279        }
1280        if !self
1281            .name
1282            .chars()
1283            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1284        {
1285            return Err(AppError::InvalidValue(format!(
1286                "dataset name '{}' must be alphanumeric (plus _ - .)",
1287                self.name
1288            )));
1289        }
1290        if self.index.mode == IndexMode::List && self.index.columns.is_empty() {
1291            return Err(AppError::InvalidValue(format!(
1292                "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1293                self.name
1294            )));
1295        }
1296        self.predicate_filter.validate(&self.name, "predicate_filter")?;
1297        self.projection_filter
1298            .validate(&self.name, "projection_filter")?;
1299        if self.source.is_s3() {
1300            self.source.s3_bucket()?;
1301        }
1302        Ok(())
1303    }
1304
1305    /// Render this dataset as a standalone TOML `[[dataset]]` block suitable
1306    /// for pasting into (or appending to) a `datasets.toml`. Fields are
1307    /// emitted scalars-first so the output is valid TOML, and default-valued
1308    /// sections (`[dataset.s3]`, a default `[dataset.index]`, an empty
1309    /// projection) are omitted to keep the snippet minimal.
1310    pub fn to_toml_block(&self) -> Result<String, AppError> {
1311        #[derive(Serialize)]
1312        struct Block {
1313            name: String,
1314            #[serde(skip_serializing_if = "Vec::is_empty")]
1315            columns: Vec<String>,
1316            dict_encode: bool,
1317            lazy: bool,
1318            source: SourceConfig,
1319            #[serde(skip_serializing_if = "Option::is_none")]
1320            s3: Option<S3Config>,
1321            #[serde(skip_serializing_if = "Option::is_none")]
1322            index: Option<IndexConfig>,
1323            #[serde(skip_serializing_if = "Option::is_none")]
1324            predicate_filter: Option<ColumnFilter>,
1325            #[serde(skip_serializing_if = "Option::is_none")]
1326            projection_filter: Option<ColumnFilter>,
1327        }
1328        #[derive(Serialize)]
1329        struct Doc {
1330            dataset: [Block; 1],
1331        }
1332        let doc = Doc {
1333            dataset: [Block {
1334                name: self.name.clone(),
1335                columns: self.columns.clone(),
1336                dict_encode: self.dict_encode,
1337                lazy: self.lazy,
1338                source: self.source.clone(),
1339                s3: self.s3.clone(),
1340                index: if self.index.is_default() {
1341                    None
1342                } else {
1343                    Some(self.index.clone())
1344                },
1345                predicate_filter: self
1346                    .predicate_filter
1347                    .is_active()
1348                    .then(|| self.predicate_filter.clone()),
1349                projection_filter: self
1350                    .projection_filter
1351                    .is_active()
1352                    .then(|| self.projection_filter.clone()),
1353            }],
1354        };
1355        toml::to_string_pretty(&doc)
1356            .map_err(|e| AppError::Internal(format!("failed to render dataset TOML: {e}")))
1357    }
1358
1359    /// Append this dataset's `[[dataset]]` block to the config file this
1360    /// process was loaded from, so a runtime-registered dataset survives a
1361    /// restart. Returns the path written to.
1362    ///
1363    /// Errors with `AppError::InvalidValue` when the process has no on-disk
1364    /// config ([`source_config_path`] is `None` — e.g. the Python bindings),
1365    /// and with `AppError::Internal` when rendering or the file write fails.
1366    /// Shared by the versioned API (`POST /api/v1/datasets/persist`) and the
1367    /// explorer's persist action.
1368    pub fn persist_to_source_config(&self) -> Result<PathBuf, AppError> {
1369        use std::io::Write;
1370        let path = source_config_path().ok_or_else(|| {
1371            AppError::InvalidValue("server has no on-disk config file to append to".into())
1372        })?;
1373        let block = self.to_toml_block()?;
1374        let mut file = std::fs::OpenOptions::new()
1375            .append(true)
1376            .open(path)
1377            .map_err(|e| {
1378                AppError::Internal(format!("failed to open config {}: {e}", path.display()))
1379            })?;
1380        // Separate the appended block from existing content by a blank line.
1381        write!(file, "\n{block}").map_err(|e| {
1382            AppError::Internal(format!("failed to write config {}: {e}", path.display()))
1383        })?;
1384        Ok(path.to_path_buf())
1385    }
1386}
1387
1388impl IndexConfig {
1389    /// Whether this equals the serde default (used to omit the section from
1390    /// exported TOML when it carries no information).
1391    fn is_default(&self) -> bool {
1392        self.mode == IndexMode::Auto && self.columns.is_empty() && self.max_cardinality == 100_000
1393    }
1394}
1395
1396impl DatasetConfig {
1397    /// Expand `source.location` to a concrete list of local `.parquet`
1398    /// files. Only valid for `kind = parquet` on local paths — S3 and
1399    /// Delta sources are resolved by the backend itself.
1400    ///
1401    /// Accepts three location shapes:
1402    ///   * a single `*.parquet` file
1403    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1404    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1405    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1406    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1407        if self.source.is_s3() {
1408            return Err(AppError::Internal(format!(
1409                "dataset '{}': resolve_local_parquet_files called on s3 source",
1410                self.name
1411            )));
1412        }
1413        let loc = &self.source.location;
1414
1415        // Glob pattern? Expand and require at least one match.
1416        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1417            let mut files: Vec<PathBuf> = glob::glob(loc)
1418                .map_err(|e| {
1419                    AppError::Internal(format!(
1420                        "dataset '{}': bad glob pattern '{loc}': {e}",
1421                        self.name
1422                    ))
1423                })?
1424                .filter_map(|r| r.ok())
1425                .filter(|p| {
1426                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1427                })
1428                .collect();
1429            files.sort();
1430            if files.is_empty() {
1431                return Err(AppError::EmptyDataset(format!(
1432                    "dataset '{}': glob '{loc}' matched no .parquet files",
1433                    self.name
1434                )));
1435            }
1436            return Ok(files);
1437        }
1438
1439        let path = Path::new(loc);
1440        if !path.exists() {
1441            return Err(AppError::Internal(format!(
1442                "dataset '{}': source path does not exist: {loc}",
1443                self.name
1444            )));
1445        }
1446
1447        if path.is_file() {
1448            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1449                return Err(AppError::Internal(format!(
1450                    "dataset '{}': source must be a .parquet file",
1451                    self.name
1452                )));
1453            }
1454            return Ok(vec![path.to_path_buf()]);
1455        }
1456
1457        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1458            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1459            .filter_map(|entry| entry.ok().map(|e| e.path()))
1460            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1461            .collect();
1462        files.sort();
1463        if files.is_empty() {
1464            return Err(AppError::EmptyDataset(format!(
1465                "dataset '{}': no *.parquet files found in {loc}",
1466                self.name
1467            )));
1468        }
1469        Ok(files)
1470    }
1471
1472    /// Estimate the on-disk byte size of this dataset's local backing
1473    /// files. Returns `None` for S3 sources (sizing would require a
1474    /// network round-trip) or when nothing can be measured.
1475    ///
1476    /// * `parquet` sums the resolved `.parquet` files (single file,
1477    ///   directory, or glob).
1478    /// * `delta` sums every `*.parquet` data file under the table root.
1479    ///   This slightly over-counts when stale files haven't been vacuumed,
1480    ///   which is fine for a coarse force-lazy threshold.
1481    pub fn estimate_local_bytes(&self) -> Option<u64> {
1482        if self.source.is_s3() {
1483            return None;
1484        }
1485        match self.source.kind {
1486            SourceKind::Parquet => {
1487                let files = self.resolve_local_parquet_files().ok()?;
1488                Some(
1489                    files
1490                        .iter()
1491                        .filter_map(|p| std::fs::metadata(p).ok())
1492                        .map(|m| m.len())
1493                        .sum(),
1494                )
1495            }
1496            SourceKind::Delta => {
1497                let root = self.source.location.trim_end_matches('/');
1498                let pattern = format!("{root}/**/*.parquet");
1499                let paths = glob::glob(&pattern).ok()?;
1500                Some(
1501                    paths
1502                        .filter_map(Result::ok)
1503                        .filter_map(|p| std::fs::metadata(&p).ok())
1504                        .filter(|m| m.is_file())
1505                        .map(|m| m.len())
1506                        .sum(),
1507                )
1508            }
1509        }
1510    }
1511
1512    /// Decide whether this dataset should be forced into lazy mode given
1513    /// the server's `force_lazy_above_mb` threshold. Returns `Some(bytes)`
1514    /// (the measured size) when it should be forced, so the caller can log
1515    /// it. Returns `None` when the dataset is already lazy, the threshold
1516    /// is disabled, the source is S3, or the measured size is unknown or at
1517    /// or below the threshold.
1518    pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1519        if self.lazy || server.force_lazy_above_mb == 0 {
1520            return None;
1521        }
1522        let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1523        match self.estimate_local_bytes() {
1524            Some(bytes) if bytes > threshold => Some(bytes),
1525            _ => None,
1526        }
1527    }
1528
1529    /// Env-var prefix derived from the dataset name: uppercase with
1530    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1531    /// `SALES_EU_1`.
1532    pub fn env_prefix(&self) -> String {
1533        self.name
1534            .chars()
1535            .map(|c| {
1536                if c.is_ascii_alphanumeric() {
1537                    c.to_ascii_uppercase()
1538                } else {
1539                    '_'
1540                }
1541            })
1542            .collect()
1543    }
1544
1545    /// Resolve S3 credentials following the precedence chain documented at
1546    /// the top of this module. Returns an empty struct when nothing was
1547    /// found — the caller should then leave credential resolution to the
1548    /// engine's default provider chain.
1549    pub fn resolved_creds(&self) -> ResolvedCreds {
1550        let prefix = self.env_prefix();
1551        let from_env = |suffix: &str| {
1552            std::env::var(format!("{prefix}_{suffix}"))
1553                .ok()
1554                .filter(|s| !s.is_empty())
1555        };
1556        let inline = self.s3.as_ref();
1557        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1558
1559        ResolvedCreds {
1560            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1561                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1562                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1563            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1564                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1565                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1566            session_token: from_env("AWS_SESSION_TOKEN")
1567                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1568                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1569        }
1570    }
1571
1572    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1573    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1574    pub fn resolved_region(&self) -> String {
1575        let prefix = self.env_prefix();
1576        std::env::var(format!("{prefix}_AWS_REGION"))
1577            .ok()
1578            .filter(|s| !s.is_empty())
1579            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1580            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1581            .or_else(|| {
1582                std::env::var("AWS_DEFAULT_REGION")
1583                    .ok()
1584                    .filter(|s| !s.is_empty())
1585            })
1586            .unwrap_or_else(|| "us-east-1".to_string())
1587    }
1588}
1589
1590#[cfg(test)]
1591mod tests {
1592    use super::*;
1593
1594    #[test]
1595    fn server_defaults() {
1596        let s = ServerConfig::default();
1597        assert_eq!(s.backend, Backend::Datafusion);
1598        assert_eq!(s.port, 8080);
1599        assert!(s.compress);
1600        assert_eq!(s.max_body_bytes, 1024 * 1024);
1601        assert_eq!(s.max_page_size, 100_000);
1602        assert_eq!(s.force_lazy_above_mb, 0);
1603        assert_eq!(s.request_timeout_ms, 30_000);
1604        assert!(!s.quack.enabled);
1605        assert_eq!(s.quack.uri, "quack:localhost");
1606        assert!(s.quack.token.is_none());
1607        assert!(!s.quack.allow_other_hostname);
1608        assert!(s.quack.read_only);
1609        assert_eq!(s.prefix, "");
1610        assert!(s.listen.is_loopback());
1611    }
1612
1613    #[test]
1614    fn server_overrides_from_toml() {
1615        let toml = r#"
1616            [server]
1617            backend = "duckdb"
1618            port = 9000
1619            prefix = "/datapress"
1620            compress = false
1621            max_body_bytes = 4096
1622            max_page_size = 50000
1623            force_lazy_above_mb = 256
1624            request_timeout_ms = 0
1625
1626            [server.quack]
1627            enabled = true
1628            uri = "quack:localhost:9495"
1629            token = "test-token"
1630            read_only = false
1631            [[dataset]]
1632            name = "x"
1633            source.kind = "parquet"
1634            source.location = "/tmp/missing.parquet"
1635        "#;
1636        let cfg: AppConfig = toml::from_str(toml).unwrap();
1637        assert_eq!(cfg.server.backend, Backend::Duckdb);
1638        assert_eq!(cfg.server.port, 9000);
1639        assert_eq!(cfg.server.prefix, "/datapress");
1640        assert!(!cfg.server.compress);
1641        assert_eq!(cfg.server.max_body_bytes, 4096);
1642        assert_eq!(cfg.server.max_page_size, 50_000);
1643        assert_eq!(cfg.server.force_lazy_above_mb, 256);
1644        assert_eq!(cfg.server.request_timeout_ms, 0);
1645        assert!(cfg.server.quack.enabled);
1646        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
1647        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
1648        assert!(!cfg.server.quack.read_only);
1649        assert_eq!(cfg.datasets.len(), 1);
1650        assert_eq!(cfg.datasets[0].name, "x");
1651        assert!(cfg.datasets[0].dict_encode); // default
1652    }
1653
1654    #[test]
1655    fn force_lazy_bytes_logic() {
1656        // A unique temp dir with one 2 MiB "parquet" file (contents are
1657        // irrelevant — estimate_local_bytes only stats file lengths).
1658        let dir = std::env::temp_dir().join(format!(
1659            "dp-force-lazy-{}-{}",
1660            std::process::id(),
1661            std::time::SystemTime::now()
1662                .duration_since(std::time::UNIX_EPOCH)
1663                .unwrap()
1664                .as_nanos()
1665        ));
1666        std::fs::create_dir_all(&dir).unwrap();
1667        let two_mib = 2 * 1024 * 1024;
1668        let file = dir.join("data.parquet");
1669        std::fs::write(&file, vec![0u8; two_mib]).unwrap();
1670
1671        let mk = |kind: SourceKind, location: &str, lazy: bool| DatasetConfig {
1672            name: "t".into(),
1673            source: SourceConfig {
1674                kind,
1675                location: location.to_string(),
1676            },
1677            s3: None,
1678            index: IndexConfig::default(),
1679            columns: vec![],
1680            dict_encode: true,
1681            lazy,
1682            predicate_filter: Default::default(),
1683            projection_filter: Default::default(),
1684        };
1685        let server = |mb: u64| ServerConfig {
1686            force_lazy_above_mb: mb,
1687            ..ServerConfig::default()
1688        };
1689
1690        // Sizing: single file, directory walk, and delta dir walk all see 2 MiB.
1691        let file_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), false);
1692        assert_eq!(file_ds.estimate_local_bytes(), Some(two_mib as u64));
1693        let dir_ds = mk(SourceKind::Parquet, dir.to_str().unwrap(), false);
1694        assert_eq!(dir_ds.estimate_local_bytes(), Some(two_mib as u64));
1695        let delta_ds = mk(SourceKind::Delta, dir.to_str().unwrap(), false);
1696        assert_eq!(delta_ds.estimate_local_bytes(), Some(two_mib as u64));
1697
1698        // Threshold disabled → never force.
1699        assert_eq!(file_ds.force_lazy_bytes(&server(0)), None);
1700        // 1 MiB threshold, 2 MiB file → forced (returns measured size).
1701        assert_eq!(file_ds.force_lazy_bytes(&server(1)), Some(two_mib as u64));
1702        // 4 MiB threshold → not forced.
1703        assert_eq!(file_ds.force_lazy_bytes(&server(4)), None);
1704
1705        // Already-lazy datasets are never re-flagged.
1706        let lazy_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), true);
1707        assert_eq!(lazy_ds.force_lazy_bytes(&server(1)), None);
1708
1709        // S3 sources can't be measured → never auto-forced.
1710        let s3_ds = mk(SourceKind::Parquet, "s3://bucket/data.parquet", false);
1711        assert_eq!(s3_ds.estimate_local_bytes(), None);
1712        assert_eq!(s3_ds.force_lazy_bytes(&server(1)), None);
1713
1714        std::fs::remove_dir_all(&dir).ok();
1715    }
1716
1717    #[test]
1718    fn validate_rejects_bad_prefix() {
1719        let bad = ["no-leading-slash", "/trailing/"];
1720        for p in bad {
1721            let cfg = AppConfig {
1722                server: ServerConfig {
1723                    prefix: p.to_string(),
1724                    ..Default::default()
1725                },
1726                docs: DocsConfig::default(),
1727                swagger: SwaggerConfig::default(),
1728                metrics: MetricsConfig::default(),
1729                explorer: ExplorerConfig::default(),
1730                sql: SqlConfig::default(),
1731                datafusion: DataFusionConfig::default(),
1732                auth: AuthConfig::default(),
1733                datasets: vec![],
1734            };
1735            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
1736        }
1737    }
1738
1739    #[test]
1740    fn normalize_lowercases_configured_scopes() {
1741        let mut cfg = AppConfig {
1742            server: ServerConfig::default(),
1743            docs: DocsConfig::default(),
1744            swagger: SwaggerConfig::default(),
1745            metrics: MetricsConfig::default(),
1746            explorer: ExplorerConfig::default(),
1747            sql: SqlConfig::default(),
1748            datafusion: DataFusionConfig::default(),
1749            auth: AuthConfig {
1750                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
1751                reload_scopes: vec!["Datasets:Reload".into()],
1752                ..Default::default()
1753            },
1754            datasets: vec![],
1755        };
1756        cfg.normalize();
1757        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
1758        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
1759    }
1760
1761    #[test]
1762    fn validate_rejects_no_datasets() {
1763        let cfg = AppConfig {
1764            server: ServerConfig::default(),
1765            docs: DocsConfig::default(),
1766            swagger: SwaggerConfig::default(),
1767            metrics: MetricsConfig::default(),
1768            explorer: ExplorerConfig::default(),
1769            sql: SqlConfig::default(),
1770            datafusion: DataFusionConfig::default(),
1771            auth: AuthConfig::default(),
1772            datasets: vec![],
1773        };
1774        let err = cfg.validate().unwrap_err();
1775        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
1776    }
1777
1778    #[cfg(feature = "auth")]
1779    #[test]
1780    fn validate_accepts_auth_issuer_with_trailing_slash() {
1781        use std::io::Write;
1782
1783        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
1784        let _ = std::fs::remove_dir_all(&dir);
1785        std::fs::create_dir_all(&dir).unwrap();
1786        let file = dir.join("a.parquet");
1787        std::fs::File::create(&file)
1788            .unwrap()
1789            .write_all(b"x")
1790            .unwrap();
1791
1792        let cfg = AppConfig {
1793            server: ServerConfig::default(),
1794            docs: DocsConfig::default(),
1795            swagger: SwaggerConfig::default(),
1796            metrics: MetricsConfig::default(),
1797            explorer: ExplorerConfig::default(),
1798            sql: SqlConfig::default(),
1799            datafusion: DataFusionConfig::default(),
1800            auth: AuthConfig {
1801                enabled: true,
1802                issuer: "https://tenant.example.com/".into(),
1803                ..Default::default()
1804            },
1805            datasets: vec![DatasetConfig {
1806                name: "x".into(),
1807                source: SourceConfig {
1808                    kind: SourceKind::Parquet,
1809                    location: file.to_string_lossy().into_owned(),
1810                },
1811                s3: None,
1812                index: IndexConfig::default(),
1813                columns: vec![],
1814                dict_encode: true,
1815                lazy: false,
1816                predicate_filter: Default::default(),
1817                projection_filter: Default::default(),
1818            }],
1819        };
1820
1821        assert!(cfg.validate().is_ok());
1822        let _ = std::fs::remove_dir_all(&dir);
1823    }
1824
1825    #[test]
1826    fn validate_rejects_quack_non_local_host_without_override() {
1827        let cfg = AppConfig {
1828            server: ServerConfig {
1829                quack: QuackConfig {
1830                    enabled: true,
1831                    uri: "quack:127.0.0.1".into(),
1832                    token: Some("test-token".into()),
1833                    ..Default::default()
1834                },
1835                ..Default::default()
1836            },
1837            docs: DocsConfig::default(),
1838            swagger: SwaggerConfig::default(),
1839            metrics: MetricsConfig::default(),
1840            explorer: ExplorerConfig::default(),
1841            sql: SqlConfig::default(),
1842            datafusion: DataFusionConfig::default(),
1843            auth: AuthConfig::default(),
1844            datasets: vec![DatasetConfig {
1845                name: "x".into(),
1846                source: SourceConfig {
1847                    kind: SourceKind::Parquet,
1848                    location: "/tmp/missing.parquet".into(),
1849                },
1850                s3: None,
1851                index: IndexConfig::default(),
1852                columns: vec![],
1853                dict_encode: true,
1854                lazy: false,
1855                predicate_filter: Default::default(),
1856                projection_filter: Default::default(),
1857            }],
1858        };
1859        let err = cfg.validate().unwrap_err();
1860        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
1861    }
1862
1863    #[test]
1864    fn validate_rejects_bad_dataset_name() {
1865        let cfg: AppConfig = toml::from_str(
1866            r#"
1867            [[dataset]]
1868            name = "bad name!"
1869            source.kind = "parquet"
1870            source.location = "/tmp/whatever"
1871        "#,
1872        )
1873        .unwrap();
1874        let err = cfg.validate().unwrap_err();
1875        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
1876    }
1877
1878    #[test]
1879    fn validate_rejects_duplicate_names() {
1880        use std::io::Write;
1881        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
1882        let _ = std::fs::remove_dir_all(&dir);
1883        std::fs::create_dir_all(&dir).unwrap();
1884        let f = dir.join("a.parquet");
1885        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
1886        let path = f.to_str().unwrap();
1887
1888        let cfg: AppConfig = toml::from_str(&format!(
1889            r#"
1890            [[dataset]]
1891            name = "a"
1892            source.kind = "parquet"
1893            source.location = "{path}"
1894            [[dataset]]
1895            name = "a"
1896            source.kind = "parquet"
1897            source.location = "{path}"
1898        "#
1899        ))
1900        .unwrap();
1901        let err = cfg.validate().expect_err("expected error");
1902        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
1903
1904        let _ = std::fs::remove_dir_all(&dir);
1905    }
1906
1907    #[test]
1908    fn s3_bucket_parsing() {
1909        let mk = |loc: &str| SourceConfig {
1910            kind: SourceKind::Parquet,
1911            location: loc.into(),
1912        };
1913        let s1 = mk("s3://bucket/path/key");
1914        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
1915        let s2 = mk("s3://only-bucket");
1916        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
1917        assert!(mk("s3:///nokey").s3_bucket().is_err());
1918        assert!(mk("/local/path").s3_bucket().is_err());
1919    }
1920
1921    #[test]
1922    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
1923        let mk = |loc: &str| SourceConfig {
1924            kind: SourceKind::Parquet,
1925            location: loc.into(),
1926        };
1927        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
1928        assert_eq!(
1929            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
1930            "s3://bucket/logs/**/*.parquet"
1931        );
1932        assert_eq!(
1933            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
1934            "s3://bucket/logs/**/*.parquet"
1935        );
1936        // Already globbed -> unchanged.
1937        assert_eq!(
1938            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
1939            "s3://bucket/logs/*.parquet"
1940        );
1941        // Non-S3 -> unchanged.
1942        assert_eq!(
1943            mk("/local/logs").s3_recursive_parquet_glob(),
1944            "/local/logs"
1945        );
1946    }
1947
1948    #[test]
1949    fn effective_endpoint_folds_bucket_per_mode() {
1950        let virt = S3Config {
1951            endpoint: Some("https://s3.example.com".into()),
1952            addressing_style: AddressingStyle::Virtual,
1953            ..Default::default()
1954        };
1955        // Auto + virtual -> bucket folded into host.
1956        assert_eq!(
1957            virt.effective_endpoint("mybucket").as_deref(),
1958            Some("https://mybucket.s3.example.com")
1959        );
1960        // Idempotent: already-prefixed host is left alone.
1961        let prefixed = S3Config {
1962            endpoint: Some("https://mybucket.s3.example.com".into()),
1963            ..virt.clone()
1964        };
1965        assert_eq!(
1966            prefixed.effective_endpoint("mybucket").as_deref(),
1967            Some("https://mybucket.s3.example.com")
1968        );
1969        // Path style (auto) -> host untouched.
1970        let path = S3Config {
1971            addressing_style: AddressingStyle::Path,
1972            ..virt.clone()
1973        };
1974        assert_eq!(
1975            path.effective_endpoint("mybucket").as_deref(),
1976            Some("https://s3.example.com")
1977        );
1978        // Explicit overrides win over addressing style.
1979        let forced_off = S3Config {
1980            endpoint_bucket_in_host: BucketInHost::False,
1981            ..virt.clone()
1982        };
1983        assert_eq!(
1984            forced_off.effective_endpoint("mybucket").as_deref(),
1985            Some("https://s3.example.com")
1986        );
1987        let forced_on = S3Config {
1988            endpoint_bucket_in_host: BucketInHost::True,
1989            ..path.clone()
1990        };
1991        assert_eq!(
1992            forced_on.effective_endpoint("mybucket").as_deref(),
1993            Some("https://mybucket.s3.example.com")
1994        );
1995        // No endpoint -> None.
1996        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
1997    }
1998
1999    #[test]
2000    fn env_prefix_sanitises_name() {
2001        let mk = |name: &str| DatasetConfig {
2002            name: name.into(),
2003            source: SourceConfig {
2004                kind: SourceKind::Parquet,
2005                location: "x".into(),
2006            },
2007            s3: None,
2008            index: IndexConfig::default(),
2009            columns: vec![],
2010            dict_encode: true,
2011            lazy: false,
2012            predicate_filter: Default::default(),
2013            projection_filter: Default::default(),
2014        };
2015        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
2016        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
2017        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
2018    }
2019
2020    #[test]
2021    fn resolve_local_parquet_single_file_and_dir() {
2022        use std::io::Write;
2023        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
2024        let _ = std::fs::remove_dir_all(&dir);
2025        std::fs::create_dir_all(&dir).unwrap();
2026        let f = dir.join("a.parquet");
2027        let mut fh = std::fs::File::create(&f).unwrap();
2028        fh.write_all(b"not really parquet").unwrap();
2029
2030        let mk = |loc: &str| DatasetConfig {
2031            name: "ds".into(),
2032            source: SourceConfig {
2033                kind: SourceKind::Parquet,
2034                location: loc.into(),
2035            },
2036            s3: None,
2037            index: IndexConfig::default(),
2038            columns: vec![],
2039            dict_encode: true,
2040            lazy: false,
2041            predicate_filter: Default::default(),
2042            projection_filter: Default::default(),
2043        };
2044
2045        // Direct file.
2046        let files = mk(f.to_str().unwrap())
2047            .resolve_local_parquet_files()
2048            .unwrap();
2049        assert_eq!(files, vec![f.clone()]);
2050
2051        // Directory.
2052        let files = mk(dir.to_str().unwrap())
2053            .resolve_local_parquet_files()
2054            .unwrap();
2055        assert_eq!(files, vec![f.clone()]);
2056
2057        // Missing path.
2058        assert!(
2059            mk("/no/such/place.parquet")
2060                .resolve_local_parquet_files()
2061                .is_err()
2062        );
2063
2064        let _ = std::fs::remove_dir_all(&dir);
2065    }
2066}