Skip to main content

datapress_core/
config.rs

//! Runtime configuration loaded from `datasets.toml`.
//!
//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
//! block selects the format (`parquet` or `delta`) and the location (a
//! local path or an `s3://bucket/key` URL). When the location is on S3,
//! an optional `[dataset.s3]` block carries the connection details
//! (region, endpoint, addressing style, …). It may also hold inline
//! credentials, although those rank below per-dataset env vars (see below).
//!
//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
//! in this precedence order:
//!
//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`,
//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
//!    `sales.eu-1` → `SALES_EU_1`).
//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
//!    `[dataset.s3]` block.
//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
//!    `AWS_SESSION_TOKEN`.
//! 4. None — fall back to the engine's own provider chain
//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::{Deserialize, Serialize};
29
30use crate::errors::AppError;
31
32/// Absolute path of the `datasets.toml` this process was loaded from, set
33/// once by [`AppConfig::load`]. `None` when the config was constructed
34/// in-process (e.g. the Python bindings) rather than read from a file — in
35/// that case the explorer's "append to server config" export is unavailable.
36static SOURCE_CONFIG_PATH: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();
37
38/// Path of the config file this process was loaded from, if any.
39pub fn source_config_path() -> Option<&'static std::path::Path> {
40    SOURCE_CONFIG_PATH.get().map(|p| p.as_path())
41}
42
43/// Mount paths the user MUST NOT pick for `[docs].path` or
44/// `[swagger].path` — they would shadow first-party routes (probes,
45/// API scopes, root).
46const RESERVED_MOUNTS: &[&str] = &[
47    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
48];
49
50// ---------------------------------------------------------------------------
51// Public types
52// ---------------------------------------------------------------------------
53
54#[derive(Debug, Deserialize)]
55pub struct AppConfig {
56    #[serde(default)]
57    pub server: ServerConfig,
58    #[serde(default)]
59    pub docs: DocsConfig,
60    #[serde(default)]
61    pub swagger: SwaggerConfig,
62    #[serde(default)]
63    pub metrics: MetricsConfig,
64    #[serde(default)]
65    pub explorer: ExplorerConfig,
66    #[serde(default)]
67    pub sql: SqlConfig,
68    #[serde(default)]
69    pub datafusion: DataFusionConfig,
70    #[serde(default)]
71    pub auth: AuthConfig,
72    #[serde(rename = "dataset", default)]
73    pub datasets: Vec<DatasetConfig>,
74}
75
76#[derive(Debug, Deserialize)]
77#[serde(default)]
78pub struct ServerConfig {
79    /// Which engine to run. Must match the binary's compile-time feature.
80    pub backend: Backend,
81    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
82    /// to 0.0.0.0 if you want to expose the port.
83    pub listen: IpAddr,
84    /// TCP port.
85    pub port: u16,
86    /// Number of actix worker threads. `None` (= unset) → one per CPU.
87    pub workers: Option<usize>,
88    /// Optional URL path prefix — useful when sitting behind a reverse
89    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
90    /// route is mounted under this prefix (so the proxy can pass the URL
91    /// through unchanged). Must start with `/` and not end with `/`; the
92    /// empty string (default) means no prefix.
93    pub prefix: String,
94    /// Negotiate response compression (gzip / brotli / zstd) via the
95    /// `Accept-Encoding` request header. Enabled by default. Disable when
96    /// running behind a proxy that already compresses, or when the extra
97    /// CPU is not worth the bandwidth saving.
98    pub compress: bool,
99    /// Maximum accepted JSON request body size, in bytes. Larger bodies
100    /// are rejected with `413 Payload Too Large` before any handler runs.
101    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
102    /// a DoS guard, not a tuning knob.
103    pub max_body_bytes: usize,
104    /// Maximum rows returned by a single `/query` page. Larger
105    /// `page_size` values are clamped before the backend runs.
106    /// Default `100_000`.
107    pub max_page_size: u64,
108    /// When > 0, any dataset whose backing files exceed this many
109    /// megabytes is forced into `lazy` mode at startup (streamed from
110    /// disk instead of materialised into RAM), even if `lazy` was not set
111    /// on the dataset. `0` (default) disables the size check. Local
112    /// sources are sized with a filesystem stat; on the DataFusion backend
113    /// S3 sources are sized by listing the object store under their prefix
114    /// (the DuckDB backend only sizes local sources — S3 datasets there
115    /// must opt in with an explicit `lazy = true`). Delta tables are
116    /// measured by summing their parquet data files.
117    pub force_lazy_above_mb: u64,
118    /// Per-request handler timeout, in milliseconds. If a handler hasn't
119    /// produced a response within this budget the request is aborted with
120    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
121    pub request_timeout_ms: u64,
122    /// Grace period for in-flight requests after the server has received
123    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
124    /// immediately; existing connections then have up to this many
125    /// seconds to finish before workers are force-stopped. Default `30`.
126    pub shutdown_timeout_secs: u64,
127    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
128    /// backend; ignored by DataFusion.
129    pub quack: QuackConfig,
130    /// Optional PostgreSQL wire-protocol server. Only used by the DataFusion
131    /// backend (and only when compiled with the `pgwire` feature); ignored by
132    /// DuckDB.
133    pub pgwire: PgwireConfig,
134}
135
136impl Default for ServerConfig {
137    fn default() -> Self {
138        Self {
139            backend: Backend::default(),
140            listen: IpAddr::from([127, 0, 0, 1]),
141            port: 8080,
142            workers: None,
143            prefix: String::new(),
144            compress: true,
145            max_body_bytes: 1024 * 1024,
146            max_page_size: 100_000,
147            force_lazy_above_mb: 0,
148            request_timeout_ms: 30_000,
149            shutdown_timeout_secs: 30,
150            quack: QuackConfig::default(),
151            pgwire: PgwireConfig::default(),
152        }
153    }
154}
155
156/// Experimental DuckDB Quack remote protocol server.
157///
158/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
159/// disabled unless you intentionally want DuckDB clients to attach/query this
160/// process directly.
161#[derive(Debug, Clone, Deserialize)]
162#[serde(default)]
163pub struct QuackConfig {
164    /// Install/load the Quack extension and start `quack_serve` after
165    /// datasets are registered.
166    pub enabled: bool,
167    /// Quack URI to listen on. `quack:localhost` uses DuckDB's default
168    /// port 9494.
169    pub uri: String,
170    /// Optional explicit authentication token. If omitted, Quack generates
171    /// one at startup and DataPress logs it once.
172    pub token: Option<String>,
173    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
174    /// For external exposure, put a TLS-terminating reverse proxy in front.
175    pub allow_other_hostname: bool,
176    /// Install a read-only authorization macro for remote queries. Enabled
177    /// by default to match DataPress' read-oriented HTTP API.
178    pub read_only: bool,
179}
180
181impl Default for QuackConfig {
182    fn default() -> Self {
183        Self {
184            enabled: false,
185            uri: "quack:localhost".into(),
186            token: None,
187            allow_other_hostname: false,
188            read_only: true,
189        }
190    }
191}
192
193impl QuackConfig {
194    /// Validate the enabled Quack configuration against DuckDB's current
195    /// safety rules. The extension treats only the literal `localhost` as
196    /// local unless `allow_other_hostname` is set.
197    pub fn validate_enabled(&self) -> Result<(), AppError> {
198        if self.uri.trim().is_empty() {
199            return Err(AppError::Internal(
200                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
201            ));
202        }
203        if !self.uri.starts_with("quack:") {
204            return Err(AppError::Internal(format!(
205                "server.quack.uri must start with 'quack:' (got '{}')",
206                self.uri
207            )));
208        }
209        if !self.allow_other_hostname {
210            let host = self.hostname().unwrap_or_default();
211            if host != "localhost" {
212                return Err(AppError::Internal(format!(
213                    "server.quack.uri host must be 'localhost' unless \
214                     server.quack.allow_other_hostname = true (got '{}')",
215                    self.uri
216                )));
217            }
218        }
219        if let Some(token) = self.token.as_deref()
220            && token.len() < 4
221        {
222            return Err(AppError::Internal(
223                "server.quack.token must be at least 4 characters".into(),
224            ));
225        }
226        Ok(())
227    }
228
229    fn hostname(&self) -> Option<&str> {
230        let rest = self.uri.strip_prefix("quack:")?;
231        let rest = rest.strip_prefix("//").unwrap_or(rest);
232        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
233        (!host.is_empty()).then_some(host)
234    }
235}
236
237/// Experimental PostgreSQL wire-protocol server (`[server.pgwire]` block).
238///
239/// When enabled on the DataFusion backend, BI tools (Power BI via Npgsql,
240/// `psql`, DBeaver, …) can connect and query the registered datasets as if
241/// this process were PostgreSQL. Off by default and a no-op unless the binary
242/// was built with the `pgwire` feature on `datapress-datafusion`.
243#[derive(Debug, Clone, Deserialize)]
244#[serde(default)]
245pub struct PgwireConfig {
246    /// Start the pgwire listener after datasets are registered.
247    pub enabled: bool,
248    /// Listen address. Defaults to loopback (127.0.0.1). Binding a
249    /// non-loopback address requires a password (and, since only cleartext
250    /// password auth is available, TLS as well).
251    pub listen: IpAddr,
252    /// TCP port. Defaults to the PostgreSQL default 5432.
253    pub port: u16,
254    /// Username clients must present. Defaults to `datapress`.
255    pub username: String,
256    /// Password clients must present. Optional only for a loopback-only
257    /// listener; required for any non-loopback bind.
258    pub password: Option<String>,
259    /// PEM certificate path for TLS. Must be set together with `tls_key`.
260    pub tls_cert: Option<PathBuf>,
261    /// PKCS#8 private-key path for TLS. Must be set together with `tls_cert`.
262    pub tls_key: Option<PathBuf>,
263}
264
265impl Default for PgwireConfig {
266    fn default() -> Self {
267        Self {
268            enabled: false,
269            listen: IpAddr::from([127, 0, 0, 1]),
270            port: 5432,
271            username: "datapress".into(),
272            password: None,
273            tls_cert: None,
274            tls_key: None,
275        }
276    }
277}
278
279impl PgwireConfig {
280    /// Validate the enabled pgwire configuration. Because the only available
281    /// authentication mechanism is cleartext password (SCRAM would need a
282    /// salted verifier the integration library does not expose), the rules
283    /// are deliberately strict about exposing an off-box listener:
284    ///
285    /// * a non-loopback `listen` requires a `password` — an unauthenticated
286    ///   SQL endpoint must never be reachable off the local host;
287    /// * `tls_cert` and `tls_key` must be set together or not at all;
288    /// * a non-loopback `listen` also requires TLS, so the cleartext password
289    ///   never crosses a plaintext TCP connection off the box.
290    pub fn validate_enabled(&self) -> Result<(), AppError> {
291        let is_loopback = self.listen.is_loopback();
292        let tls_configured = match (self.tls_cert.as_ref(), self.tls_key.as_ref()) {
293            (Some(_), Some(_)) => true,
294            (None, None) => false,
295            _ => {
296                return Err(AppError::Internal(
297                    "server.pgwire.tls_cert and server.pgwire.tls_key must be set together \
298                     (both or neither)"
299                        .into(),
300                ));
301            }
302        };
303
304        if !is_loopback && self.password.is_none() {
305            return Err(AppError::Internal(format!(
306                "server.pgwire.password is required when server.pgwire.listen is not a \
307                 loopback address (got '{}')",
308                self.listen
309            )));
310        }
311
312        if !is_loopback && !tls_configured {
313            return Err(AppError::Internal(format!(
314                "server.pgwire requires TLS (server.pgwire.tls_cert + tls_key) when \
315                 server.pgwire.listen is not a loopback address (got '{}'): cleartext \
316                 password auth must not cross a plaintext connection off the host",
317                self.listen
318            )));
319        }
320
321        Ok(())
322    }
323}
324
325#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
326#[serde(rename_all = "lowercase")]
327pub enum Backend {
328    #[default]
329    Datafusion,
330    Duckdb,
331}
332
333/// Embedded MkDocs documentation site (`[docs]` block).
334///
335/// Enabled by default — when the binary was built with the `docs`
336/// cargo feature, the site is served at [`DocsConfig::path`] out of
337/// the box. Set `enabled = false` in `datasets.toml` to suppress it
338/// (e.g. in prod). When the binary was built without the feature,
339/// `enabled = true` is harmless: the server logs a warning at startup
340/// and skips the mount. The mount path must be a non-trivial sub-path;
341/// reserved API and probe roots are rejected at startup.
342#[derive(Debug, Clone, Deserialize)]
343#[serde(default, deny_unknown_fields)]
344pub struct DocsConfig {
345    pub enabled: bool,
346    pub path: String,
347}
348
349impl Default for DocsConfig {
350    fn default() -> Self {
351        Self {
352            enabled: true,
353            path: "/mkdocs".into(),
354        }
355    }
356}
357
358/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
359///
360/// Enabled by default — when the binary was built with the `swagger`
361/// cargo feature, an interactive Swagger UI is served at
362/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
363/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
364/// to suppress it (e.g. in prod). When the binary was built without
365/// the feature, `enabled = true` is harmless: the server logs a
366/// warning at startup and skips the mount.
367///
368/// To let users sign in to the UI itself (Authorization Code + PKCE
369/// against any OIDC provider), populate the optional `[swagger.oauth2]`
370/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
371/// to every "Try it out" request — useful for exercising auth-protected
372/// endpoints from the docs page. This drives the UI only; it does not
373/// turn on server-side token validation.
374#[derive(Debug, Clone, Deserialize)]
375#[serde(default, deny_unknown_fields)]
376pub struct SwaggerConfig {
377    pub enabled: bool,
378    pub path: String,
379    pub oauth2: Option<SwaggerOAuth2Config>,
380}
381
382impl Default for SwaggerConfig {
383    fn default() -> Self {
384        Self {
385            enabled: true,
386            path: "/docs".into(),
387            oauth2: None,
388        }
389    }
390}
391
392/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
393///
394/// Configures the UI to drive an Authorization Code + PKCE flow against
395/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
396/// token endpoints from `<issuer>/.well-known/openid-configuration`,
397/// so we don't need to pin them here.
398///
399/// All fields are required when the block is present — there is no
400/// sensible default for `issuer` or `client_id`.
401#[derive(Debug, Clone, Deserialize)]
402#[serde(deny_unknown_fields)]
403pub struct SwaggerOAuth2Config {
404    /// OIDC issuer URL, e.g.
405    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
406    /// `https://accounts.google.com`. Must not end in `/`.
407    pub issuer: String,
408    /// Public OAuth2 client identifier registered with the IdP. The
409    /// client must be a SPA / public client (no secret) with
410    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
411    /// as an allowed redirect URI.
412    pub client_id: String,
413    /// Scopes to request by default. Will be pre-checked in the Swagger
414    /// UI authorize dialog; users can edit them before signing in.
415    /// `openid` is always added if missing.
416    #[serde(default)]
417    pub scopes: Vec<String>,
418    /// Use PKCE for the authorization code flow. Defaults to `true`;
419    /// disable only if your IdP doesn't support PKCE for public clients.
420    #[serde(default = "default_true")]
421    pub pkce: bool,
422}
423
424/// Prometheus metrics endpoint (`[metrics]` block).
425///
426/// Disabled by default. When `enabled = true` (and the binary was built
427/// with the `metrics` cargo feature), the server installs a middleware
428/// that records per-request HTTP counters and latency histograms, and
429/// exposes them in the Prometheus text exposition format at
430/// [`MetricsConfig::path`] (default `/metrics`).
431///
432/// The endpoint is mounted at a fixed, *unprefixed* path — like the
433/// health probes — so a scrape config doesn't need to know about any
434/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
435/// layer: Prometheus scrapers rarely carry bearer tokens, and the
436/// endpoint exposes only aggregate request metrics (no row data). Keep
437/// it on a network the scraper can reach but the public cannot, e.g. by
438/// binding `server.listen` to a private interface.
439///
440/// When the binary was built without the `metrics` feature,
441/// `enabled = true` is harmless: the server logs a warning at startup
442/// and skips the endpoint.
443#[derive(Debug, Clone, Deserialize)]
444#[serde(default, deny_unknown_fields)]
445pub struct MetricsConfig {
446    pub enabled: bool,
447    pub path: String,
448}
449
450impl Default for MetricsConfig {
451    fn default() -> Self {
452        Self {
453            enabled: false,
454            path: "/metrics".into(),
455        }
456    }
457}
458
459/// Embedded dataset explorer UI (`[explorer]` block).
460///
461/// A server-rendered web app (Actix + Askama templates + htmx +
462/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
463/// It offers a *discovery* view — per-dataset stats, schema, index and
464/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
465/// that queries each dataset's Parquet export directly.
466///
467/// Enabled by default. When the binary was built without the `explorer`
468/// cargo feature, `enabled = true` is harmless: the server logs a warning
469/// at startup and skips the mount. Set `enabled = false` to suppress it at
470/// runtime even when the feature is compiled in.
471#[derive(Debug, Clone, Deserialize)]
472#[serde(default, deny_unknown_fields)]
473pub struct ExplorerConfig {
474    pub enabled: bool,
475    pub path: String,
476}
477
478impl Default for ExplorerConfig {
479    fn default() -> Self {
480        Self {
481            enabled: true,
482            path: "/explore".into(),
483        }
484    }
485}
486
487/// Raw-SQL query endpoint (`[sql]` block).
488///
489/// Exposes `POST /api/v1/sql`, which accepts an arbitrary read-only
490/// `SELECT` in the request body and runs it against the engine. **Off by
491/// default** — raw SQL is a larger attack surface than the structured
492/// `/query` endpoint, so it must be opted into explicitly.
493///
494/// Phase 1 is scoped to a *single* dataset per query: the statement may
495/// reference at most one registered dataset (and no others / no files),
496/// enforced by a parse-time table allowlist. Cross-dataset joins are a
497/// future extension.
498///
499/// Safety rails applied to every accepted statement:
500/// - exactly one statement, and it must be a read-only `SELECT` / `WITH`,
501/// - every referenced table must be a registered dataset (no file
502///   functions, no `ATTACH`/`COPY`/`PRAGMA`/DDL/DML),
503/// - the result is hard-capped at [`SqlConfig::max_rows`] rows.
504#[derive(Debug, Clone, Deserialize)]
505#[serde(default, deny_unknown_fields)]
506pub struct SqlConfig {
507    /// Enable the `POST /api/v1/sql` endpoint. Default `false`.
508    pub enabled: bool,
509    /// Hard cap on the number of rows a single SQL query may return.
510    /// The query result is wrapped in an outer `LIMIT` so this bound is
511    /// enforced regardless of the user's own `LIMIT`. Default `100_000`.
512    pub max_rows: u64,
513}
514
515impl Default for SqlConfig {
516    fn default() -> Self {
517        Self {
518            enabled: false,
519            max_rows: 100_000,
520        }
521    }
522}
523
524/// DataFusion backend performance tuning (`[datafusion]` block).
525///
526/// Every knob is **off / stock by default**, so the backend behaves exactly
527/// like DataFusion out of the box unless you opt in. These mainly help lazy
528/// (`lazy = true`) parquet datasets, especially on object storage. Ignored by
529/// the DuckDB backend.
530#[derive(Debug, Clone, Deserialize)]
531#[serde(default, deny_unknown_fields)]
532pub struct DataFusionConfig {
533    /// Push row-level filters down into the parquet decoder so rows that fail
534    /// a predicate are never materialised (in addition to the row-group /
535    /// page-index pruning that always happens). DataFusion default is `false`
536    /// because for some workloads the extra per-row evaluation is not worth
537    /// it; turn it on for selective filters over large row groups.
538    pub pushdown_filters: bool,
539    /// Let the parquet scan reorder pushed-down predicates by estimated
540    /// selectivity. Only has an effect together with `pushdown_filters`.
541    /// DataFusion default is `false`.
542    pub reorder_filters: bool,
543    /// Cache object-store file listings on the shared runtime so repeated
544    /// lazy queries reuse `LIST` results instead of re-listing the source
545    /// prefix every time — the dominant per-query cost on S3. Default `false`.
546    pub list_files_cache: bool,
547    /// Memory budget for the file-listing cache, in MiB. Only used when
548    /// `list_files_cache = true`. Default `64`.
549    pub list_files_cache_mb: usize,
550    /// How long a cached listing stays valid, in seconds. Bounds how long it
551    /// takes for newly written files to become visible without an explicit
552    /// reload. `0` means no expiry (infinite). Default `60`.
553    pub list_files_cache_ttl_secs: u64,
554}
555
556impl Default for DataFusionConfig {
557    fn default() -> Self {
558        Self {
559            pushdown_filters: false,
560            reorder_filters: false,
561            list_files_cache: false,
562            list_files_cache_mb: 64,
563            list_files_cache_ttl_secs: 60,
564        }
565    }
566}
567
568/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
569///
570/// Disabled by default. When `enabled = true`, the server validates
571/// every request's `Authorization: Bearer …` JWT against the JWKS
572/// discovered from the issuer's OIDC metadata
573/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
574/// enforces the configured scope requirements per route.
575///
576/// Only compiled in when the binary was built with the `auth` cargo
577/// feature. Without the feature, `enabled = true` is rejected at
578/// startup so a misconfigured production deployment can't silently
579/// fall back to "no auth".
580///
581/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
582/// of this block — `[swagger.oauth2]` only drives the UI's login
583/// dialog; `[auth]` is what enforces tokens on the API.
584#[derive(Debug, Clone, Deserialize)]
585#[serde(default, deny_unknown_fields)]
586pub struct AuthConfig {
587    /// Master switch. `false` (default) skips all auth processing.
588    pub enabled: bool,
589    /// OIDC issuer URL — must match the `iss` claim of every accepted
590    /// token. Required when `enabled = true`.
591    pub issuer: String,
592    /// Expected `aud` claim. When empty, audience validation is
593    /// skipped (not recommended in production).
594    pub audience: String,
595    /// Scopes a caller must hold to read datasets (GET endpoints +
596    /// POST `…/query` and `…/count`). Empty list means "no scope check,
597    /// just a valid token is enough".
598    pub read_scopes: Vec<String>,
599    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
600    /// Empty list means "no scope check, just a valid token is enough".
601    pub reload_scopes: Vec<String>,
602    /// Allow unauthenticated GETs through. Useful for public datasets
603    /// and demo deployments. Defaults to `false`.
604    pub anonymous_read: bool,
605    /// Continue serving even if the JWKS fetch fails at startup.
606    /// When `true` (default), the server starts in a degraded mode that
607    /// rejects every auth'd request with 503 until JWKS becomes
608    /// reachable. When `false`, startup fails outright.
609    pub start_degraded: bool,
610    /// Allowed signing algorithms. Pinned to RS256 by default; never
611    /// include `HS*` or `none` here unless you really know what you're
612    /// doing.
613    pub algorithms: Vec<String>,
614    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds.
615    pub leeway_secs: u64,
616    /// How often (in seconds) the background refresher re-fetches the
617    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
618    /// out-of-band.
619    pub jwks_refresh_secs: u64,
620    /// Optional JSON-pointer into the JWT claims that extracts a
621    /// tenant identifier — attached to the principal and logged on
622    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
623    /// When empty, no tenant is extracted.
624    pub tenant_claim: String,
625    /// If non-empty, requests whose extracted tenant ID is not in this
626    /// list are rejected with 403. Has no effect when `tenant_claim`
627    /// is empty.
628    pub allowed_tenants: Vec<String>,
629    /// If `true`, `POST …/reload` accepts *either* a valid token with
630    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
631    /// to `true` for one-release backwards compatibility — flip to
632    /// `false` once your automation has migrated to OIDC.
633    pub admin_token_fallback: bool,
634}
635
636impl Default for AuthConfig {
637    fn default() -> Self {
638        Self {
639            enabled: false,
640            issuer: String::new(),
641            audience: String::new(),
642            read_scopes: Vec::new(),
643            reload_scopes: Vec::new(),
644            anonymous_read: false,
645            start_degraded: true,
646            algorithms: vec!["RS256".into()],
647            leeway_secs: 60,
648            jwks_refresh_secs: 3600,
649            tenant_claim: String::new(),
650            allowed_tenants: Vec::new(),
651            admin_token_fallback: true,
652        }
653    }
654}
655
656impl Backend {
657    pub fn as_str(self) -> &'static str {
658        match self {
659            Backend::Datafusion => "datafusion",
660            Backend::Duckdb => "duckdb",
661        }
662    }
663}
664
665#[derive(Debug, Clone, Deserialize, Serialize)]
666pub struct DatasetConfig {
667    pub name: String,
668    pub source: SourceConfig,
669    #[serde(default)]
670    pub s3: Option<S3Config>,
671    #[serde(default)]
672    pub index: IndexConfig,
673    /// Optional column projection applied at load time. When non-empty,
674    /// only the listed columns are read from the parquet/delta source —
675    /// every other column is skipped entirely (no decode, no allocation,
676    /// no resident memory). Empty (default) = read all columns. Names are
677    /// matched case-insensitively against the source schema.
678    #[serde(default)]
679    pub columns: Vec<String>,
680    /// When `true` (default), Utf8 columns that are dictionary-encoded in
681    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
682    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
683    /// for low-cardinality columns. Set to `false` to bypass the override
684    /// — useful as a workaround if you observe null-handling oddities on
685    /// a particular parquet file.
686    #[serde(default = "default_true")]
687    pub dict_encode: bool,
688    /// When `true`, the backend should keep the dataset on disk and stream
689    /// it at query time instead of materialising it into RAM at startup.
690    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
691    /// for bounded memory use on large / multi-file sources. Honoured by
692    /// the DataFusion backend (local + S3 parquet) and by the DuckDB
693    /// backend, which registers the dataset as a view over the source scan
694    /// (local + S3 parquet, and delta) rather than materialising a table.
695    #[serde(default)]
696    pub lazy: bool,
697    /// Column-level access control for query **predicates** — which columns
698    /// a caller may filter on (structured `predicates` / `count`, and any
699    /// reference on the raw-SQL endpoint). Mutually-exclusive `include`
700    /// (allowlist) / `exclude` (denylist). Empty (default) = no restriction.
701    #[serde(default)]
702    pub predicate_filter: ColumnFilter,
703    /// Column-level access control for **projection** — which columns a
704    /// caller may see or return (the `columns` projection, `group_by`,
705    /// aggregations, `order_by`, the `/schema` response and row sample, and
706    /// any reference on the raw-SQL endpoint). Columns denied here are
707    /// hidden as if they did not exist. Mutually-exclusive `include`
708    /// (allowlist) / `exclude` (denylist). Empty (default) = no restriction.
709    #[serde(default)]
710    pub projection_filter: ColumnFilter,
711}
712
713fn default_true() -> bool {
714    true
715}
716
717/// A mutually-exclusive column allow/deny list.
718///
719/// Set `include` to turn it into an **allowlist** — only the listed columns
720/// pass. Set `exclude` to turn it into a **denylist** — every column except
721/// the listed ones passes. Setting *both* is a configuration error (caught
722/// by [`ColumnFilter::validate`]). Leaving both empty (the default) imposes
723/// no restriction at all. Names are matched case-insensitively against the
724/// dataset's canonical column names.
725#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
726#[serde(default, deny_unknown_fields)]
727pub struct ColumnFilter {
728    #[serde(default)]
729    pub include: Vec<String>,
730    #[serde(default)]
731    pub exclude: Vec<String>,
732}
733
734impl ColumnFilter {
735    /// Whether this filter restricts anything (either list is non-empty).
736    pub fn is_active(&self) -> bool {
737        !self.include.is_empty() || !self.exclude.is_empty()
738    }
739
740    /// Whether `col` passes the filter. Case-insensitive. An empty filter
741    /// (neither list set) admits every column.
742    pub fn allows(&self, col: &str) -> bool {
743        let lc = col.to_lowercase();
744        if !self.include.is_empty() {
745            return self.include.iter().any(|c| c.to_lowercase() == lc);
746        }
747        if !self.exclude.is_empty() {
748            return !self.exclude.iter().any(|c| c.to_lowercase() == lc);
749        }
750        true
751    }
752
753    /// Reject a filter that sets both `include` and `exclude`. `ctx`
754    /// identifies the filter in the error message (e.g. `"predicate_filter"`).
755    pub fn validate(&self, dataset: &str, ctx: &str) -> Result<(), AppError> {
756        if !self.include.is_empty() && !self.exclude.is_empty() {
757            return Err(AppError::InvalidValue(format!(
758                "dataset '{dataset}': {ctx} may set 'include' or 'exclude', not both"
759            )));
760        }
761        Ok(())
762    }
763
764    /// The names listed by whichever side is active, for cross-checking
765    /// against the real schema at registration time (typos in a denylist
766    /// would otherwise silently expose a column).
767    pub fn listed(&self) -> &[String] {
768        if !self.include.is_empty() {
769            &self.include
770        } else {
771            &self.exclude
772        }
773    }
774}
775
776#[derive(Debug, Clone, Deserialize, Serialize)]
777pub struct SourceConfig {
778    pub kind: SourceKind,
779    /// Either a local filesystem path or an `s3://bucket/key` URL.
780    pub location: String,
781}
782
783#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
784#[serde(rename_all = "lowercase")]
785pub enum SourceKind {
786    #[default]
787    Parquet,
788    Delta,
789}
790
791impl SourceKind {
792    pub fn as_str(self) -> &'static str {
793        match self {
794            SourceKind::Parquet => "parquet",
795            SourceKind::Delta => "delta",
796        }
797    }
798}
799
800/// Non-secret S3 connection settings. Credentials are pulled from env / the
801/// AWS credential chain — see [`DatasetConfig::resolved_creds`].
802#[derive(Debug, Clone, Deserialize, Serialize)]
803#[serde(default)]
804pub struct S3Config {
805    pub region: Option<String>,
806    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
807    pub endpoint: Option<String>,
808    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
809    /// MinIO and most non-AWS providers require `path`.
810    pub addressing_style: AddressingStyle,
811    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
812    pub allow_http: bool,
813    /// Inline credentials. Strongly discouraged in production — prefer env
814    /// vars (see module docs).
815    pub access_key_id: Option<String>,
816    pub secret_access_key: Option<String>,
817    pub session_token: Option<String>,
818    /// Hive partition-column handling for parquet sources. Defaults to
819    /// `auto` (detect from the path). See [`Partitioning`].
820    pub partitioning: Partitioning,
821    /// Whether the bucket is folded into a custom `endpoint` host for
822    /// virtual-hosted-style requests. Defaults to `auto`. See
823    /// [`BucketInHost`].
824    pub endpoint_bucket_in_host: BucketInHost,
825}
826
827impl Default for S3Config {
828    fn default() -> Self {
829        Self {
830            region: None,
831            endpoint: None,
832            addressing_style: AddressingStyle::Virtual,
833            allow_http: false,
834            access_key_id: None,
835            secret_access_key: None,
836            session_token: None,
837            partitioning: Partitioning::Auto,
838            endpoint_bucket_in_host: BucketInHost::Auto,
839        }
840    }
841}
842
843impl S3Config {
844    /// Resolve the endpoint URL to hand to the object store, optionally
845    /// folding `bucket` into the host for virtual-hosted-style requests so a
846    /// plain `endpoint` works the same way it does on DuckDB.
847    ///
848    /// Returns `None` when no custom endpoint is configured (AWS default).
849    /// The bucket is only prepended when it isn't already the leading host
850    /// label, so re-running this (or a config that already embeds the
851    /// bucket) never produces `bucket.bucket.host`.
852    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
853        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
854
855        let fold = match self.endpoint_bucket_in_host {
856            BucketInHost::False => false,
857            BucketInHost::True => true,
858            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
859        };
860        if !fold {
861            return Some(ep.to_string());
862        }
863
864        let (scheme, host_and_path) = match ep.split_once("://") {
865            Some((s, rest)) => (Some(s), rest),
866            None => (None, ep),
867        };
868        // Split host from any trailing path so we prefix the host label only.
869        let (host, path) = match host_and_path.split_once('/') {
870            Some((h, p)) => (h, Some(p)),
871            None => (host_and_path, None),
872        };
873        // Guard against double-prefixing.
874        if host == bucket || host.starts_with(&format!("{bucket}.")) {
875            return Some(ep.to_string());
876        }
877        let new_host = format!("{bucket}.{host}");
878        let rebuilt = match (scheme, path) {
879            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
880            (Some(s), None) => format!("{s}://{new_host}"),
881            (None, Some(p)) => format!("{new_host}/{p}"),
882            (None, None) => new_host,
883        };
884        Some(rebuilt)
885    }
886}
887
888#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
889#[serde(rename_all = "lowercase")]
890pub enum AddressingStyle {
891    #[default]
892    Virtual,
893    Path,
894}
895
896impl AddressingStyle {
897    pub fn as_str(self) -> &'static str {
898        match self {
899            AddressingStyle::Virtual => "virtual",
900            AddressingStyle::Path => "path",
901        }
902    }
903}
904
905/// How hive-style partition columns (`key=value/` path segments) are handled
906/// for an S3 parquet source. Local parquet always auto-detects; this option
907/// brings S3 in line and lets you force or disable the behaviour.
908#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
909#[serde(rename_all = "lowercase")]
910pub enum Partitioning {
911    /// Detect `key=value` segments from the location glob or by listing the
912    /// prefix. No partition columns are added when none are found.
913    #[default]
914    Auto,
915    /// Force hive partitioning. Partition keys are taken from the location
916    /// glob, or discovered by listing the prefix.
917    Hive,
918    /// Treat the source as a flat set of parquet files — never add partition
919    /// columns even if the path looks hive-partitioned.
920    None,
921}
922
923impl Partitioning {
924    pub fn as_str(self) -> &'static str {
925        match self {
926            Partitioning::Auto => "auto",
927            Partitioning::Hive => "hive",
928            Partitioning::None => "none",
929        }
930    }
931}
932
933/// Whether the bucket name is folded into the endpoint hostname for
934/// virtual-hosted-style requests against a custom endpoint. This aligns the
935/// DataFusion object-store path with DuckDB, which builds the virtual host
936/// itself — so the same plain `endpoint` works on both backends.
937#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
938#[serde(rename_all = "lowercase")]
939pub enum BucketInHost {
940    /// Fold the bucket into the host when `addressing_style = "virtual"` and
941    /// a custom `endpoint` is set (guarded against double-prefixing).
942    #[default]
943    Auto,
944    /// Always fold the bucket into the endpoint host.
945    True,
946    /// Never rewrite the endpoint — pass it through verbatim.
947    False,
948}
949
950impl BucketInHost {
951    pub fn as_str(self) -> &'static str {
952        match self {
953            BucketInHost::Auto => "auto",
954            BucketInHost::True => "true",
955            BucketInHost::False => "false",
956        }
957    }
958}
959
960#[derive(Debug, Clone, Deserialize, Serialize)]
961#[serde(default)]
962pub struct IndexConfig {
963    pub mode: IndexMode,
964    pub columns: Vec<String>,
965    pub max_cardinality: usize,
966}
967
968impl Default for IndexConfig {
969    fn default() -> Self {
970        Self {
971            mode: IndexMode::Auto,
972            columns: Vec::new(),
973            max_cardinality: 100_000,
974        }
975    }
976}
977
978#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
979#[serde(rename_all = "lowercase")]
980pub enum IndexMode {
981    #[default]
982    Auto,
983    None,
984    List,
985}
986
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// `Debug` is hand-written so the secret access key and session token are
/// never printed — only whether they are present. The derived impl would
/// leak them into any log line that formats this value with `{:?}`.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` when both the access key id and the secret key are present.
    pub fn has_keypair(&self) -> bool {
        self.access_key_id.is_some() && self.secret_access_key.is_some()
    }
}

