Skip to main content

datapress_core/
config.rs

1//! Runtime configuration loaded from `datasets.toml`.
2//!
3//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
4//! block selects the format (`parquet` or `delta`) and the location (a
5//! local path or an `s3://bucket/key` URL). When the location is on S3,
6//! an optional `[dataset.s3]` block carries non-secret connection details
7//! (region, endpoint, addressing style, …).
8//!
9//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
10//! in this precedence order:
11//!
12//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
13//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
14//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
15//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
16//!    `sales.eu-1` → `SALES_EU_1`).
17//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
18//!    `[dataset.s3]` block.
19//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
20//!    `AWS_SESSION_TOKEN`.
21//! 4. None — fall back to the engine's own provider chain
22//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Mount paths the user MUST NOT pick for `[docs].path` or
/// `[swagger].path` — they would shadow first-party routes (probes,
/// API scopes, root).
///
/// The `docs.path` check in `AppConfig::validate` compares the configured
/// path against these entries by exact string equality.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
/// Top-level configuration, deserialized from the whole TOML document.
///
/// Every field is marked `#[serde(default)]`, so a table that is absent
/// from the file falls back to that type's `Default` value.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table — see [`ServerConfig`].
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table — see [`DocsConfig`].
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table — see [`SwaggerConfig`].
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table — see [`MetricsConfig`].
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` table — see [`ExplorerConfig`].
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[sql]` table — see [`SqlConfig`].
    #[serde(default)]
    pub sql: SqlConfig,
    /// `[auth]` table — see [`AuthConfig`].
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` entries (TOML key `dataset`). Deserializes to an empty
    /// list when missing; `AppConfig::validate` rejects an empty list.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
62
/// HTTP server settings (`[server]` block).
///
/// The container-level `#[serde(default)]` makes every key optional:
/// any field missing from the file takes its value from
/// `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Which engine to run. Must match the binary's compile-time feature.
    pub backend: Backend,
    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
    /// to 0.0.0.0 if you want to expose the port.
    pub listen: IpAddr,
    /// TCP port. Default `8080`.
    pub port: u16,
    /// Number of actix worker threads. `None` (= unset) → one per CPU.
    pub workers: Option<usize>,
    /// Optional URL path prefix — useful when sitting behind a reverse
    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
    /// route is mounted under this prefix (so the proxy can pass the URL
    /// through unchanged). Must start with `/` and not end with `/`; the
    /// empty string (default) means no prefix.
    pub prefix: String,
    /// Negotiate response compression (gzip / brotli / zstd) via the
    /// `Accept-Encoding` request header. Enabled by default. Disable when
    /// running behind a proxy that already compresses, or when the extra
    /// CPU is not worth the bandwidth saving.
    pub compress: bool,
    /// Maximum accepted JSON request body size, in bytes. Larger bodies
    /// are rejected with `413 Payload Too Large` before any handler runs.
    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
    /// a DoS guard, not a tuning knob.
    pub max_body_bytes: usize,
    /// Maximum rows returned by a single `/query` page. Larger
    /// `page_size` values are clamped before the backend runs.
    /// Default `100_000`.
    pub max_page_size: u64,
    /// Per-request handler timeout, in milliseconds. If a handler hasn't
    /// produced a response within this budget the request is aborted with
    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
    pub request_timeout_ms: u64,
    /// Grace period for in-flight requests after the server has received
    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
    /// immediately; existing connections then have up to this many
    /// seconds to finish before workers are force-stopped. Default `30`.
    pub shutdown_timeout_secs: u64,
    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
    /// backend; ignored by DataFusion.
    pub quack: QuackConfig,
}
108
109impl Default for ServerConfig {
110    fn default() -> Self {
111        Self {
112            backend: Backend::default(),
113            listen: IpAddr::from([127, 0, 0, 1]),
114            port: 8080,
115            workers: None,
116            prefix: String::new(),
117            compress: true,
118            max_body_bytes: 1024 * 1024,
119            max_page_size: 100_000,
120            request_timeout_ms: 30_000,
121            shutdown_timeout_secs: 30,
122            quack: QuackConfig::default(),
123        }
124    }
125}
126
/// Experimental DuckDB Quack remote protocol server.
///
/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
/// disabled unless you intentionally want DuckDB clients to attach/query this
/// process directly.
///
/// Every key is optional (`#[serde(default)]` on the container); omitted
/// keys take their value from `QuackConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Install/load the Quack extension and start `quack_serve` after
    /// datasets are registered. Default `false`.
    pub enabled: bool,
    /// Quack URI to listen on. `quack:localhost` (the default) uses
    /// DuckDB's default port 9494.
    pub uri: String,
    /// Optional explicit authentication token. If omitted, Quack generates
    /// one at startup and DataPress logs it once.
    pub token: Option<String>,
    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
    /// For external exposure, put a TLS-terminating reverse proxy in front.
    /// Default `false`.
    pub allow_other_hostname: bool,
    /// Install a read-only authorization macro for remote queries. Enabled
    /// by default to match DataPress' read-oriented HTTP API.
    pub read_only: bool,
}
151
152impl Default for QuackConfig {
153    fn default() -> Self {
154        Self {
155            enabled: false,
156            uri: "quack:localhost".into(),
157            token: None,
158            allow_other_hostname: false,
159            read_only: true,
160        }
161    }
162}
163
164impl QuackConfig {
165    /// Validate the enabled Quack configuration against DuckDB's current
166    /// safety rules. The extension treats only the literal `localhost` as
167    /// local unless `allow_other_hostname` is set.
168    pub fn validate_enabled(&self) -> Result<(), AppError> {
169        if self.uri.trim().is_empty() {
170            return Err(AppError::Internal(
171                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
172            ));
173        }
174        if !self.uri.starts_with("quack:") {
175            return Err(AppError::Internal(format!(
176                "server.quack.uri must start with 'quack:' (got '{}')",
177                self.uri
178            )));
179        }
180        if !self.allow_other_hostname {
181            let host = self.hostname().unwrap_or_default();
182            if host != "localhost" {
183                return Err(AppError::Internal(format!(
184                    "server.quack.uri host must be 'localhost' unless \
185                     server.quack.allow_other_hostname = true (got '{}')",
186                    self.uri
187                )));
188            }
189        }
190        if let Some(token) = self.token.as_deref()
191            && token.len() < 4
192        {
193            return Err(AppError::Internal(
194                "server.quack.token must be at least 4 characters".into(),
195            ));
196        }
197        Ok(())
198    }
199
200    fn hostname(&self) -> Option<&str> {
201        let rest = self.uri.strip_prefix("quack:")?;
202        let rest = rest.strip_prefix("//").unwrap_or(rest);
203        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
204        (!host.is_empty()).then_some(host)
205    }
206}
207
/// Query engine selected by `server.backend`.
///
/// Deserialized from the lowercase variant name (`datafusion` / `duckdb`,
/// via `rename_all = "lowercase"`); `Datafusion` is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// The DataFusion engine (default).
    #[default]
    Datafusion,
    /// The DuckDB engine.
    Duckdb,
}
215
/// Embedded MkDocs documentation site (`[docs]` block).
///
/// Enabled by default — when the binary was built with the `docs`
/// cargo feature, the site is served at [`DocsConfig::path`] out of
/// the box. Set `enabled = false` in `datasets.toml` to suppress it
/// (e.g. in prod). When the binary was built without the feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the mount. The mount path must be a non-trivial sub-path;
/// reserved API and probe roots are rejected at startup.
///
/// Unknown keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Whether to serve the documentation site. Default `true`.
    pub enabled: bool,
    /// Mount path of the site. Default `/mkdocs`.
    pub path: String,
}
231
232impl Default for DocsConfig {
233    fn default() -> Self {
234        Self {
235            enabled: true,
236            path: "/mkdocs".into(),
237        }
238    }
239}
240
/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
///
/// Enabled by default — when the binary was built with the `swagger`
/// cargo feature, an interactive Swagger UI is served at
/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
/// to suppress it (e.g. in prod). When the binary was built without
/// the feature, `enabled = true` is harmless: the server logs a
/// warning at startup and skips the mount.
///
/// To let users sign in to the UI itself (Authorization Code + PKCE
/// against any OIDC provider), populate the optional `[swagger.oauth2]`
/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
/// to every "Try it out" request — useful for exercising auth-protected
/// endpoints from the docs page. This drives the UI only; it does not
/// turn on server-side token validation.
///
/// Unknown keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Whether to serve the Swagger UI. Default `true`.
    pub enabled: bool,
    /// Mount path of the UI. Default `/docs`.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-block; `None` (default) leaves the
    /// UI without a sign-in configuration.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
264
265impl Default for SwaggerConfig {
266    fn default() -> Self {
267        Self {
268            enabled: true,
269            path: "/docs".into(),
270            oauth2: None,
271        }
272    }
273}
274
/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
///
/// Configures the UI to drive an Authorization Code + PKCE flow against
/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
/// token endpoints from `<issuer>/.well-known/openid-configuration`,
/// so we don't need to pin them here.
///
/// `issuer` and `client_id` are required when the block is present —
/// there is no sensible default for either. `scopes` and `pkce` are
/// optional and fall back to an empty list and `true` respectively.
/// Unknown keys are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// OIDC issuer URL, e.g.
    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
    /// `https://accounts.google.com`. Must not end in `/`.
    pub issuer: String,
    /// Public OAuth2 client identifier registered with the IdP. The
    /// client must be a SPA / public client (no secret) with
    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
    /// as an allowed redirect URI.
    pub client_id: String,
    /// Scopes to request by default. Will be pre-checked in the Swagger
    /// UI authorize dialog; users can edit them before signing in.
    /// `openid` is always added if missing.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Use PKCE for the authorization code flow. Defaults to `true`;
    /// disable only if your IdP doesn't support PKCE for public clients.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
306
/// Prometheus metrics endpoint (`[metrics]` block).
///
/// Disabled by default. When `enabled = true` (and the binary was built
/// with the `metrics` cargo feature), the server installs a middleware
/// that records per-request HTTP counters and latency histograms, and
/// exposes them in the Prometheus text exposition format at
/// [`MetricsConfig::path`] (default `/metrics`).
///
/// The endpoint is mounted at a fixed, *unprefixed* path — like the
/// health probes — so a scrape config doesn't need to know about any
/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
/// layer: Prometheus scrapers rarely carry bearer tokens, and the
/// endpoint exposes only aggregate request metrics (no row data). Keep
/// it on a network the scraper can reach but the public cannot, e.g. by
/// binding `server.listen` to a private interface.
///
/// When the binary was built without the `metrics` feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the endpoint.
///
/// Unknown keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Whether to expose the metrics endpoint. Default `false`.
    pub enabled: bool,
    /// Path of the endpoint. Default `/metrics`.
    pub path: String,
}
332
333impl Default for MetricsConfig {
334    fn default() -> Self {
335        Self {
336            enabled: false,
337            path: "/metrics".into(),
338        }
339    }
340}
341
/// Embedded dataset explorer UI (`[explorer]` block).
///
/// A server-rendered web app (Actix + Askama templates + htmx +
/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
/// It offers a *discovery* view — per-dataset stats, schema, index and
/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
/// that queries each dataset's Parquet export directly.
///
/// Enabled by default. When the binary was built without the `explorer`
/// cargo feature, `enabled = true` is harmless: the server logs a warning
/// at startup and skips the mount. Set `enabled = false` to suppress it at
/// runtime even when the feature is compiled in.
///
/// Unknown keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Whether to serve the explorer UI. Default `true`.
    pub enabled: bool,
    /// Mount path of the UI. Default `/explore`.
    pub path: String,
}
360
361impl Default for ExplorerConfig {
362    fn default() -> Self {
363        Self {
364            enabled: true,
365            path: "/explore".into(),
366        }
367    }
368}
369
/// Raw-SQL query endpoint (`[sql]` block).
///
/// Exposes `POST /api/v1/sql`, which accepts an arbitrary read-only
/// `SELECT` in the request body and runs it against the engine. **Off by
/// default** — raw SQL is a larger attack surface than the structured
/// `/query` endpoint, so it must be opted into explicitly.
///
/// Phase 1 is scoped to a *single* dataset per query: the statement may
/// reference at most one registered dataset (and no others / no files),
/// enforced by a parse-time table allowlist. Cross-dataset joins are a
/// future extension.
///
/// Safety rails applied to every accepted statement:
/// - exactly one statement, and it must be a read-only `SELECT` / `WITH`,
/// - every referenced table must be a registered dataset (no file
///   functions, no `ATTACH`/`COPY`/`PRAGMA`/DDL/DML),
/// - the result is hard-capped at [`SqlConfig::max_rows`] rows.
///
/// Both keys are optional (container-level `#[serde(default)]`); unknown
/// keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SqlConfig {
    /// Enable the `POST /api/v1/sql` endpoint. Default `false`.
    pub enabled: bool,
    /// Hard cap on the number of rows a single SQL query may return.
    /// The query result is wrapped in an outer `LIMIT` so this bound is
    /// enforced regardless of the user's own `LIMIT`. Default `100_000`.
    pub max_rows: u64,
}
397
398impl Default for SqlConfig {
399    fn default() -> Self {
400        Self {
401            enabled: false,
402            max_rows: 100_000,
403        }
404    }
405}
406
/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
///
/// Disabled by default. When `enabled = true`, the server validates
/// every request's `Authorization: Bearer …` JWT against the JWKS
/// discovered from the issuer's OIDC metadata
/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
/// enforces the configured scope requirements per route.
///
/// Only compiled in when the binary was built with the `auth` cargo
/// feature. Without the feature, `enabled = true` is rejected at
/// startup so a misconfigured production deployment can't silently
/// fall back to "no auth".
///
/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
/// of this block — `[swagger.oauth2]` only drives the UI's login
/// dialog; `[auth]` is what enforces tokens on the API.
///
/// Every key is optional (container-level `#[serde(default)]`); unknown
/// keys in the block are a parse error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch. `false` (default) skips all auth processing.
    pub enabled: bool,
    /// OIDC issuer URL — must match the `iss` claim of every accepted
    /// token. Required when `enabled = true`. Empty by default.
    pub issuer: String,
    /// Expected `aud` claim. When empty (default), audience validation is
    /// skipped (not recommended in production).
    pub audience: String,
    /// Scopes a caller must hold to read datasets (GET endpoints +
    /// POST `…/query` and `…/count`). Empty list (default) means "no scope
    /// check, just a valid token is enough". Lowercased at load time by
    /// `AppConfig::normalize`.
    pub read_scopes: Vec<String>,
    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
    /// Empty list (default) means "no scope check, just a valid token is
    /// enough". Lowercased at load time by `AppConfig::normalize`.
    pub reload_scopes: Vec<String>,
    /// Allow unauthenticated GETs through. Useful for public datasets
    /// and demo deployments. Defaults to `false`.
    pub anonymous_read: bool,
    /// Continue serving even if the JWKS fetch fails at startup.
    /// When `true` (default), the server starts in a degraded mode that
    /// rejects every auth'd request with 503 until JWKS becomes
    /// reachable. When `false`, startup fails outright.
    pub start_degraded: bool,
    /// Allowed signing algorithms. Pinned to RS256 by default; never
    /// include `HS*` or `none` here unless you really know what you're
    /// doing.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds. Default `60`.
    pub leeway_secs: u64,
    /// How often (in seconds) the background refresher re-fetches the
    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
    /// out-of-band. Default `3600`.
    pub jwks_refresh_secs: u64,
    /// Optional JSON-pointer into the JWT claims that extracts a
    /// tenant identifier — attached to the principal and logged on
    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
    /// When empty (default), no tenant is extracted.
    pub tenant_claim: String,
    /// If non-empty, requests whose extracted tenant ID is not in this
    /// list are rejected with 403. Has no effect when `tenant_claim`
    /// is empty. Empty by default.
    pub allowed_tenants: Vec<String>,
    /// If `true`, `POST …/reload` accepts *either* a valid token with
    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
    /// to `true` for one-release backwards compatibility — flip to
    /// `false` once your automation has migrated to OIDC.
    pub admin_token_fallback: bool,
}
474
475impl Default for AuthConfig {
476    fn default() -> Self {
477        Self {
478            enabled: false,
479            issuer: String::new(),
480            audience: String::new(),
481            read_scopes: Vec::new(),
482            reload_scopes: Vec::new(),
483            anonymous_read: false,
484            start_degraded: true,
485            algorithms: vec!["RS256".into()],
486            leeway_secs: 60,
487            jwks_refresh_secs: 3600,
488            tenant_claim: String::new(),
489            allowed_tenants: Vec::new(),
490            admin_token_fallback: true,
491        }
492    }
493}
494
495impl Backend {
496    pub fn as_str(self) -> &'static str {
497        match self {
498            Backend::Datafusion => "datafusion",
499            Backend::Duckdb => "duckdb",
500        }
501    }
502}
503
/// One `[[dataset]]` entry: where the data lives and how it is loaded.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name (required). Per the module docs, it is also the basis
    /// of the `${PREFIX}` used for per-dataset credential env vars.
    pub name: String,
    /// `[dataset.source]` block (required): format and location.
    pub source: SourceConfig,
    /// Optional `[dataset.s3]` block with S3 connection details.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// `[dataset.index]` block; `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// Optional column projection applied at load time. When non-empty,
    /// only the listed columns are read from the parquet/delta source —
    /// every other column is skipped entirely (no decode, no allocation,
    /// no resident memory). Empty (default) = read all columns. Names are
    /// matched case-insensitively against the source schema.
    #[serde(default)]
    pub columns: Vec<String>,
    /// When `true` (default), Utf8 columns that are dictionary-encoded in
    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
    /// for low-cardinality columns. Set to `false` to bypass the override
    /// — useful as a workaround if you observe null-handling oddities on
    /// a particular parquet file.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, the backend should keep the dataset on disk and stream
    /// it at query time instead of materialising it into RAM at startup.
    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
    /// for bounded memory use on large / multi-file sources. Honoured by
    /// the DataFusion backend (local + S3 parquet) and by the DuckDB
    /// backend, which registers the dataset as a view over the source scan
    /// (local + S3 parquet, and delta) rather than materialising a table.
    /// Default `false`.
    #[serde(default)]
    pub lazy: bool,
}
537
/// Serde default helper for boolean fields that should be `true` when
/// omitted (referenced as `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
541
/// `[dataset.source]` block: the format of a dataset and where to find it.
/// Both keys are required.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Source format (`parquet` or `delta`).
    pub kind: SourceKind,
    /// Either a local filesystem path or an `s3://bucket/key` URL.
    pub location: String,
}
548
/// Storage format of a dataset source.
///
/// Deserialized from the lowercase variant name (`parquet` / `delta`, via
/// `rename_all = "lowercase"`); `Parquet` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Parquet source.
    #[default]
    Parquet,
    /// Delta source.
    Delta,
}
556
557impl SourceKind {
558    pub fn as_str(self) -> &'static str {
559        match self {
560            SourceKind::Parquet => "parquet",
561            SourceKind::Delta => "delta",
562        }
563    }
564}
565
/// Non-secret S3 connection settings. Credentials are pulled from env / the
/// AWS credential chain — see [`DatasetConfig::resolved_creds`].
///
/// The block can also carry inline credentials (discouraged; see the field
/// docs below). Every key is optional (container-level `#[serde(default)]`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Region to use. Unset by default.
    pub region: Option<String>,
    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
    pub endpoint: Option<String>,
    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
    /// MinIO and most non-AWS providers require `path`.
    pub addressing_style: AddressingStyle,
    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
    /// Default `false`.
    pub allow_http: bool,
    /// Inline credentials. Strongly discouraged in production — prefer env
    /// vars (see module docs).
    pub access_key_id: Option<String>,
    /// Inline secret access key; same caveat as `access_key_id`.
    pub secret_access_key: Option<String>,
    /// Inline session token; same caveat as `access_key_id`.
    pub session_token: Option<String>,
    /// Hive partition-column handling for parquet sources. Defaults to
    /// `auto` (detect from the path). See [`Partitioning`].
    pub partitioning: Partitioning,
    /// Whether the bucket is folded into a custom `endpoint` host for
    /// virtual-hosted-style requests. Defaults to `auto`. See
    /// [`BucketInHost`].
    pub endpoint_bucket_in_host: BucketInHost,
}
592
593impl Default for S3Config {
594    fn default() -> Self {
595        Self {
596            region: None,
597            endpoint: None,
598            addressing_style: AddressingStyle::Virtual,
599            allow_http: false,
600            access_key_id: None,
601            secret_access_key: None,
602            session_token: None,
603            partitioning: Partitioning::Auto,
604            endpoint_bucket_in_host: BucketInHost::Auto,
605        }
606    }
607}
608
609impl S3Config {
610    /// Resolve the endpoint URL to hand to the object store, optionally
611    /// folding `bucket` into the host for virtual-hosted-style requests so a
612    /// plain `endpoint` works the same way it does on DuckDB.
613    ///
614    /// Returns `None` when no custom endpoint is configured (AWS default).
615    /// The bucket is only prepended when it isn't already the leading host
616    /// label, so re-running this (or a config that already embeds the
617    /// bucket) never produces `bucket.bucket.host`.
618    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
619        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
620
621        let fold = match self.endpoint_bucket_in_host {
622            BucketInHost::False => false,
623            BucketInHost::True => true,
624            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
625        };
626        if !fold {
627            return Some(ep.to_string());
628        }
629
630        let (scheme, host_and_path) = match ep.split_once("://") {
631            Some((s, rest)) => (Some(s), rest),
632            None => (None, ep),
633        };
634        // Split host from any trailing path so we prefix the host label only.
635        let (host, path) = match host_and_path.split_once('/') {
636            Some((h, p)) => (h, Some(p)),
637            None => (host_and_path, None),
638        };
639        // Guard against double-prefixing.
640        if host == bucket || host.starts_with(&format!("{bucket}.")) {
641            return Some(ep.to_string());
642        }
643        let new_host = format!("{bucket}.{host}");
644        let rebuilt = match (scheme, path) {
645            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
646            (Some(s), None) => format!("{s}://{new_host}"),
647            (None, Some(p)) => format!("{new_host}/{p}"),
648            (None, None) => new_host,
649        };
650        Some(rebuilt)
651    }
652}
653
/// S3 addressing style.
///
/// Deserialized from the lowercase variant name (`virtual` / `path`, via
/// `rename_all = "lowercase"`); `Virtual` is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Virtual-hosted style (`bucket.host`).
    #[default]
    Virtual,
    /// Path style (`host/bucket/`).
    Path,
}
661
662impl AddressingStyle {
663    pub fn as_str(self) -> &'static str {
664        match self {
665            AddressingStyle::Virtual => "virtual",
666            AddressingStyle::Path => "path",
667        }
668    }
669}
670
/// How hive-style partition columns (`key=value/` path segments) are handled
/// for an S3 parquet source. Local parquet always auto-detects; this option
/// brings S3 in line and lets you force or disable the behaviour.
///
/// Deserialized from the lowercase variant name (`auto` / `hive` / `none`,
/// via `rename_all = "lowercase"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Detect `key=value` segments from the location glob or by listing the
    /// prefix. No partition columns are added when none are found.
    #[default]
    Auto,
    /// Force hive partitioning. Partition keys are taken from the location
    /// glob, or discovered by listing the prefix.
    Hive,
    /// Treat the source as a flat set of parquet files — never add partition
    /// columns even if the path looks hive-partitioned.
    None,
}
688
689impl Partitioning {
690    pub fn as_str(self) -> &'static str {
691        match self {
692            Partitioning::Auto => "auto",
693            Partitioning::Hive => "hive",
694            Partitioning::None => "none",
695        }
696    }
697}
698
/// Whether the bucket name is folded into the endpoint hostname for
/// virtual-hosted-style requests against a custom endpoint. This aligns the
/// DataFusion object-store path with DuckDB, which builds the virtual host
/// itself — so the same plain `endpoint` works on both backends.
///
/// Variant names are lowercased for deserialization (`auto` / `true` /
/// `false`, via `rename_all = "lowercase"`).
/// NOTE(review): with the derived `Deserialize` these presumably have to be
/// written as quoted strings rather than TOML booleans — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold the bucket into the host when `addressing_style = "virtual"` and
    /// a custom `endpoint` is set (guarded against double-prefixing).
    #[default]
    Auto,
    /// Always fold the bucket into the endpoint host.
    True,
    /// Never rewrite the endpoint — pass it through verbatim.
    False,
}
715
716impl BucketInHost {
717    pub fn as_str(self) -> &'static str {
718        match self {
719            BucketInHost::Auto => "auto",
720            BucketInHost::True => "true",
721            BucketInHost::False => "false",
722        }
723    }
724}
725
/// Per-dataset index settings (the `index` field of [`DatasetConfig`]).
///
/// Every key is optional (container-level `#[serde(default)]`); omitted
/// keys take their value from `IndexConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Indexing mode. Default `auto`.
    pub mode: IndexMode,
    /// Column names. Empty by default.
    /// NOTE(review): presumably the columns to index when `mode = "list"` —
    /// confirm against the code that consumes this config.
    pub columns: Vec<String>,
    /// Cardinality limit. Default `100_000`.
    /// NOTE(review): presumably the highest distinct-value count a column
    /// may have and still be indexed — confirm against the consumer.
    pub max_cardinality: usize,
}
733
734impl Default for IndexConfig {
735    fn default() -> Self {
736        Self {
737            mode: IndexMode::Auto,
738            columns: Vec::new(),
739            max_cardinality: 100_000,
740        }
741    }
742}
743
/// Indexing mode for a dataset (`mode` key of [`IndexConfig`]).
///
/// Deserialized from the lowercase variant name (`auto` / `none` / `list`,
/// via `rename_all = "lowercase"`); `Auto` is the default.
/// NOTE(review): the behaviour of each mode is implemented elsewhere; the
/// per-variant notes below are inferred from the names — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// Default mode; presumably lets the backend pick the indexed columns.
    #[default]
    Auto,
    /// Presumably disables indexing.
    None,
    /// Presumably indexes only the columns listed in `IndexConfig::columns`.
    List,
}
752
/// S3 credentials after resolution. A field left as `None` means "let the
/// engine's default provider chain figure it out".
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id, if one was resolved.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was resolved.
    pub secret_access_key: Option<String>,
    /// Session token, if one was resolved.
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` only when both the access key id and the secret access key
    /// are present; the session token plays no part in the answer.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
767
768// ---------------------------------------------------------------------------
769// Loading + validation
770// ---------------------------------------------------------------------------
771
772impl AppConfig {
773    /// Read and validate a TOML config file.
774    pub fn load(path: &str) -> Result<Self, AppError> {
775        let raw = std::fs::read_to_string(path)
776            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
777        let mut cfg: AppConfig =
778            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
779        cfg.normalize();
780        cfg.validate()?;
781        Ok(cfg)
782    }
783
784    /// Canonicalise fields that are compared case-insensitively at runtime.
785    ///
786    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
787    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
788    /// once at load time. Without this an operator who writes
789    /// `"Datasets:Read"` would silently 403 every caller, since the token
790    /// side would have become `datasets:read`.
791    fn normalize(&mut self) {
792        for s in self
793            .auth
794            .read_scopes
795            .iter_mut()
796            .chain(self.auth.reload_scopes.iter_mut())
797        {
798            *s = s.to_ascii_lowercase();
799        }
800    }
801
802    fn validate(&self) -> Result<(), AppError> {
803        // Server prefix: empty, or must start with '/' and not end with '/'.
804        let p = &self.server.prefix;
805        if !p.is_empty() {
806            if !p.starts_with('/') {
807                return Err(AppError::Internal(format!(
808                    "server.prefix must start with '/' (got '{p}')"
809                )));
810            }
811            if p.ends_with('/') {
812                return Err(AppError::Internal(format!(
813                    "server.prefix must not end with '/' (got '{p}')"
814                )));
815            }
816        }
817
818        if self.datasets.is_empty() {
819            return Err(AppError::Internal(
820                "datasets.toml has no [[dataset]] entries".into(),
821            ));
822        }
823
824        if self.server.quack.enabled {
825            self.server.quack.validate_enabled()?;
826        }
827
828        // Validate the docs mount path even when the section is disabled,
829        // so an inactive config typo can't go unnoticed.
830        {
831            let dp = &self.docs.path;
832            if !dp.starts_with('/') {
833                return Err(AppError::Internal(format!(
834                    "docs.path must start with '/' (got '{dp}')"
835                )));
836            }
837            if dp.len() > 1 && dp.ends_with('/') {
838                return Err(AppError::Internal(format!(
839                    "docs.path must not end with '/' (got '{dp}')"
840                )));
841            }
842            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
843                return Err(AppError::Internal(format!(
844                    "docs.path '{dp}' collides with a reserved route"
845                )));
846            }
847        }
848
849        // Same for the swagger UI mount.
850        {
851            let sp = &self.swagger.path;
852            if !sp.starts_with('/') {
853                return Err(AppError::Internal(format!(
854                    "swagger.path must start with '/' (got '{sp}')"
855                )));
856            }
857            if sp.len() > 1 && sp.ends_with('/') {
858                return Err(AppError::Internal(format!(
859                    "swagger.path must not end with '/' (got '{sp}')"
860                )));
861            }
862            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
863                return Err(AppError::Internal(format!(
864                    "swagger.path '{sp}' collides with a reserved route"
865                )));
866            }
867            if sp == &self.docs.path {
868                return Err(AppError::Internal(format!(
869                    "swagger.path and docs.path must differ (both '{sp}')"
870                )));
871            }
872            if let Some(o) = &self.swagger.oauth2 {
873                if o.issuer.trim().is_empty() {
874                    return Err(AppError::Internal(
875                        "swagger.oauth2.issuer must not be empty".into(),
876                    ));
877                }
878                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
879                    return Err(AppError::Internal(format!(
880                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
881                        o.issuer
882                    )));
883                }
884                if o.client_id.trim().is_empty() {
885                    return Err(AppError::Internal(
886                        "swagger.oauth2.client_id must not be empty".into(),
887                    ));
888                }
889            }
890        }
891
892        // Metrics endpoint mount path. Validated even when disabled so an
893        // inactive config typo can't go unnoticed. `/metrics` is itself a
894        // reserved mount (so docs/swagger can't shadow it), so we check the
895        // remaining reserved routes — and the docs/swagger paths — for
896        // collisions rather than the whole list.
897        {
898            let mp = &self.metrics.path;
899            if !mp.starts_with('/') {
900                return Err(AppError::Internal(format!(
901                    "metrics.path must start with '/' (got '{mp}')"
902                )));
903            }
904            if mp.len() > 1 && mp.ends_with('/') {
905                return Err(AppError::Internal(format!(
906                    "metrics.path must not end with '/' (got '{mp}')"
907                )));
908            }
909            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
910                return Err(AppError::Internal(format!(
911                    "metrics.path '{mp}' collides with a reserved route"
912                )));
913            }
914            if mp == &self.docs.path {
915                return Err(AppError::Internal(format!(
916                    "metrics.path and docs.path must differ (both '{mp}')"
917                )));
918            }
919            if mp == &self.swagger.path {
920                return Err(AppError::Internal(format!(
921                    "metrics.path and swagger.path must differ (both '{mp}')"
922                )));
923            }
924        }
925
926        // Explorer UI mount path. Validated even when disabled so an
927        // inactive config typo can't go unnoticed.
928        {
929            let ep = &self.explorer.path;
930            if !ep.starts_with('/') {
931                return Err(AppError::Internal(format!(
932                    "explorer.path must start with '/' (got '{ep}')"
933                )));
934            }
935            if ep.len() > 1 && ep.ends_with('/') {
936                return Err(AppError::Internal(format!(
937                    "explorer.path must not end with '/' (got '{ep}')"
938                )));
939            }
940            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
941                return Err(AppError::Internal(format!(
942                    "explorer.path '{ep}' collides with a reserved route"
943                )));
944            }
945            if ep == &self.docs.path {
946                return Err(AppError::Internal(format!(
947                    "explorer.path and docs.path must differ (both '{ep}')"
948                )));
949            }
950            if ep == &self.swagger.path {
951                return Err(AppError::Internal(format!(
952                    "explorer.path and swagger.path must differ (both '{ep}')"
953                )));
954            }
955            if ep == &self.metrics.path {
956                return Err(AppError::Internal(format!(
957                    "explorer.path and metrics.path must differ (both '{ep}')"
958                )));
959            }
960        }
961
962        // Auth block — only meaningful when `enabled = true`. The cargo
963        // feature gate is enforced separately in `server::serve` so a
964        // binary built without `--features auth` and a config with
965        // `auth.enabled = true` aborts with a clear error.
966        if self.auth.enabled {
967            let a = &self.auth;
968            if a.issuer.trim().is_empty() {
969                return Err(AppError::Internal(
970                    "auth.issuer must not be empty when auth.enabled = true".into(),
971                ));
972            }
973            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
974                return Err(AppError::Internal(format!(
975                    "auth.issuer must be an absolute http(s) URL (got '{}')",
976                    a.issuer
977                )));
978            }
979            for alg in &a.algorithms {
980                match alg.as_str() {
981                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
982                    | "PS512" => {}
983                    other => {
984                        return Err(AppError::Internal(format!(
985                            "auth.algorithms[{other}] is not allowed; pick one of \
986                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
987                        )));
988                    }
989                }
990            }
991            if a.algorithms.is_empty() {
992                return Err(AppError::Internal(
993                    "auth.algorithms must not be empty".into(),
994                ));
995            }
996            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
997                return Err(AppError::Internal(format!(
998                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
999                    a.tenant_claim
1000                )));
1001            }
1002            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1003                return Err(AppError::Internal(
1004                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1005                     can't enforce a tenant allow-list without a claim to extract from"
1006                        .into(),
1007                ));
1008            }
1009        }
1010
1011        let mut seen = HashSet::new();
1012        for d in &self.datasets {
1013            if !seen.insert(d.name.as_str()) {
1014                return Err(AppError::Internal(format!(
1015                    "duplicate dataset name: {}",
1016                    d.name
1017                )));
1018            }
1019            if d.name.is_empty() {
1020                return Err(AppError::Internal("dataset name must not be empty".into()));
1021            }
1022            // URL-safe: alphanum + _ - .
1023            if !d
1024                .name
1025                .chars()
1026                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1027            {
1028                return Err(AppError::Internal(format!(
1029                    "dataset name '{}' must be alphanumeric (plus _ - .)",
1030                    d.name
1031                )));
1032            }
1033
1034            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1035                return Err(AppError::Internal(format!(
1036                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1037                    d.name
1038                )));
1039            }
1040
1041            // Location-specific checks.
1042            if d.source.is_s3() {
1043                d.source.s3_bucket()?;
1044                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1045                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1046                    && std::env::var("AWS_REGION").is_err()
1047                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1048                {
1049                    log::warn!(
1050                        "dataset '{}': S3 source without explicit region — \
1051                         relying on AWS_REGION env var",
1052                        d.name
1053                    );
1054                }
1055            } else {
1056                // Local path. For parquet we can fully resolve to a file
1057                // list up front; for delta we only check that the directory
1058                // exists (delta has its own layout — _delta_log/, …).
1059                match d.source.kind {
1060                    SourceKind::Parquet => {
1061                        d.resolve_local_parquet_files()?;
1062                    }
1063                    SourceKind::Delta => {
1064                        let p = Path::new(&d.source.location);
1065                        if !p.exists() {
1066                            return Err(AppError::Internal(format!(
1067                                "dataset '{}': delta location does not exist: {}",
1068                                d.name, d.source.location
1069                            )));
1070                        }
1071                    }
1072                }
1073            }
1074        }
1075        Ok(())
1076    }
1077}
1078
1079impl SourceConfig {
1080    pub fn is_s3(&self) -> bool {
1081        self.location.starts_with("s3://")
1082    }
1083
1084    /// True when the location already contains a glob metacharacter
1085    /// (`*`, `?`, or `[`).
1086    pub fn has_glob(&self) -> bool {
1087        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1088    }
1089
1090    /// Location to hand to a backend that needs an explicit parquet glob
1091    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1092    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1093    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1094    /// are returned unchanged.
1095    pub fn s3_recursive_parquet_glob(&self) -> String {
1096        if !self.is_s3() || self.has_glob() {
1097            return self.location.clone();
1098        }
1099        let trimmed = self.location.trim_end_matches('/');
1100        format!("{trimmed}/**/*.parquet")
1101    }
1102
1103    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1104    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1105        let rest = self
1106            .location
1107            .strip_prefix("s3://")
1108            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1109        let (bucket, key) = match rest.split_once('/') {
1110            Some((b, k)) => (b, k),
1111            None => (rest, ""),
1112        };
1113        if bucket.is_empty() {
1114            return Err(AppError::Internal(format!(
1115                "s3 URL missing bucket: {}",
1116                self.location
1117            )));
1118        }
1119        Ok((bucket, key))
1120    }
1121}
1122
1123impl DatasetConfig {
1124    /// Expand `source.location` to a concrete list of local `.parquet`
1125    /// files. Only valid for `kind = parquet` on local paths — S3 and
1126    /// Delta sources are resolved by the backend itself.
1127    ///
1128    /// Accepts three location shapes:
1129    ///   * a single `*.parquet` file
1130    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1131    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1132    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1133    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1134        if self.source.is_s3() {
1135            return Err(AppError::Internal(format!(
1136                "dataset '{}': resolve_local_parquet_files called on s3 source",
1137                self.name
1138            )));
1139        }
1140        let loc = &self.source.location;
1141
1142        // Glob pattern? Expand and require at least one match.
1143        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1144            let mut files: Vec<PathBuf> = glob::glob(loc)
1145                .map_err(|e| {
1146                    AppError::Internal(format!(
1147                        "dataset '{}': bad glob pattern '{loc}': {e}",
1148                        self.name
1149                    ))
1150                })?
1151                .filter_map(|r| r.ok())
1152                .filter(|p| {
1153                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1154                })
1155                .collect();
1156            files.sort();
1157            if files.is_empty() {
1158                return Err(AppError::Internal(format!(
1159                    "dataset '{}': glob '{loc}' matched no .parquet files",
1160                    self.name
1161                )));
1162            }
1163            return Ok(files);
1164        }
1165
1166        let path = Path::new(loc);
1167        if !path.exists() {
1168            return Err(AppError::Internal(format!(
1169                "dataset '{}': source path does not exist: {loc}",
1170                self.name
1171            )));
1172        }
1173
1174        if path.is_file() {
1175            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1176                return Err(AppError::Internal(format!(
1177                    "dataset '{}': source must be a .parquet file",
1178                    self.name
1179                )));
1180            }
1181            return Ok(vec![path.to_path_buf()]);
1182        }
1183
1184        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1185            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1186            .filter_map(|entry| entry.ok().map(|e| e.path()))
1187            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1188            .collect();
1189        files.sort();
1190        if files.is_empty() {
1191            return Err(AppError::Internal(format!(
1192                "dataset '{}': no *.parquet files found in {loc}",
1193                self.name
1194            )));
1195        }
1196        Ok(files)
1197    }
1198
1199    /// Env-var prefix derived from the dataset name: uppercase with
1200    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1201    /// `SALES_EU_1`.
1202    pub fn env_prefix(&self) -> String {
1203        self.name
1204            .chars()
1205            .map(|c| {
1206                if c.is_ascii_alphanumeric() {
1207                    c.to_ascii_uppercase()
1208                } else {
1209                    '_'
1210                }
1211            })
1212            .collect()
1213    }
1214
1215    /// Resolve S3 credentials following the precedence chain documented at
1216    /// the top of this module. Returns an empty struct when nothing was
1217    /// found — the caller should then leave credential resolution to the
1218    /// engine's default provider chain.
1219    pub fn resolved_creds(&self) -> ResolvedCreds {
1220        let prefix = self.env_prefix();
1221        let from_env = |suffix: &str| {
1222            std::env::var(format!("{prefix}_{suffix}"))
1223                .ok()
1224                .filter(|s| !s.is_empty())
1225        };
1226        let inline = self.s3.as_ref();
1227        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1228
1229        ResolvedCreds {
1230            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1231                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1232                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1233            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1234                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1235                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1236            session_token: from_env("AWS_SESSION_TOKEN")
1237                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1238                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1239        }
1240    }
1241
1242    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1243    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1244    pub fn resolved_region(&self) -> String {
1245        let prefix = self.env_prefix();
1246        std::env::var(format!("{prefix}_AWS_REGION"))
1247            .ok()
1248            .filter(|s| !s.is_empty())
1249            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1250            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1251            .or_else(|| {
1252                std::env::var("AWS_DEFAULT_REGION")
1253                    .ok()
1254                    .filter(|s| !s.is_empty())
1255            })
1256            .unwrap_or_else(|| "us-east-1".to_string())
1257    }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263
1264    #[test]
1265    fn server_defaults() {
1266        let s = ServerConfig::default();
1267        assert_eq!(s.backend, Backend::Datafusion);
1268        assert_eq!(s.port, 8080);
1269        assert!(s.compress);
1270        assert_eq!(s.max_body_bytes, 1024 * 1024);
1271        assert_eq!(s.max_page_size, 100_000);
1272        assert_eq!(s.request_timeout_ms, 30_000);
1273        assert!(!s.quack.enabled);
1274        assert_eq!(s.quack.uri, "quack:localhost");
1275        assert!(s.quack.token.is_none());
1276        assert!(!s.quack.allow_other_hostname);
1277        assert!(s.quack.read_only);
1278        assert_eq!(s.prefix, "");
1279        assert!(s.listen.is_loopback());
1280    }
1281
1282    #[test]
1283    fn server_overrides_from_toml() {
1284        let toml = r#"
1285            [server]
1286            backend = "duckdb"
1287            port = 9000
1288            prefix = "/datapress"
1289            compress = false
1290            max_body_bytes = 4096
1291            max_page_size = 50000
1292            request_timeout_ms = 0
1293
1294            [server.quack]
1295            enabled = true
1296            uri = "quack:localhost:9495"
1297            token = "test-token"
1298            read_only = false
1299            [[dataset]]
1300            name = "x"
1301            source.kind = "parquet"
1302            source.location = "/tmp/missing.parquet"
1303        "#;
1304        let cfg: AppConfig = toml::from_str(toml).unwrap();
1305        assert_eq!(cfg.server.backend, Backend::Duckdb);
1306        assert_eq!(cfg.server.port, 9000);
1307        assert_eq!(cfg.server.prefix, "/datapress");
1308        assert!(!cfg.server.compress);
1309        assert_eq!(cfg.server.max_body_bytes, 4096);
1310        assert_eq!(cfg.server.max_page_size, 50_000);
1311        assert_eq!(cfg.server.request_timeout_ms, 0);
1312        assert!(cfg.server.quack.enabled);
1313        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
1314        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
1315        assert!(!cfg.server.quack.read_only);
1316        assert_eq!(cfg.datasets.len(), 1);
1317        assert_eq!(cfg.datasets[0].name, "x");
1318        assert!(cfg.datasets[0].dict_encode); // default
1319    }
1320
1321    #[test]
1322    fn validate_rejects_bad_prefix() {
1323        let bad = ["no-leading-slash", "/trailing/"];
1324        for p in bad {
1325            let cfg = AppConfig {
1326                server: ServerConfig {
1327                    prefix: p.to_string(),
1328                    ..Default::default()
1329                },
1330                docs: DocsConfig::default(),
1331                swagger: SwaggerConfig::default(),
1332                metrics: MetricsConfig::default(),
1333                explorer: ExplorerConfig::default(),
1334                sql: SqlConfig::default(),
1335                auth: AuthConfig::default(),
1336                datasets: vec![],
1337            };
1338            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
1339        }
1340    }
1341
1342    #[test]
1343    fn normalize_lowercases_configured_scopes() {
1344        let mut cfg = AppConfig {
1345            server: ServerConfig::default(),
1346            docs: DocsConfig::default(),
1347            swagger: SwaggerConfig::default(),
1348            metrics: MetricsConfig::default(),
1349            explorer: ExplorerConfig::default(),
1350            sql: SqlConfig::default(),
1351            auth: AuthConfig {
1352                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
1353                reload_scopes: vec!["Datasets:Reload".into()],
1354                ..Default::default()
1355            },
1356            datasets: vec![],
1357        };
1358        cfg.normalize();
1359        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
1360        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
1361    }
1362
1363    #[test]
1364    fn validate_rejects_no_datasets() {
1365        let cfg = AppConfig {
1366            server: ServerConfig::default(),
1367            docs: DocsConfig::default(),
1368            swagger: SwaggerConfig::default(),
1369            metrics: MetricsConfig::default(),
1370            explorer: ExplorerConfig::default(),
1371            sql: SqlConfig::default(),
1372            auth: AuthConfig::default(),
1373            datasets: vec![],
1374        };
1375        let err = cfg.validate().unwrap_err();
1376        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
1377    }
1378
1379    #[cfg(feature = "auth")]
1380    #[test]
1381    fn validate_accepts_auth_issuer_with_trailing_slash() {
1382        use std::io::Write;
1383
1384        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
1385        let _ = std::fs::remove_dir_all(&dir);
1386        std::fs::create_dir_all(&dir).unwrap();
1387        let file = dir.join("a.parquet");
1388        std::fs::File::create(&file)
1389            .unwrap()
1390            .write_all(b"x")
1391            .unwrap();
1392
1393        let cfg = AppConfig {
1394            server: ServerConfig::default(),
1395            docs: DocsConfig::default(),
1396            swagger: SwaggerConfig::default(),
1397            metrics: MetricsConfig::default(),
1398            explorer: ExplorerConfig::default(),
1399            sql: SqlConfig::default(),
1400            auth: AuthConfig {
1401                enabled: true,
1402                issuer: "https://tenant.example.com/".into(),
1403                ..Default::default()
1404            },
1405            datasets: vec![DatasetConfig {
1406                name: "x".into(),
1407                source: SourceConfig {
1408                    kind: SourceKind::Parquet,
1409                    location: file.to_string_lossy().into_owned(),
1410                },
1411                s3: None,
1412                index: IndexConfig::default(),
1413                columns: vec![],
1414                dict_encode: true,
1415                lazy: false,
1416            }],
1417        };
1418
1419        assert!(cfg.validate().is_ok());
1420        let _ = std::fs::remove_dir_all(&dir);
1421    }
1422
1423    #[test]
1424    fn validate_rejects_quack_non_local_host_without_override() {
1425        let cfg = AppConfig {
1426            server: ServerConfig {
1427                quack: QuackConfig {
1428                    enabled: true,
1429                    uri: "quack:127.0.0.1".into(),
1430                    token: Some("test-token".into()),
1431                    ..Default::default()
1432                },
1433                ..Default::default()
1434            },
1435            docs: DocsConfig::default(),
1436            swagger: SwaggerConfig::default(),
1437            metrics: MetricsConfig::default(),
1438            explorer: ExplorerConfig::default(),
1439            sql: SqlConfig::default(),
1440            auth: AuthConfig::default(),
1441            datasets: vec![DatasetConfig {
1442                name: "x".into(),
1443                source: SourceConfig {
1444                    kind: SourceKind::Parquet,
1445                    location: "/tmp/missing.parquet".into(),
1446                },
1447                s3: None,
1448                index: IndexConfig::default(),
1449                columns: vec![],
1450                dict_encode: true,
1451                lazy: false,
1452            }],
1453        };
1454        let err = cfg.validate().unwrap_err();
1455        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
1456    }
1457
1458    #[test]
1459    fn validate_rejects_bad_dataset_name() {
1460        let cfg: AppConfig = toml::from_str(
1461            r#"
1462            [[dataset]]
1463            name = "bad name!"
1464            source.kind = "parquet"
1465            source.location = "/tmp/whatever"
1466        "#,
1467        )
1468        .unwrap();
1469        let err = cfg.validate().unwrap_err();
1470        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
1471    }
1472
1473    #[test]
1474    fn validate_rejects_duplicate_names() {
1475        use std::io::Write;
1476        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
1477        let _ = std::fs::remove_dir_all(&dir);
1478        std::fs::create_dir_all(&dir).unwrap();
1479        let f = dir.join("a.parquet");
1480        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
1481        let path = f.to_str().unwrap();
1482
1483        let cfg: AppConfig = toml::from_str(&format!(
1484            r#"
1485            [[dataset]]
1486            name = "a"
1487            source.kind = "parquet"
1488            source.location = "{path}"
1489            [[dataset]]
1490            name = "a"
1491            source.kind = "parquet"
1492            source.location = "{path}"
1493        "#
1494        ))
1495        .unwrap();
1496        let err = cfg.validate().expect_err("expected error");
1497        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
1498
1499        let _ = std::fs::remove_dir_all(&dir);
1500    }
1501
1502    #[test]
1503    fn s3_bucket_parsing() {
1504        let mk = |loc: &str| SourceConfig {
1505            kind: SourceKind::Parquet,
1506            location: loc.into(),
1507        };
1508        let s1 = mk("s3://bucket/path/key");
1509        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
1510        let s2 = mk("s3://only-bucket");
1511        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
1512        assert!(mk("s3:///nokey").s3_bucket().is_err());
1513        assert!(mk("/local/path").s3_bucket().is_err());
1514    }
1515
1516    #[test]
1517    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
1518        let mk = |loc: &str| SourceConfig {
1519            kind: SourceKind::Parquet,
1520            location: loc.into(),
1521        };
1522        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
1523        assert_eq!(
1524            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
1525            "s3://bucket/logs/**/*.parquet"
1526        );
1527        assert_eq!(
1528            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
1529            "s3://bucket/logs/**/*.parquet"
1530        );
1531        // Already globbed -> unchanged.
1532        assert_eq!(
1533            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
1534            "s3://bucket/logs/*.parquet"
1535        );
1536        // Non-S3 -> unchanged.
1537        assert_eq!(
1538            mk("/local/logs").s3_recursive_parquet_glob(),
1539            "/local/logs"
1540        );
1541    }
1542
1543    #[test]
1544    fn effective_endpoint_folds_bucket_per_mode() {
1545        let virt = S3Config {
1546            endpoint: Some("https://s3.example.com".into()),
1547            addressing_style: AddressingStyle::Virtual,
1548            ..Default::default()
1549        };
1550        // Auto + virtual -> bucket folded into host.
1551        assert_eq!(
1552            virt.effective_endpoint("mybucket").as_deref(),
1553            Some("https://mybucket.s3.example.com")
1554        );
1555        // Idempotent: already-prefixed host is left alone.
1556        let prefixed = S3Config {
1557            endpoint: Some("https://mybucket.s3.example.com".into()),
1558            ..virt.clone()
1559        };
1560        assert_eq!(
1561            prefixed.effective_endpoint("mybucket").as_deref(),
1562            Some("https://mybucket.s3.example.com")
1563        );
1564        // Path style (auto) -> host untouched.
1565        let path = S3Config {
1566            addressing_style: AddressingStyle::Path,
1567            ..virt.clone()
1568        };
1569        assert_eq!(
1570            path.effective_endpoint("mybucket").as_deref(),
1571            Some("https://s3.example.com")
1572        );
1573        // Explicit overrides win over addressing style.
1574        let forced_off = S3Config {
1575            endpoint_bucket_in_host: BucketInHost::False,
1576            ..virt.clone()
1577        };
1578        assert_eq!(
1579            forced_off.effective_endpoint("mybucket").as_deref(),
1580            Some("https://s3.example.com")
1581        );
1582        let forced_on = S3Config {
1583            endpoint_bucket_in_host: BucketInHost::True,
1584            ..path.clone()
1585        };
1586        assert_eq!(
1587            forced_on.effective_endpoint("mybucket").as_deref(),
1588            Some("https://mybucket.s3.example.com")
1589        );
1590        // No endpoint -> None.
1591        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
1592    }
1593
1594    #[test]
1595    fn env_prefix_sanitises_name() {
1596        let mk = |name: &str| DatasetConfig {
1597            name: name.into(),
1598            source: SourceConfig {
1599                kind: SourceKind::Parquet,
1600                location: "x".into(),
1601            },
1602            s3: None,
1603            index: IndexConfig::default(),
1604            columns: vec![],
1605            dict_encode: true,
1606            lazy: false,
1607        };
1608        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
1609        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
1610        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
1611    }
1612
1613    #[test]
1614    fn resolve_local_parquet_single_file_and_dir() {
1615        use std::io::Write;
1616        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
1617        let _ = std::fs::remove_dir_all(&dir);
1618        std::fs::create_dir_all(&dir).unwrap();
1619        let f = dir.join("a.parquet");
1620        let mut fh = std::fs::File::create(&f).unwrap();
1621        fh.write_all(b"not really parquet").unwrap();
1622
1623        let mk = |loc: &str| DatasetConfig {
1624            name: "ds".into(),
1625            source: SourceConfig {
1626                kind: SourceKind::Parquet,
1627                location: loc.into(),
1628            },
1629            s3: None,
1630            index: IndexConfig::default(),
1631            columns: vec![],
1632            dict_encode: true,
1633            lazy: false,
1634        };
1635
1636        // Direct file.
1637        let files = mk(f.to_str().unwrap())
1638            .resolve_local_parquet_files()
1639            .unwrap();
1640        assert_eq!(files, vec![f.clone()]);
1641
1642        // Directory.
1643        let files = mk(dir.to_str().unwrap())
1644            .resolve_local_parquet_files()
1645            .unwrap();
1646        assert_eq!(files, vec![f.clone()]);
1647
1648        // Missing path.
1649        assert!(
1650            mk("/no/such/place.parquet")
1651                .resolve_local_parquet_files()
1652                .is_err()
1653        );
1654
1655        let _ = std::fs::remove_dir_all(&dir);
1656    }
1657}