impl std::fmt::Debug for ResolvedCreds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Replace a present secret with a fixed marker; keep `None` visible.
        fn redacted(v: Option<&String>) -> Option<&'static str> {
            v.map(|_| "<redacted>")
        }
        f.debug_struct("ResolvedCreds")
            // The key *id* is an identifier, not the secret; keeping it
            // readable shows which credentials were picked.
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &redacted(self.secret_access_key.as_ref()))
            .field("session_token", &redacted(self.session_token.as_ref()))
            .finish()
    }
}
1001
1002// ---------------------------------------------------------------------------
1003// Loading + validation
1004// ---------------------------------------------------------------------------
1005
1006impl AppConfig {
1007    /// Read and validate a TOML config file.
1008    pub fn load(path: &str) -> Result<Self, AppError> {
1009        let raw = std::fs::read_to_string(path)
1010            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
1011        let mut cfg: AppConfig =
1012            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
1013        cfg.normalize();
1014        cfg.validate()?;
1015        // Remember where we loaded from so the explorer can optionally
1016        // append newly-registered datasets back to this file. Ignore the
1017        // error if it was already set (only the first load wins).
1018        let _ = SOURCE_CONFIG_PATH.set(PathBuf::from(path));
1019        Ok(cfg)
1020    }
1021
1022    /// Canonicalise fields that are compared case-insensitively at runtime.
1023    ///
1024    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
1025    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
1026    /// once at load time. Without this an operator who writes
1027    /// `"Datasets:Read"` would silently 403 every caller, since the token
1028    /// side would have become `datasets:read`.
1029    fn normalize(&mut self) {
1030        for s in self
1031            .auth
1032            .read_scopes
1033            .iter_mut()
1034            .chain(self.auth.reload_scopes.iter_mut())
1035        {
1036            *s = s.to_ascii_lowercase();
1037        }
1038    }
1039
1040    fn validate(&self) -> Result<(), AppError> {
1041        // Server prefix: empty, or must start with '/' and not end with '/'.
1042        let p = &self.server.prefix;
1043        if !p.is_empty() {
1044            if !p.starts_with('/') {
1045                return Err(AppError::Internal(format!(
1046                    "server.prefix must start with '/' (got '{p}')"
1047                )));
1048            }
1049            if p.ends_with('/') {
1050                return Err(AppError::Internal(format!(
1051                    "server.prefix must not end with '/' (got '{p}')"
1052                )));
1053            }
1054        }
1055
1056        if self.datasets.is_empty() {
1057            return Err(AppError::Internal(
1058                "datasets.toml has no [[dataset]] entries".into(),
1059            ));
1060        }
1061
1062        if self.server.quack.enabled {
1063            self.server.quack.validate_enabled()?;
1064        }
1065
1066        if self.server.pgwire.enabled {
1067            self.server.pgwire.validate_enabled()?;
1068        }
1069
1070        // Validate the docs mount path even when the section is disabled,
1071        // so an inactive config typo can't go unnoticed.
1072        {
1073            let dp = &self.docs.path;
1074            if !dp.starts_with('/') {
1075                return Err(AppError::Internal(format!(
1076                    "docs.path must start with '/' (got '{dp}')"
1077                )));
1078            }
1079            if dp.len() > 1 && dp.ends_with('/') {
1080                return Err(AppError::Internal(format!(
1081                    "docs.path must not end with '/' (got '{dp}')"
1082                )));
1083            }
1084            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
1085                return Err(AppError::Internal(format!(
1086                    "docs.path '{dp}' collides with a reserved route"
1087                )));
1088            }
1089        }
1090
1091        // Same for the swagger UI mount.
1092        {
1093            let sp = &self.swagger.path;
1094            if !sp.starts_with('/') {
1095                return Err(AppError::Internal(format!(
1096                    "swagger.path must start with '/' (got '{sp}')"
1097                )));
1098            }
1099            if sp.len() > 1 && sp.ends_with('/') {
1100                return Err(AppError::Internal(format!(
1101                    "swagger.path must not end with '/' (got '{sp}')"
1102                )));
1103            }
1104            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
1105                return Err(AppError::Internal(format!(
1106                    "swagger.path '{sp}' collides with a reserved route"
1107                )));
1108            }
1109            if sp == &self.docs.path {
1110                return Err(AppError::Internal(format!(
1111                    "swagger.path and docs.path must differ (both '{sp}')"
1112                )));
1113            }
1114            if let Some(o) = &self.swagger.oauth2 {
1115                if o.issuer.trim().is_empty() {
1116                    return Err(AppError::Internal(
1117                        "swagger.oauth2.issuer must not be empty".into(),
1118                    ));
1119                }
1120                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
1121                    return Err(AppError::Internal(format!(
1122                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
1123                        o.issuer
1124                    )));
1125                }
1126                if o.client_id.trim().is_empty() {
1127                    return Err(AppError::Internal(
1128                        "swagger.oauth2.client_id must not be empty".into(),
1129                    ));
1130                }
1131            }
1132        }
1133
1134        // Metrics endpoint mount path. Validated even when disabled so an
1135        // inactive config typo can't go unnoticed. `/metrics` is itself a
1136        // reserved mount (so docs/swagger can't shadow it), so we check the
1137        // remaining reserved routes — and the docs/swagger paths — for
1138        // collisions rather than the whole list.
1139        {
1140            let mp = &self.metrics.path;
1141            if !mp.starts_with('/') {
1142                return Err(AppError::Internal(format!(
1143                    "metrics.path must start with '/' (got '{mp}')"
1144                )));
1145            }
1146            if mp.len() > 1 && mp.ends_with('/') {
1147                return Err(AppError::Internal(format!(
1148                    "metrics.path must not end with '/' (got '{mp}')"
1149                )));
1150            }
1151            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
1152                return Err(AppError::Internal(format!(
1153                    "metrics.path '{mp}' collides with a reserved route"
1154                )));
1155            }
1156            if mp == &self.docs.path {
1157                return Err(AppError::Internal(format!(
1158                    "metrics.path and docs.path must differ (both '{mp}')"
1159                )));
1160            }
1161            if mp == &self.swagger.path {
1162                return Err(AppError::Internal(format!(
1163                    "metrics.path and swagger.path must differ (both '{mp}')"
1164                )));
1165            }
1166        }
1167
1168        // Explorer UI mount path. Validated even when disabled so an
1169        // inactive config typo can't go unnoticed.
1170        {
1171            let ep = &self.explorer.path;
1172            if !ep.starts_with('/') {
1173                return Err(AppError::Internal(format!(
1174                    "explorer.path must start with '/' (got '{ep}')"
1175                )));
1176            }
1177            if ep.len() > 1 && ep.ends_with('/') {
1178                return Err(AppError::Internal(format!(
1179                    "explorer.path must not end with '/' (got '{ep}')"
1180                )));
1181            }
1182            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
1183                return Err(AppError::Internal(format!(
1184                    "explorer.path '{ep}' collides with a reserved route"
1185                )));
1186            }
1187            if ep == &self.docs.path {
1188                return Err(AppError::Internal(format!(
1189                    "explorer.path and docs.path must differ (both '{ep}')"
1190                )));
1191            }
1192            if ep == &self.swagger.path {
1193                return Err(AppError::Internal(format!(
1194                    "explorer.path and swagger.path must differ (both '{ep}')"
1195                )));
1196            }
1197            if ep == &self.metrics.path {
1198                return Err(AppError::Internal(format!(
1199                    "explorer.path and metrics.path must differ (both '{ep}')"
1200                )));
1201            }
1202        }
1203
1204        // Auth block — only meaningful when `enabled = true`. The cargo
1205        // feature gate is enforced separately in `server::serve` so a
1206        // binary built without `--features auth` and a config with
1207        // `auth.enabled = true` aborts with a clear error.
1208        if self.auth.enabled {
1209            let a = &self.auth;
1210            if a.issuer.trim().is_empty() {
1211                return Err(AppError::Internal(
1212                    "auth.issuer must not be empty when auth.enabled = true".into(),
1213                ));
1214            }
1215            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1216                return Err(AppError::Internal(format!(
1217                    "auth.issuer must be an absolute http(s) URL (got '{}')",
1218                    a.issuer
1219                )));
1220            }
1221            for alg in &a.algorithms {
1222                match alg.as_str() {
1223                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1224                    | "PS512" => {}
1225                    other => {
1226                        return Err(AppError::Internal(format!(
1227                            "auth.algorithms[{other}] is not allowed; pick one of \
1228                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1229                        )));
1230                    }
1231                }
1232            }
1233            if a.algorithms.is_empty() {
1234                return Err(AppError::Internal(
1235                    "auth.algorithms must not be empty".into(),
1236                ));
1237            }
1238            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1239                return Err(AppError::Internal(format!(
1240                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1241                    a.tenant_claim
1242                )));
1243            }
1244            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1245                return Err(AppError::Internal(
1246                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1247                     can't enforce a tenant allow-list without a claim to extract from"
1248                        .into(),
1249                ));
1250            }
1251        }
1252
1253        let mut seen = HashSet::new();
1254        for d in &self.datasets {
1255            if !seen.insert(d.name.as_str()) {
1256                return Err(AppError::Internal(format!(
1257                    "duplicate dataset name: {}",
1258                    d.name
1259                )));
1260            }
1261            if d.name.is_empty() {
1262                return Err(AppError::Internal("dataset name must not be empty".into()));
1263            }
1264            // URL-safe: alphanum + _ - .
1265            if !d
1266                .name
1267                .chars()
1268                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1269            {
1270                return Err(AppError::Internal(format!(
1271                    "dataset name '{}' must be alphanumeric (plus _ - .)",
1272                    d.name
1273                )));
1274            }
1275
1276            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1277                return Err(AppError::Internal(format!(
1278                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1279                    d.name
1280                )));
1281            }
1282
1283            // Location-specific checks.
1284            if d.source.is_s3() {
1285                d.source.s3_bucket()?;
1286                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1287                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1288                    && std::env::var("AWS_REGION").is_err()
1289                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1290                {
1291                    log::warn!(
1292                        "dataset '{}': S3 source without explicit region — \
1293                         relying on AWS_REGION env var",
1294                        d.name
1295                    );
1296                }
1297            } else {
1298                // Local path. For parquet we can fully resolve to a file
1299                // list up front; for delta we only check that the directory
1300                // exists (delta has its own layout — _delta_log/, …).
1301                match d.source.kind {
1302                    SourceKind::Parquet => {
1303                        d.resolve_local_parquet_files()?;
1304                    }
1305                    SourceKind::Delta => {
1306                        let p = Path::new(&d.source.location);
1307                        if !p.exists() {
1308                            return Err(AppError::Internal(format!(
1309                                "dataset '{}': delta location does not exist: {}",
1310                                d.name, d.source.location
1311                            )));
1312                        }
1313                    }
1314                }
1315            }
1316        }
1317        Ok(())
1318    }
1319}
1320
1321impl SourceConfig {
1322    pub fn is_s3(&self) -> bool {
1323        self.location.starts_with("s3://")
1324    }
1325
1326    /// True when the location already contains a glob metacharacter
1327    /// (`*`, `?`, or `[`).
1328    pub fn has_glob(&self) -> bool {
1329        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1330    }
1331
1332    /// Location to hand to a backend that needs an explicit parquet glob
1333    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1334    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1335    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1336    /// are returned unchanged.
1337    pub fn s3_recursive_parquet_glob(&self) -> String {
1338        if !self.is_s3() || self.has_glob() {
1339            return self.location.clone();
1340        }
1341        let trimmed = self.location.trim_end_matches('/');
1342        format!("{trimmed}/**/*.parquet")
1343    }
1344
1345    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1346    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1347        let rest = self
1348            .location
1349            .strip_prefix("s3://")
1350            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1351        let (bucket, key) = match rest.split_once('/') {
1352            Some((b, k)) => (b, k),
1353            None => (rest, ""),
1354        };
1355        if bucket.is_empty() {
1356            return Err(AppError::Internal(format!(
1357                "s3 URL missing bucket: {}",
1358                self.location
1359            )));
1360        }
1361        Ok((bucket, key))
1362    }
1363}
1364
1365impl DatasetConfig {
1366    /// Validate a dataset config supplied at runtime (e.g. registered live
1367    /// through the explorer) with the same rules the startup loader applies
1368    /// to each `[[dataset]]`: non-empty, URL-safe name and a coherent index
1369    /// configuration. Source reachability is left to the backend, which
1370    /// surfaces a clear error when it tries to open the source.
1371    pub fn validate_for_register(&self) -> Result<(), AppError> {
1372        if self.name.is_empty() {
1373            return Err(AppError::InvalidValue(
1374                "dataset name must not be empty".into(),
1375            ));
1376        }
1377        if !self
1378            .name
1379            .chars()
1380            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1381        {
1382            return Err(AppError::InvalidValue(format!(
1383                "dataset name '{}' must be alphanumeric (plus _ - .)",
1384                self.name
1385            )));
1386        }
1387        if self.index.mode == IndexMode::List && self.index.columns.is_empty() {
1388            return Err(AppError::InvalidValue(format!(
1389                "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1390                self.name
1391            )));
1392        }
1393        self.predicate_filter.validate(&self.name, "predicate_filter")?;
1394        self.projection_filter
1395            .validate(&self.name, "projection_filter")?;
1396        if self.source.is_s3() {
1397            self.source.s3_bucket()?;
1398        }
1399        Ok(())
1400    }
1401
1402    /// Render this dataset as a standalone TOML `[[dataset]]` block suitable
1403    /// for pasting into (or appending to) a `datasets.toml`. Fields are
1404    /// emitted scalars-first so the output is valid TOML, and default-valued
1405    /// sections (`[dataset.s3]`, a default `[dataset.index]`, an empty
1406    /// projection) are omitted to keep the snippet minimal.
1407    pub fn to_toml_block(&self) -> Result<String, AppError> {
1408        #[derive(Serialize)]
1409        struct Block {
1410            name: String,
1411            #[serde(skip_serializing_if = "Vec::is_empty")]
1412            columns: Vec<String>,
1413            dict_encode: bool,
1414            lazy: bool,
1415            source: SourceConfig,
1416            #[serde(skip_serializing_if = "Option::is_none")]
1417            s3: Option<S3Config>,
1418            #[serde(skip_serializing_if = "Option::is_none")]
1419            index: Option<IndexConfig>,
1420            #[serde(skip_serializing_if = "Option::is_none")]
1421            predicate_filter: Option<ColumnFilter>,
1422            #[serde(skip_serializing_if = "Option::is_none")]
1423            projection_filter: Option<ColumnFilter>,
1424        }
1425        #[derive(Serialize)]
1426        struct Doc {
1427            dataset: [Block; 1],
1428        }
1429        let doc = Doc {
1430            dataset: [Block {
1431                name: self.name.clone(),
1432                columns: self.columns.clone(),
1433                dict_encode: self.dict_encode,
1434                lazy: self.lazy,
1435                source: self.source.clone(),
1436                s3: self.s3.clone(),
1437                index: if self.index.is_default() {
1438                    None
1439                } else {
1440                    Some(self.index.clone())
1441                },
1442                predicate_filter: self
1443                    .predicate_filter
1444                    .is_active()
1445                    .then(|| self.predicate_filter.clone()),
1446                projection_filter: self
1447                    .projection_filter
1448                    .is_active()
1449                    .then(|| self.projection_filter.clone()),
1450            }],
1451        };
1452        toml::to_string_pretty(&doc)
1453            .map_err(|e| AppError::Internal(format!("failed to render dataset TOML: {e}")))
1454    }
1455
1456    /// Append this dataset's `[[dataset]]` block to the config file this
1457    /// process was loaded from, so a runtime-registered dataset survives a
1458    /// restart. Returns the path written to.
1459    ///
1460    /// Errors with `AppError::InvalidValue` when the process has no on-disk
1461    /// config ([`source_config_path`] is `None` — e.g. the Python bindings),
1462    /// and with `AppError::Internal` when rendering or the file write fails.
1463    /// Shared by the versioned API (`POST /api/v1/datasets/persist`) and the
1464    /// explorer's persist action.
1465    pub fn persist_to_source_config(&self) -> Result<PathBuf, AppError> {
1466        use std::io::Write;
1467        let path = source_config_path().ok_or_else(|| {
1468            AppError::InvalidValue("server has no on-disk config file to append to".into())
1469        })?;
1470        let block = self.to_toml_block()?;
1471        let mut file = std::fs::OpenOptions::new()
1472            .append(true)
1473            .open(path)
1474            .map_err(|e| {
1475                AppError::Internal(format!("failed to open config {}: {e}", path.display()))
1476            })?;
1477        // Separate the appended block from existing content by a blank line.
1478        write!(file, "\n{block}").map_err(|e| {
1479            AppError::Internal(format!("failed to write config {}: {e}", path.display()))
1480        })?;
1481        Ok(path.to_path_buf())
1482    }
1483}
1484
1485impl IndexConfig {
1486    /// Whether this equals the serde default (used to omit the section from
1487    /// exported TOML when it carries no information).
1488    fn is_default(&self) -> bool {
1489        self.mode == IndexMode::Auto && self.columns.is_empty() && self.max_cardinality == 100_000
1490    }
1491}
1492
1493impl DatasetConfig {
1494    /// Expand `source.location` to a concrete list of local `.parquet`
1495    /// files. Only valid for `kind = parquet` on local paths — S3 and
1496    /// Delta sources are resolved by the backend itself.
1497    ///
1498    /// Accepts three location shapes:
1499    ///   * a single `*.parquet` file
1500    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1501    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1502    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1503    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1504        if self.source.is_s3() {
1505            return Err(AppError::Internal(format!(
1506                "dataset '{}': resolve_local_parquet_files called on s3 source",
1507                self.name
1508            )));
1509        }
1510        let loc = &self.source.location;
1511
1512        // Glob pattern? Expand and require at least one match.
1513        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1514            let mut files: Vec<PathBuf> = glob::glob(loc)
1515                .map_err(|e| {
1516                    AppError::Internal(format!(
1517                        "dataset '{}': bad glob pattern '{loc}': {e}",
1518                        self.name
1519                    ))
1520                })?
1521                .filter_map(|r| r.ok())
1522                .filter(|p| {
1523                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1524                })
1525                .collect();
1526            files.sort();
1527            if files.is_empty() {
1528                return Err(AppError::EmptyDataset(format!(
1529                    "dataset '{}': glob '{loc}' matched no .parquet files",
1530                    self.name
1531                )));
1532            }
1533            return Ok(files);
1534        }
1535
1536        let path = Path::new(loc);
1537        if !path.exists() {
1538            return Err(AppError::Internal(format!(
1539                "dataset '{}': source path does not exist: {loc}",
1540                self.name
1541            )));
1542        }
1543
1544        if path.is_file() {
1545            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1546                return Err(AppError::Internal(format!(
1547                    "dataset '{}': source must be a .parquet file",
1548                    self.name
1549                )));
1550            }
1551            return Ok(vec![path.to_path_buf()]);
1552        }
1553
1554        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1555            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1556            .filter_map(|entry| entry.ok().map(|e| e.path()))
1557            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1558            .collect();
1559        files.sort();
1560        if files.is_empty() {
1561            return Err(AppError::EmptyDataset(format!(
1562                "dataset '{}': no *.parquet files found in {loc}",
1563                self.name
1564            )));
1565        }
1566        Ok(files)
1567    }
1568
1569    /// Estimate the on-disk byte size of this dataset's local backing
1570    /// files. Returns `None` for S3 sources (sizing would require a
1571    /// network round-trip) or when nothing can be measured.
1572    ///
1573    /// * `parquet` sums the resolved `.parquet` files (single file,
1574    ///   directory, or glob).
1575    /// * `delta` sums every `*.parquet` data file under the table root.
1576    ///   This slightly over-counts when stale files haven't been vacuumed,
1577    ///   which is fine for a coarse force-lazy threshold.
1578    pub fn estimate_local_bytes(&self) -> Option<u64> {
1579        if self.source.is_s3() {
1580            return None;
1581        }
1582        match self.source.kind {
1583            SourceKind::Parquet => {
1584                let files = self.resolve_local_parquet_files().ok()?;
1585                Some(
1586                    files
1587                        .iter()
1588                        .filter_map(|p| std::fs::metadata(p).ok())
1589                        .map(|m| m.len())
1590                        .sum(),
1591                )
1592            }
1593            SourceKind::Delta => {
1594                let root = self.source.location.trim_end_matches('/');
1595                let pattern = format!("{root}/**/*.parquet");
1596                let paths = glob::glob(&pattern).ok()?;
1597                Some(
1598                    paths
1599                        .filter_map(Result::ok)
1600                        .filter_map(|p| std::fs::metadata(&p).ok())
1601                        .filter(|m| m.is_file())
1602                        .map(|m| m.len())
1603                        .sum(),
1604                )
1605            }
1606        }
1607    }
1608
1609    /// Decide whether this dataset should be forced into lazy mode given
1610    /// the server's `force_lazy_above_mb` threshold. Returns `Some(bytes)`
1611    /// (the measured size) when it should be forced, so the caller can log
1612    /// it. Returns `None` when the dataset is already lazy, the threshold
1613    /// is disabled, the source is S3, or the measured size is unknown or at
1614    /// or below the threshold.
1615    pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1616        if self.lazy || server.force_lazy_above_mb == 0 {
1617            return None;
1618        }
1619        let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1620        match self.estimate_local_bytes() {
1621            Some(bytes) if bytes > threshold => Some(bytes),
1622            _ => None,
1623        }
1624    }
1625
1626    /// Env-var prefix derived from the dataset name: uppercase with
1627    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1628    /// `SALES_EU_1`.
1629    pub fn env_prefix(&self) -> String {
1630        self.name
1631            .chars()
1632            .map(|c| {
1633                if c.is_ascii_alphanumeric() {
1634                    c.to_ascii_uppercase()
1635                } else {
1636                    '_'
1637                }
1638            })
1639            .collect()
1640    }
1641
1642    /// Resolve S3 credentials following the precedence chain documented at
1643    /// the top of this module. Returns an empty struct when nothing was
1644    /// found — the caller should then leave credential resolution to the
1645    /// engine's default provider chain.
1646    pub fn resolved_creds(&self) -> ResolvedCreds {
1647        let prefix = self.env_prefix();
1648        let from_env = |suffix: &str| {
1649            std::env::var(format!("{prefix}_{suffix}"))
1650                .ok()
1651                .filter(|s| !s.is_empty())
1652        };
1653        let inline = self.s3.as_ref();
1654        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1655
1656        ResolvedCreds {
1657            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1658                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1659                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1660            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1661                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1662                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1663            session_token: from_env("AWS_SESSION_TOKEN")
1664                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1665                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1666        }
1667    }
1668
1669    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1670    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1671    pub fn resolved_region(&self) -> String {
1672        let prefix = self.env_prefix();
1673        std::env::var(format!("{prefix}_AWS_REGION"))
1674            .ok()
1675            .filter(|s| !s.is_empty())
1676            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1677            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1678            .or_else(|| {
1679                std::env::var("AWS_DEFAULT_REGION")
1680                    .ok()
1681                    .filter(|s| !s.is_empty())
1682            })
1683            .unwrap_or_else(|| "us-east-1".to_string())
1684    }
1685}
1686
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal dataset fixture: only the fields the tests vary are
    /// parameters; every optional knob keeps its default.
    fn dataset(name: &str, kind: SourceKind, location: &str, lazy: bool) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind,
                location: location.to_string(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy,
            predicate_filter: Default::default(),
            projection_filter: Default::default(),
        }
    }

    /// `AppConfig` fixture with every section the tests don't vary left at
    /// its default.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            sql: SqlConfig::default(),
            datafusion: DataFusionConfig::default(),
            auth,
            datasets,
        }
    }

    /// Create an empty scratch directory `name` under the system temp dir,
    /// wiping any leftovers from a previous run first.
    fn fresh_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(name);
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn server_defaults() {
        let server = ServerConfig::default();
        assert_eq!(server.backend, Backend::Datafusion);
        assert_eq!(server.port, 8080);
        assert!(server.compress);
        assert_eq!(server.max_body_bytes, 1024 * 1024);
        assert_eq!(server.max_page_size, 100_000);
        assert_eq!(server.force_lazy_above_mb, 0);
        assert_eq!(server.request_timeout_ms, 30_000);

        let quack = &server.quack;
        assert!(!quack.enabled);
        assert_eq!(quack.uri, "quack:localhost");
        assert!(quack.token.is_none());
        assert!(!quack.allow_other_hostname);
        assert!(quack.read_only);

        assert_eq!(server.prefix, "");
        assert!(server.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let src = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            force_lazy_above_mb = 256
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(src).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.force_lazy_above_mb, 256);
        assert_eq!(server.request_timeout_ms, 0);

        let quack = &server.quack;
        assert!(quack.enabled);
        assert_eq!(quack.uri, "quack:localhost:9495");
        assert_eq!(quack.token.as_deref(), Some("test-token"));
        assert!(!quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        let only = &cfg.datasets[0];
        assert_eq!(only.name, "x");
        // dict_encode was not set in the TOML, so this checks its default.
        assert!(only.dict_encode);
    }

    #[test]
    fn force_lazy_bytes_logic() {
        // A nanosecond suffix keeps the directory unique across runs. The
        // file contents are irrelevant: sizing only stats file lengths.
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir = fresh_dir(&format!("dp-force-lazy-{}-{nanos}", std::process::id()));
        let size: usize = 2 * 1024 * 1024;
        let expected = Some(size as u64);
        let file = dir.join("data.parquet");
        std::fs::write(&file, vec![0u8; size]).unwrap();

        let file_loc = file.to_str().unwrap();
        let dir_loc = dir.to_str().unwrap();
        let threshold = |mb: u64| ServerConfig {
            force_lazy_above_mb: mb,
            ..ServerConfig::default()
        };

        // Sizing: single file, directory listing, and delta walk all see 2 MiB.
        let file_ds = dataset("t", SourceKind::Parquet, file_loc, false);
        let dir_ds = dataset("t", SourceKind::Parquet, dir_loc, false);
        let delta_ds = dataset("t", SourceKind::Delta, dir_loc, false);
        for ds in [&file_ds, &dir_ds, &delta_ds] {
            assert_eq!(ds.estimate_local_bytes(), expected);
        }

        // 0 disables the check; 1 MiB forces a 2 MiB file; 4 MiB does not.
        assert_eq!(file_ds.force_lazy_bytes(&threshold(0)), None);
        assert_eq!(file_ds.force_lazy_bytes(&threshold(1)), expected);
        assert_eq!(file_ds.force_lazy_bytes(&threshold(4)), None);

        // A dataset that is already lazy is never flagged again.
        let lazy_ds = dataset("t", SourceKind::Parquet, file_loc, true);
        assert_eq!(lazy_ds.force_lazy_bytes(&threshold(1)), None);

        // S3 sources can't be measured, so they are never forced.
        let s3_ds = dataset("t", SourceKind::Parquet, "s3://bucket/data.parquet", false);
        assert_eq!(s3_ds.estimate_local_bytes(), None);
        assert_eq!(s3_ds.force_lazy_bytes(&threshold(1)), None);

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for prefix in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: prefix.to_string(),
                ..Default::default()
            };
            let cfg = app_config(server, AuthConfig::default(), vec![]);
            assert!(cfg.validate().is_err(), "prefix {prefix:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = app_config(ServerConfig::default(), auth, vec![]);
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let dir = fresh_dir(&format!("dp-auth-issuer-test-{}", std::process::id()));
        let file = dir.join("a.parquet");
        std::fs::write(&file, b"x").unwrap();

        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let ds = dataset("x", SourceKind::Parquet, &file.to_string_lossy(), false);
        let cfg = app_config(ServerConfig::default(), auth, vec![ds]);

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ds = dataset("x", SourceKind::Parquet, "/tmp/missing.parquet", false);
        let cfg = app_config(server, AuthConfig::default(), vec![ds]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let src = r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#;
        let cfg: AppConfig = toml::from_str(src).unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let dir = fresh_dir(&format!("dp-dup-test-{}", std::process::id()));
        let parquet = dir.join("a.parquet");
        std::fs::write(&parquet, b"x").unwrap();
        let path = parquet.to_str().unwrap();

        let src = format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
        "#
        );
        let cfg: AppConfig = toml::from_str(&src).unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let source = |location: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        };
        let nested = source("s3://bucket/path/key");
        assert_eq!(nested.s3_bucket().unwrap(), ("bucket", "path/key"));
        let bare = source("s3://only-bucket");
        assert_eq!(bare.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(source("s3:///nokey").s3_bucket().is_err());
        assert!(source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        let cases = [
            // Plain prefix -> recursive parquet glob (trailing slash trimmed).
            ("s3://bucket/logs/", "s3://bucket/logs/**/*.parquet"),
            ("s3://bucket/logs", "s3://bucket/logs/**/*.parquet"),
            // Already globbed -> unchanged.
            ("s3://bucket/logs/*.parquet", "s3://bucket/logs/*.parquet"),
            // Non-S3 -> unchanged.
            ("/local/logs", "/local/logs"),
        ];
        for (location, expected) in cases {
            let source = SourceConfig {
                kind: SourceKind::Parquet,
                location: location.into(),
            };
            assert_eq!(source.s3_recursive_parquet_glob(), expected);
        }
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        const BUCKET: &str = "mybucket";
        const FOLDED: &str = "https://mybucket.s3.example.com";
        const BARE: &str = "https://s3.example.com";

        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        let path_style = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path_style.clone()
        };

        // Auto + virtual folds the bucket into the host, idempotently.
        assert_eq!(virt.effective_endpoint(BUCKET).as_deref(), Some(FOLDED));
        assert_eq!(prefixed.effective_endpoint(BUCKET).as_deref(), Some(FOLDED));
        // Auto + path style leaves the host untouched.
        assert_eq!(path_style.effective_endpoint(BUCKET).as_deref(), Some(BARE));
        // Explicit overrides beat the addressing style either way.
        assert_eq!(forced_off.effective_endpoint(BUCKET).as_deref(), Some(BARE));
        assert_eq!(forced_on.effective_endpoint(BUCKET).as_deref(), Some(FOLDED));
        // No endpoint configured -> None.
        assert_eq!(S3Config::default().effective_endpoint(BUCKET), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let cases = [
            ("accidents", "ACCIDENTS"),
            ("sales.eu-1", "SALES_EU_1"),
            ("a_b.c-d", "A_B_C_D"),
        ];
        for (name, expected) in cases {
            let ds = dataset(name, SourceKind::Parquet, "x", false);
            assert_eq!(ds.env_prefix(), expected);
        }
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let dir = fresh_dir(&format!("dp-cfg-test-{}", std::process::id()));
        let parquet = dir.join("a.parquet");
        std::fs::write(&parquet, b"not really parquet").unwrap();

        let resolve = |location: &str| {
            dataset("ds", SourceKind::Parquet, location, false).resolve_local_parquet_files()
        };

        // A direct file and its containing directory resolve to the same list.
        assert_eq!(resolve(parquet.to_str().unwrap()).unwrap(), vec![parquet.clone()]);
        assert_eq!(resolve(dir.to_str().unwrap()).unwrap(), vec![parquet.clone()]);
        // A missing path is an error.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn pgwire_loopback_without_password_is_allowed() {
        let loopback = PgwireConfig {
            enabled: true,
            ..Default::default()
        };
        assert!(loopback.validate_enabled().is_ok());
    }

    #[test]
    fn pgwire_non_loopback_without_password_is_rejected() {
        let exposed = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: None,
            ..Default::default()
        };
        let err = exposed.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("password is required")));
    }

    #[test]
    fn pgwire_non_loopback_with_password_but_no_tls_is_rejected() {
        let exposed = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: Some("pw".into()),
            ..Default::default()
        };
        let err = exposed.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("requires TLS")));
    }

    #[test]
    fn pgwire_tls_cert_without_key_is_rejected() {
        let half_tls = PgwireConfig {
            enabled: true,
            tls_cert: Some(PathBuf::from("/tmp/server.crt")),
            tls_key: None,
            ..Default::default()
        };
        let err = half_tls.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("must be set together")));
    }

    #[test]
    fn pgwire_non_loopback_with_password_and_tls_is_allowed() {
        let secured = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: Some("pw".into()),
            tls_cert: Some(PathBuf::from("/tmp/server.crt")),
            tls_key: Some(PathBuf::from("/tmp/server.key")),
            ..Default::default()
        };
        assert!(secured.validate_enabled().is_ok());
    }
}
2222