Skip to main content

datapress_core/
config.rs

1//! Runtime configuration loaded from `datasets.toml`.
2//!
3//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
4//! block selects the format (`parquet` or `delta`) and the location (a
5//! local path or an `s3://bucket/key` URL). When the location is on S3,
6//! an optional `[dataset.s3]` block carries non-secret connection details
7//! (region, endpoint, addressing style, …).
8//!
9//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
10//! in this precedence order:
11//!
12//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
13//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
14//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
15//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
16//!    `sales.eu-1` → `SALES_EU_1`).
17//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
18//!    `[dataset.s3]` block.
19//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
20//!    `AWS_SESSION_TOKEN`.
21//! 4. None — fall back to the engine's own provider chain
22//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route roots owned by the server itself: the root, the API scopes,
/// the health/readiness probes, and the version and metrics endpoints.
///
/// A user-chosen mount such as `[docs].path` or `[swagger].path` must not
/// equal any of these, otherwise it would shadow a first-party route.
const RESERVED_MOUNTS: &[&str] = &[
    "/",
    "/api",
    "/api/v1",
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
/// Root of the deserialized configuration file.
///
/// Every section is optional: an absent table falls back to that
/// section's `Default` impl. The `[[dataset]]` array-of-tables is
/// collected into [`AppConfig::datasets`].
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` block — listener, limits and timeouts.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` block — embedded documentation site.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` block — Swagger UI and OpenAPI spec.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` block — Prometheus endpoint.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` block — dataset explorer UI.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[auth]` block — OIDC bearer-token enforcement.
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` entries (TOML key `dataset`). Deserializes to an empty
    /// list when the key is absent; validation then rejects the config.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
60
/// HTTP server settings (`[server]` block).
///
/// The container-level `#[serde(default)]` means any key left out of the
/// TOML takes its value from `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Which engine to run. Must match the binary's compile-time feature.
    pub backend: Backend,
    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
    /// to 0.0.0.0 if you want to expose the port.
    pub listen: IpAddr,
    /// TCP port.
    pub port: u16,
    /// Number of actix worker threads. `None` (= unset) → one per CPU.
    pub workers: Option<usize>,
    /// Optional URL path prefix — useful when sitting behind a reverse
    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
    /// route is mounted under this prefix (so the proxy can pass the URL
    /// through unchanged). Must start with `/` and not end with `/`; the
    /// empty string (default) means no prefix.
    pub prefix: String,
    /// Negotiate response compression (gzip / brotli / zstd) via the
    /// `Accept-Encoding` request header. Enabled by default. Disable when
    /// running behind a proxy that already compresses, or when the extra
    /// CPU is not worth the bandwidth saving.
    pub compress: bool,
    /// Maximum accepted JSON request body size, in bytes. Larger bodies
    /// are rejected with `413 Payload Too Large` before any handler runs.
    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
    /// a DoS guard, not a tuning knob.
    pub max_body_bytes: usize,
    /// Maximum rows returned by a single `/query` page. Larger
    /// `page_size` values are clamped before the backend runs.
    /// Default `100_000`.
    pub max_page_size: u64,
    /// Per-request handler timeout, in milliseconds. If a handler hasn't
    /// produced a response within this budget the request is aborted with
    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
    pub request_timeout_ms: u64,
    /// Grace period for in-flight requests after the server has received
    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
    /// immediately; existing connections then have up to this many
    /// seconds to finish before workers are force-stopped. Default `30`.
    pub shutdown_timeout_secs: u64,
    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
    /// backend; ignored by DataFusion.
    pub quack: QuackConfig,
}
106
107impl Default for ServerConfig {
108    fn default() -> Self {
109        Self {
110            backend: Backend::default(),
111            listen: IpAddr::from([127, 0, 0, 1]),
112            port: 8080,
113            workers: None,
114            prefix: String::new(),
115            compress: true,
116            max_body_bytes: 1024 * 1024,
117            max_page_size: 100_000,
118            request_timeout_ms: 30_000,
119            shutdown_timeout_secs: 30,
120            quack: QuackConfig::default(),
121        }
122    }
123}
124
/// Experimental DuckDB Quack remote protocol server.
///
/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
/// disabled unless you intentionally want DuckDB clients to attach/query this
/// process directly.
///
/// NOTE(review): `Debug` is derived, so `{:?}` output includes `token`
/// verbatim when one is configured — avoid debug-logging this struct.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Install/load the Quack extension and start `quack_serve` after
    /// datasets are registered. Defaults to `false`.
    pub enabled: bool,
    /// Quack URI to listen on. `quack:localhost` (the default) uses
    /// DuckDB's default port 9494.
    pub uri: String,
    /// Optional explicit authentication token. If omitted, Quack generates
    /// one at startup and DataPress logs it once.
    pub token: Option<String>,
    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
    /// For external exposure, put a TLS-terminating reverse proxy in front.
    /// Defaults to `false`.
    pub allow_other_hostname: bool,
    /// Install a read-only authorization macro for remote queries. Enabled
    /// by default to match DataPress' read-oriented HTTP API.
    pub read_only: bool,
}
149
150impl Default for QuackConfig {
151    fn default() -> Self {
152        Self {
153            enabled: false,
154            uri: "quack:localhost".into(),
155            token: None,
156            allow_other_hostname: false,
157            read_only: true,
158        }
159    }
160}
161
162impl QuackConfig {
163    /// Validate the enabled Quack configuration against DuckDB's current
164    /// safety rules. The extension treats only the literal `localhost` as
165    /// local unless `allow_other_hostname` is set.
166    pub fn validate_enabled(&self) -> Result<(), AppError> {
167        if self.uri.trim().is_empty() {
168            return Err(AppError::Internal(
169                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
170            ));
171        }
172        if !self.uri.starts_with("quack:") {
173            return Err(AppError::Internal(format!(
174                "server.quack.uri must start with 'quack:' (got '{}')",
175                self.uri
176            )));
177        }
178        if !self.allow_other_hostname {
179            let host = self.hostname().unwrap_or_default();
180            if host != "localhost" {
181                return Err(AppError::Internal(format!(
182                    "server.quack.uri host must be 'localhost' unless \
183                     server.quack.allow_other_hostname = true (got '{}')",
184                    self.uri
185                )));
186            }
187        }
188        if let Some(token) = self.token.as_deref()
189            && token.len() < 4
190        {
191            return Err(AppError::Internal(
192                "server.quack.token must be at least 4 characters".into(),
193            ));
194        }
195        Ok(())
196    }
197
198    fn hostname(&self) -> Option<&str> {
199        let rest = self.uri.strip_prefix("quack:")?;
200        let rest = rest.strip_prefix("//").unwrap_or(rest);
201        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
202        (!host.is_empty()).then_some(host)
203    }
204}
205
/// Query engine selected by `server.backend`.
///
/// Deserialized from the lowercase variant names (`datafusion`, `duckdb`);
/// DataFusion is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// DataFusion engine (default).
    #[default]
    Datafusion,
    /// DuckDB engine.
    Duckdb,
}
213
/// Embedded MkDocs documentation site (`[docs]` block).
///
/// Enabled by default — when the binary was built with the `docs`
/// cargo feature, the site is served at [`DocsConfig::path`] out of
/// the box. Set `enabled = false` in `datasets.toml` to suppress it
/// (e.g. in prod). When the binary was built without the feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the mount. The mount path must be a non-trivial sub-path;
/// reserved API and probe roots are rejected at startup.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Whether the documentation site is mounted. Defaults to `true`.
    pub enabled: bool,
    /// Mount path of the site. Defaults to `/mkdocs`.
    pub path: String,
}
229
230impl Default for DocsConfig {
231    fn default() -> Self {
232        Self {
233            enabled: true,
234            path: "/mkdocs".into(),
235        }
236    }
237}
238
/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
///
/// Enabled by default — when the binary was built with the `swagger`
/// cargo feature, an interactive Swagger UI is served at
/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
/// to suppress it (e.g. in prod). When the binary was built without
/// the feature, `enabled = true` is harmless: the server logs a
/// warning at startup and skips the mount.
///
/// To let users sign in to the UI itself (Authorization Code + PKCE
/// against any OIDC provider), populate the optional `[swagger.oauth2]`
/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
/// to every "Try it out" request — useful for exercising auth-protected
/// endpoints from the docs page. This drives the UI only; it does not
/// turn on server-side token validation.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Whether the Swagger UI is mounted. Defaults to `true`.
    pub enabled: bool,
    /// Mount path of the UI. Defaults to `/docs`.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-block; `None` (default) leaves the
    /// UI without a sign-in flow.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
262
263impl Default for SwaggerConfig {
264    fn default() -> Self {
265        Self {
266            enabled: true,
267            path: "/docs".into(),
268            oauth2: None,
269        }
270    }
271}
272
/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
///
/// Configures the UI to drive an Authorization Code + PKCE flow against
/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
/// token endpoints from `<issuer>/.well-known/openid-configuration`,
/// so we don't need to pin them here.
///
/// `issuer` and `client_id` are required when the block is present —
/// there is no sensible default for either. `scopes` and `pkce` are
/// optional and fall back to an empty list and `true` respectively.
/// Unknown keys are a deserialization error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// OIDC issuer URL, e.g.
    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
    /// `https://accounts.google.com`. Must not end in `/`.
    pub issuer: String,
    /// Public OAuth2 client identifier registered with the IdP. The
    /// client must be a SPA / public client (no secret) with
    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
    /// as an allowed redirect URI.
    pub client_id: String,
    /// Scopes to request by default. Will be pre-checked in the Swagger
    /// UI authorize dialog; users can edit them before signing in.
    /// `openid` is always added if missing.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Use PKCE for the authorization code flow. Defaults to `true`;
    /// disable only if your IdP doesn't support PKCE for public clients.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
304
/// Prometheus metrics endpoint (`[metrics]` block).
///
/// Disabled by default. When `enabled = true` (and the binary was built
/// with the `metrics` cargo feature), the server installs a middleware
/// that records per-request HTTP counters and latency histograms, and
/// exposes them in the Prometheus text exposition format at
/// [`MetricsConfig::path`] (default `/metrics`).
///
/// The endpoint is mounted at a fixed, *unprefixed* path — like the
/// health probes — so a scrape config doesn't need to know about any
/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
/// layer: Prometheus scrapers rarely carry bearer tokens, and the
/// endpoint exposes only aggregate request metrics (no row data). Keep
/// it on a network the scraper can reach but the public cannot, e.g. by
/// binding `server.listen` to a private interface.
///
/// When the binary was built without the `metrics` feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the endpoint.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Whether the metrics endpoint is exposed. Defaults to `false`.
    pub enabled: bool,
    /// Path of the metrics endpoint. Defaults to `/metrics`.
    pub path: String,
}
330
331impl Default for MetricsConfig {
332    fn default() -> Self {
333        Self {
334            enabled: false,
335            path: "/metrics".into(),
336        }
337    }
338}
339
/// Embedded dataset explorer UI (`[explorer]` block).
///
/// A server-rendered web app (Actix + Askama templates + htmx +
/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
/// It offers a *discovery* view — per-dataset stats, schema, index and
/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
/// that queries each dataset's Parquet export directly.
///
/// Enabled by default. When the binary was built without the `explorer`
/// cargo feature, `enabled = true` is harmless: the server logs a warning
/// at startup and skips the mount. Set `enabled = false` to suppress it at
/// runtime even when the feature is compiled in.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Whether the explorer UI is mounted. Defaults to `true`.
    pub enabled: bool,
    /// Mount path of the explorer UI. Defaults to `/explore`.
    pub path: String,
}
358
359impl Default for ExplorerConfig {
360    fn default() -> Self {
361        Self {
362            enabled: true,
363            path: "/explore".into(),
364        }
365    }
366}
367
/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
///
/// Disabled by default. When `enabled = true`, the server validates
/// every request's `Authorization: Bearer …` JWT against the JWKS
/// discovered from the issuer's OIDC metadata
/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
/// enforces the configured scope requirements per route.
///
/// Only compiled in when the binary was built with the `auth` cargo
/// feature. Without the feature, `enabled = true` is rejected at
/// startup so a misconfigured production deployment can't silently
/// fall back to "no auth".
///
/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
/// of this block — `[swagger.oauth2]` only drives the UI's login
/// dialog; `[auth]` is what enforces tokens on the API.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch. `false` (default) skips all auth processing.
    pub enabled: bool,
    /// OIDC issuer URL — must match the `iss` claim of every accepted
    /// token. Required when `enabled = true`. Defaults to the empty string.
    pub issuer: String,
    /// Expected `aud` claim. When empty (the default), audience validation
    /// is skipped (not recommended in production).
    pub audience: String,
    /// Scopes a caller must hold to read datasets (GET endpoints +
    /// POST `…/query` and `…/count`). Empty list (the default) means "no
    /// scope check, just a valid token is enough".
    pub read_scopes: Vec<String>,
    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
    /// Empty list (the default) means "no scope check, just a valid token
    /// is enough".
    pub reload_scopes: Vec<String>,
    /// Allow unauthenticated GETs through. Useful for public datasets
    /// and demo deployments. Defaults to `false`.
    pub anonymous_read: bool,
    /// Continue serving even if the JWKS fetch fails at startup.
    /// When `true` (default), the server starts in a degraded mode that
    /// rejects every auth'd request with 503 until JWKS becomes
    /// reachable. When `false`, startup fails outright.
    pub start_degraded: bool,
    /// Allowed signing algorithms. Pinned to RS256 by default; never
    /// include `HS*` or `none` here unless you really know what you're
    /// doing.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds.
    /// Defaults to `60`.
    pub leeway_secs: u64,
    /// How often (in seconds) the background refresher re-fetches the
    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
    /// out-of-band. Defaults to `3600`.
    pub jwks_refresh_secs: u64,
    /// Optional JSON-pointer into the JWT claims that extracts a
    /// tenant identifier — attached to the principal and logged on
    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
    /// When empty (the default), no tenant is extracted.
    pub tenant_claim: String,
    /// If non-empty, requests whose extracted tenant ID is not in this
    /// list are rejected with 403. Has no effect when `tenant_claim`
    /// is empty. Defaults to an empty list.
    pub allowed_tenants: Vec<String>,
    /// If `true`, `POST …/reload` accepts *either* a valid token with
    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
    /// to `true` for one-release backwards compatibility — flip to
    /// `false` once your automation has migrated to OIDC.
    pub admin_token_fallback: bool,
}
435
436impl Default for AuthConfig {
437    fn default() -> Self {
438        Self {
439            enabled: false,
440            issuer: String::new(),
441            audience: String::new(),
442            read_scopes: Vec::new(),
443            reload_scopes: Vec::new(),
444            anonymous_read: false,
445            start_degraded: true,
446            algorithms: vec!["RS256".into()],
447            leeway_secs: 60,
448            jwks_refresh_secs: 3600,
449            tenant_claim: String::new(),
450            allowed_tenants: Vec::new(),
451            admin_token_fallback: true,
452        }
453    }
454}
455
456impl Backend {
457    pub fn as_str(self) -> &'static str {
458        match self {
459            Backend::Datafusion => "datafusion",
460            Backend::Duckdb => "duckdb",
461        }
462    }
463}
464
/// One `[[dataset]]` entry: what to load and how.
///
/// `name` and `source` are required; every other key has a default.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name. Also the basis of the per-dataset credential env-var
    /// prefix (uppercased, non-alphanumeric characters replaced by `_`).
    pub name: String,
    /// Format and location of the data (`[dataset.source]`).
    pub source: SourceConfig,
    /// Optional S3 connection details (`[dataset.s3]`), relevant when the
    /// source location is an `s3://` URL.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; falls back to `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// Optional column projection applied at load time. When non-empty,
    /// only the listed columns are read from the parquet/delta source —
    /// every other column is skipped entirely (no decode, no allocation,
    /// no resident memory). Empty (default) = read all columns. Names are
    /// matched case-insensitively against the source schema.
    #[serde(default)]
    pub columns: Vec<String>,
    /// When `true` (default), Utf8 columns that are dictionary-encoded in
    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
    /// for low-cardinality columns. Set to `false` to bypass the override
    /// — useful as a workaround if you observe null-handling oddities on
    /// a particular parquet file.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, the backend should keep the dataset on disk and stream
    /// it at query time instead of materialising it into RAM at startup.
    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
    /// for bounded memory use on large / multi-file sources. Honoured by
    /// the DataFusion backend (local + S3 parquet) and by the DuckDB
    /// backend, which registers the dataset as a view over the source scan
    /// (local + S3 parquet, and delta) rather than materialising a table.
    /// Defaults to `false`.
    #[serde(default)]
    pub lazy: bool,
}
498
/// Serde default helper for boolean fields that are on unless the config
/// says otherwise; referenced as `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}
502
/// Where a dataset lives and in which format (`[dataset.source]` block).
///
/// Both keys are required — neither carries a serde default.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Storage format of the source (`parquet` or `delta`).
    pub kind: SourceKind,
    /// Either a local filesystem path or an `s3://bucket/key` URL.
    pub location: String,
}
509
/// Storage format of a dataset source.
///
/// Deserialized from the lowercase variant names (`parquet`, `delta`).
/// `Default` yields `Parquet`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Parquet source (the `Default` variant).
    #[default]
    Parquet,
    /// Delta source.
    Delta,
}
517
518impl SourceKind {
519    pub fn as_str(self) -> &'static str {
520        match self {
521            SourceKind::Parquet => "parquet",
522            SourceKind::Delta => "delta",
523        }
524    }
525}
526
/// S3 connection settings for a dataset (`[dataset.s3]` block).
///
/// Mostly non-secret connection details. Inline credentials are accepted
/// but discouraged; credentials are normally pulled from env / the AWS
/// credential chain — see [`DatasetConfig::resolved_creds`].
///
/// NOTE(review): `Debug` is derived, so `{:?}` output includes the inline
/// `secret_access_key` / `session_token` verbatim when they are set —
/// avoid debug-logging this struct.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Region to use; `None` (default) leaves it unset.
    pub region: Option<String>,
    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
    pub endpoint: Option<String>,
    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
    /// MinIO and most non-AWS providers require `path`.
    pub addressing_style: AddressingStyle,
    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
    /// Defaults to `false`.
    pub allow_http: bool,
    /// Inline credentials. Strongly discouraged in production — prefer env
    /// vars (see module docs).
    pub access_key_id: Option<String>,
    /// Inline secret key; same caveat as `access_key_id`.
    pub secret_access_key: Option<String>,
    /// Inline session token; same caveat as `access_key_id`.
    pub session_token: Option<String>,
    /// Hive partition-column handling for parquet sources. Defaults to
    /// `auto` (detect from the path). See [`Partitioning`].
    pub partitioning: Partitioning,
    /// Whether the bucket is folded into a custom `endpoint` host for
    /// virtual-hosted-style requests. Defaults to `auto`. See
    /// [`BucketInHost`].
    pub endpoint_bucket_in_host: BucketInHost,
}
553
554impl Default for S3Config {
555    fn default() -> Self {
556        Self {
557            region: None,
558            endpoint: None,
559            addressing_style: AddressingStyle::Virtual,
560            allow_http: false,
561            access_key_id: None,
562            secret_access_key: None,
563            session_token: None,
564            partitioning: Partitioning::Auto,
565            endpoint_bucket_in_host: BucketInHost::Auto,
566        }
567    }
568}
569
570impl S3Config {
571    /// Resolve the endpoint URL to hand to the object store, optionally
572    /// folding `bucket` into the host for virtual-hosted-style requests so a
573    /// plain `endpoint` works the same way it does on DuckDB.
574    ///
575    /// Returns `None` when no custom endpoint is configured (AWS default).
576    /// The bucket is only prepended when it isn't already the leading host
577    /// label, so re-running this (or a config that already embeds the
578    /// bucket) never produces `bucket.bucket.host`.
579    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
580        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
581
582        let fold = match self.endpoint_bucket_in_host {
583            BucketInHost::False => false,
584            BucketInHost::True => true,
585            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
586        };
587        if !fold {
588            return Some(ep.to_string());
589        }
590
591        let (scheme, host_and_path) = match ep.split_once("://") {
592            Some((s, rest)) => (Some(s), rest),
593            None => (None, ep),
594        };
595        // Split host from any trailing path so we prefix the host label only.
596        let (host, path) = match host_and_path.split_once('/') {
597            Some((h, p)) => (h, Some(p)),
598            None => (host_and_path, None),
599        };
600        // Guard against double-prefixing.
601        if host == bucket || host.starts_with(&format!("{bucket}.")) {
602            return Some(ep.to_string());
603        }
604        let new_host = format!("{bucket}.{host}");
605        let rebuilt = match (scheme, path) {
606            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
607            (Some(s), None) => format!("{s}://{new_host}"),
608            (None, Some(p)) => format!("{new_host}/{p}"),
609            (None, None) => new_host,
610        };
611        Some(rebuilt)
612    }
613}
614
/// S3 request addressing style (`addressing_style` key).
///
/// Deserialized from the lowercase variant names (`virtual`, `path`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Virtual-hosted style — `bucket.host` (default).
    #[default]
    Virtual,
    /// Path style — `host/bucket/`.
    Path,
}
622
623impl AddressingStyle {
624    pub fn as_str(self) -> &'static str {
625        match self {
626            AddressingStyle::Virtual => "virtual",
627            AddressingStyle::Path => "path",
628        }
629    }
630}
631
/// How hive-style partition columns (`key=value/` path segments) are handled
/// for an S3 parquet source. Local parquet always auto-detects; this option
/// brings S3 in line and lets you force or disable the behaviour.
///
/// Deserialized from the lowercase variant names (`auto`, `hive`, `none`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Detect `key=value` segments from the location glob or by listing the
    /// prefix. No partition columns are added when none are found.
    /// This is the default.
    #[default]
    Auto,
    /// Force hive partitioning. Partition keys are taken from the location
    /// glob, or discovered by listing the prefix.
    Hive,
    /// Treat the source as a flat set of parquet files — never add partition
    /// columns even if the path looks hive-partitioned.
    None,
}
649
650impl Partitioning {
651    pub fn as_str(self) -> &'static str {
652        match self {
653            Partitioning::Auto => "auto",
654            Partitioning::Hive => "hive",
655            Partitioning::None => "none",
656        }
657    }
658}
659
/// Whether the bucket name is folded into the endpoint hostname for
/// virtual-hosted-style requests against a custom endpoint. This aligns the
/// DataFusion object-store path with DuckDB, which builds the virtual host
/// itself — so the same plain `endpoint` works on both backends.
///
/// Deserialized from the lowercase variant names, i.e. the *strings*
/// `"auto"`, `"true"` and `"false"`.
/// NOTE(review): a bare TOML boolean (`true` / `false`) presumably fails to
/// deserialize into this enum — confirm and document for operators.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold the bucket into the host when `addressing_style = "virtual"` and
    /// a custom `endpoint` is set (guarded against double-prefixing).
    /// This is the default.
    #[default]
    Auto,
    /// Always fold the bucket into the endpoint host.
    True,
    /// Never rewrite the endpoint — pass it through verbatim.
    False,
}
676
677impl BucketInHost {
678    pub fn as_str(self) -> &'static str {
679        match self {
680            BucketInHost::Auto => "auto",
681            BucketInHost::True => "true",
682            BucketInHost::False => "false",
683        }
684    }
685}
686
/// Per-dataset index settings (the `index` key of a `[[dataset]]` entry).
///
/// Any omitted key takes its value from `IndexConfig::default()`
/// (`mode = auto`, no columns, `max_cardinality = 100_000`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Index selection mode. Defaults to `auto`.
    pub mode: IndexMode,
    /// Column names for the index.
    /// NOTE(review): presumably only consulted when `mode = "list"` —
    /// confirm against the index builder.
    pub columns: Vec<String>,
    /// Cardinality limit, default `100_000`.
    /// NOTE(review): presumably the maximum number of distinct values a
    /// column may have to be indexed — confirm against the index builder.
    pub max_cardinality: usize,
}
694
695impl Default for IndexConfig {
696    fn default() -> Self {
697        Self {
698            mode: IndexMode::Auto,
699            columns: Vec::new(),
700            max_cardinality: 100_000,
701        }
702    }
703}
704
/// Index selection mode (`index.mode`).
///
/// Deserialized from the lowercase variant names (`auto`, `none`, `list`).
/// NOTE(review): the per-variant notes below are inferred from the names
/// only — confirm against the index builder.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// Default mode; presumably lets the backend choose which columns to
    /// index.
    #[default]
    Auto,
    /// Presumably disables indexing for the dataset.
    None,
    /// Presumably indexes exactly the columns listed in `index.columns`.
    List,
}
713
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// `Debug` is implemented by hand so that logging this value with `{:?}`
/// never prints the secret key or the session token; only their presence
/// is shown. The access key ID is an identifier rather than a secret and
/// is printed as-is to keep diagnostics useful.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    /// Access key ID, if one was resolved.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was resolved. Redacted in `Debug` output.
    pub secret_access_key: Option<String>,
    /// Session token, if one was resolved. Redacted in `Debug` output.
    pub session_token: Option<String>,
}

impl std::fmt::Debug for ResolvedCreds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Placeholder that reveals presence but not content.
        fn redacted(value: &Option<String>) -> &'static str {
            if value.is_some() {
                "Some(<redacted>)"
            } else {
                "None"
            }
        }

        f.debug_struct("ResolvedCreds")
            .field("access_key_id", &self.access_key_id)
            .field(
                "secret_access_key",
                &format_args!("{}", redacted(&self.secret_access_key)),
            )
            .field(
                "session_token",
                &format_args!("{}", redacted(&self.session_token)),
            )
            .finish()
    }
}
722
723impl ResolvedCreds {
724    pub fn has_keypair(&self) -> bool {
725        self.access_key_id.is_some() && self.secret_access_key.is_some()
726    }
727}
728
729// ---------------------------------------------------------------------------
730// Loading + validation
731// ---------------------------------------------------------------------------
732
733impl AppConfig {
734    /// Read and validate a TOML config file.
735    pub fn load(path: &str) -> Result<Self, AppError> {
736        let raw = std::fs::read_to_string(path)
737            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
738        let mut cfg: AppConfig =
739            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
740        cfg.normalize();
741        cfg.validate()?;
742        Ok(cfg)
743    }
744
745    /// Canonicalise fields that are compared case-insensitively at runtime.
746    ///
747    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
748    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
749    /// once at load time. Without this an operator who writes
750    /// `"Datasets:Read"` would silently 403 every caller, since the token
751    /// side would have become `datasets:read`.
752    fn normalize(&mut self) {
753        for s in self
754            .auth
755            .read_scopes
756            .iter_mut()
757            .chain(self.auth.reload_scopes.iter_mut())
758        {
759            *s = s.to_ascii_lowercase();
760        }
761    }
762
763    fn validate(&self) -> Result<(), AppError> {
764        // Server prefix: empty, or must start with '/' and not end with '/'.
765        let p = &self.server.prefix;
766        if !p.is_empty() {
767            if !p.starts_with('/') {
768                return Err(AppError::Internal(format!(
769                    "server.prefix must start with '/' (got '{p}')"
770                )));
771            }
772            if p.ends_with('/') {
773                return Err(AppError::Internal(format!(
774                    "server.prefix must not end with '/' (got '{p}')"
775                )));
776            }
777        }
778
779        if self.datasets.is_empty() {
780            return Err(AppError::Internal(
781                "datasets.toml has no [[dataset]] entries".into(),
782            ));
783        }
784
785        if self.server.quack.enabled {
786            self.server.quack.validate_enabled()?;
787        }
788
789        // Validate the docs mount path even when the section is disabled,
790        // so an inactive config typo can't go unnoticed.
791        {
792            let dp = &self.docs.path;
793            if !dp.starts_with('/') {
794                return Err(AppError::Internal(format!(
795                    "docs.path must start with '/' (got '{dp}')"
796                )));
797            }
798            if dp.len() > 1 && dp.ends_with('/') {
799                return Err(AppError::Internal(format!(
800                    "docs.path must not end with '/' (got '{dp}')"
801                )));
802            }
803            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
804                return Err(AppError::Internal(format!(
805                    "docs.path '{dp}' collides with a reserved route"
806                )));
807            }
808        }
809
810        // Same for the swagger UI mount.
811        {
812            let sp = &self.swagger.path;
813            if !sp.starts_with('/') {
814                return Err(AppError::Internal(format!(
815                    "swagger.path must start with '/' (got '{sp}')"
816                )));
817            }
818            if sp.len() > 1 && sp.ends_with('/') {
819                return Err(AppError::Internal(format!(
820                    "swagger.path must not end with '/' (got '{sp}')"
821                )));
822            }
823            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
824                return Err(AppError::Internal(format!(
825                    "swagger.path '{sp}' collides with a reserved route"
826                )));
827            }
828            if sp == &self.docs.path {
829                return Err(AppError::Internal(format!(
830                    "swagger.path and docs.path must differ (both '{sp}')"
831                )));
832            }
833            if let Some(o) = &self.swagger.oauth2 {
834                if o.issuer.trim().is_empty() {
835                    return Err(AppError::Internal(
836                        "swagger.oauth2.issuer must not be empty".into(),
837                    ));
838                }
839                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
840                    return Err(AppError::Internal(format!(
841                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
842                        o.issuer
843                    )));
844                }
845                if o.client_id.trim().is_empty() {
846                    return Err(AppError::Internal(
847                        "swagger.oauth2.client_id must not be empty".into(),
848                    ));
849                }
850            }
851        }
852
853        // Metrics endpoint mount path. Validated even when disabled so an
854        // inactive config typo can't go unnoticed. `/metrics` is itself a
855        // reserved mount (so docs/swagger can't shadow it), so we check the
856        // remaining reserved routes — and the docs/swagger paths — for
857        // collisions rather than the whole list.
858        {
859            let mp = &self.metrics.path;
860            if !mp.starts_with('/') {
861                return Err(AppError::Internal(format!(
862                    "metrics.path must start with '/' (got '{mp}')"
863                )));
864            }
865            if mp.len() > 1 && mp.ends_with('/') {
866                return Err(AppError::Internal(format!(
867                    "metrics.path must not end with '/' (got '{mp}')"
868                )));
869            }
870            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
871                return Err(AppError::Internal(format!(
872                    "metrics.path '{mp}' collides with a reserved route"
873                )));
874            }
875            if mp == &self.docs.path {
876                return Err(AppError::Internal(format!(
877                    "metrics.path and docs.path must differ (both '{mp}')"
878                )));
879            }
880            if mp == &self.swagger.path {
881                return Err(AppError::Internal(format!(
882                    "metrics.path and swagger.path must differ (both '{mp}')"
883                )));
884            }
885        }
886
887        // Explorer UI mount path. Validated even when disabled so an
888        // inactive config typo can't go unnoticed.
889        {
890            let ep = &self.explorer.path;
891            if !ep.starts_with('/') {
892                return Err(AppError::Internal(format!(
893                    "explorer.path must start with '/' (got '{ep}')"
894                )));
895            }
896            if ep.len() > 1 && ep.ends_with('/') {
897                return Err(AppError::Internal(format!(
898                    "explorer.path must not end with '/' (got '{ep}')"
899                )));
900            }
901            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
902                return Err(AppError::Internal(format!(
903                    "explorer.path '{ep}' collides with a reserved route"
904                )));
905            }
906            if ep == &self.docs.path {
907                return Err(AppError::Internal(format!(
908                    "explorer.path and docs.path must differ (both '{ep}')"
909                )));
910            }
911            if ep == &self.swagger.path {
912                return Err(AppError::Internal(format!(
913                    "explorer.path and swagger.path must differ (both '{ep}')"
914                )));
915            }
916            if ep == &self.metrics.path {
917                return Err(AppError::Internal(format!(
918                    "explorer.path and metrics.path must differ (both '{ep}')"
919                )));
920            }
921        }
922
923        // Auth block — only meaningful when `enabled = true`. The cargo
924        // feature gate is enforced separately in `server::serve` so a
925        // binary built without `--features auth` and a config with
926        // `auth.enabled = true` aborts with a clear error.
927        if self.auth.enabled {
928            let a = &self.auth;
929            if a.issuer.trim().is_empty() {
930                return Err(AppError::Internal(
931                    "auth.issuer must not be empty when auth.enabled = true".into(),
932                ));
933            }
934            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
935                return Err(AppError::Internal(format!(
936                    "auth.issuer must be an absolute http(s) URL (got '{}')",
937                    a.issuer
938                )));
939            }
940            for alg in &a.algorithms {
941                match alg.as_str() {
942                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
943                    | "PS512" => {}
944                    other => {
945                        return Err(AppError::Internal(format!(
946                            "auth.algorithms[{other}] is not allowed; pick one of \
947                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
948                        )));
949                    }
950                }
951            }
952            if a.algorithms.is_empty() {
953                return Err(AppError::Internal(
954                    "auth.algorithms must not be empty".into(),
955                ));
956            }
957            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
958                return Err(AppError::Internal(format!(
959                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
960                    a.tenant_claim
961                )));
962            }
963            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
964                return Err(AppError::Internal(
965                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
966                     can't enforce a tenant allow-list without a claim to extract from"
967                        .into(),
968                ));
969            }
970        }
971
972        let mut seen = HashSet::new();
973        for d in &self.datasets {
974            if !seen.insert(d.name.as_str()) {
975                return Err(AppError::Internal(format!(
976                    "duplicate dataset name: {}",
977                    d.name
978                )));
979            }
980            if d.name.is_empty() {
981                return Err(AppError::Internal("dataset name must not be empty".into()));
982            }
983            // URL-safe: alphanum + _ - .
984            if !d
985                .name
986                .chars()
987                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
988            {
989                return Err(AppError::Internal(format!(
990                    "dataset name '{}' must be alphanumeric (plus _ - .)",
991                    d.name
992                )));
993            }
994
995            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
996                return Err(AppError::Internal(format!(
997                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
998                    d.name
999                )));
1000            }
1001
1002            // Location-specific checks.
1003            if d.source.is_s3() {
1004                d.source.s3_bucket()?;
1005                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1006                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1007                    && std::env::var("AWS_REGION").is_err()
1008                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1009                {
1010                    log::warn!(
1011                        "dataset '{}': S3 source without explicit region — \
1012                         relying on AWS_REGION env var",
1013                        d.name
1014                    );
1015                }
1016            } else {
1017                // Local path. For parquet we can fully resolve to a file
1018                // list up front; for delta we only check that the directory
1019                // exists (delta has its own layout — _delta_log/, …).
1020                match d.source.kind {
1021                    SourceKind::Parquet => {
1022                        d.resolve_local_parquet_files()?;
1023                    }
1024                    SourceKind::Delta => {
1025                        let p = Path::new(&d.source.location);
1026                        if !p.exists() {
1027                            return Err(AppError::Internal(format!(
1028                                "dataset '{}': delta location does not exist: {}",
1029                                d.name, d.source.location
1030                            )));
1031                        }
1032                    }
1033                }
1034            }
1035        }
1036        Ok(())
1037    }
1038}
1039
1040impl SourceConfig {
1041    pub fn is_s3(&self) -> bool {
1042        self.location.starts_with("s3://")
1043    }
1044
1045    /// True when the location already contains a glob metacharacter
1046    /// (`*`, `?`, or `[`).
1047    pub fn has_glob(&self) -> bool {
1048        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1049    }
1050
1051    /// Location to hand to a backend that needs an explicit parquet glob
1052    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1053    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1054    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1055    /// are returned unchanged.
1056    pub fn s3_recursive_parquet_glob(&self) -> String {
1057        if !self.is_s3() || self.has_glob() {
1058            return self.location.clone();
1059        }
1060        let trimmed = self.location.trim_end_matches('/');
1061        format!("{trimmed}/**/*.parquet")
1062    }
1063
1064    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1065    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1066        let rest = self
1067            .location
1068            .strip_prefix("s3://")
1069            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1070        let (bucket, key) = match rest.split_once('/') {
1071            Some((b, k)) => (b, k),
1072            None => (rest, ""),
1073        };
1074        if bucket.is_empty() {
1075            return Err(AppError::Internal(format!(
1076                "s3 URL missing bucket: {}",
1077                self.location
1078            )));
1079        }
1080        Ok((bucket, key))
1081    }
1082}
1083
1084impl DatasetConfig {
1085    /// Expand `source.location` to a concrete list of local `.parquet`
1086    /// files. Only valid for `kind = parquet` on local paths — S3 and
1087    /// Delta sources are resolved by the backend itself.
1088    ///
1089    /// Accepts three location shapes:
1090    ///   * a single `*.parquet` file
1091    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1092    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1093    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1094    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1095        if self.source.is_s3() {
1096            return Err(AppError::Internal(format!(
1097                "dataset '{}': resolve_local_parquet_files called on s3 source",
1098                self.name
1099            )));
1100        }
1101        let loc = &self.source.location;
1102
1103        // Glob pattern? Expand and require at least one match.
1104        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1105            let mut files: Vec<PathBuf> = glob::glob(loc)
1106                .map_err(|e| {
1107                    AppError::Internal(format!(
1108                        "dataset '{}': bad glob pattern '{loc}': {e}",
1109                        self.name
1110                    ))
1111                })?
1112                .filter_map(|r| r.ok())
1113                .filter(|p| {
1114                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1115                })
1116                .collect();
1117            files.sort();
1118            if files.is_empty() {
1119                return Err(AppError::Internal(format!(
1120                    "dataset '{}': glob '{loc}' matched no .parquet files",
1121                    self.name
1122                )));
1123            }
1124            return Ok(files);
1125        }
1126
1127        let path = Path::new(loc);
1128        if !path.exists() {
1129            return Err(AppError::Internal(format!(
1130                "dataset '{}': source path does not exist: {loc}",
1131                self.name
1132            )));
1133        }
1134
1135        if path.is_file() {
1136            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1137                return Err(AppError::Internal(format!(
1138                    "dataset '{}': source must be a .parquet file",
1139                    self.name
1140                )));
1141            }
1142            return Ok(vec![path.to_path_buf()]);
1143        }
1144
1145        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1146            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1147            .filter_map(|entry| entry.ok().map(|e| e.path()))
1148            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1149            .collect();
1150        files.sort();
1151        if files.is_empty() {
1152            return Err(AppError::Internal(format!(
1153                "dataset '{}': no *.parquet files found in {loc}",
1154                self.name
1155            )));
1156        }
1157        Ok(files)
1158    }
1159
1160    /// Env-var prefix derived from the dataset name: uppercase with
1161    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1162    /// `SALES_EU_1`.
1163    pub fn env_prefix(&self) -> String {
1164        self.name
1165            .chars()
1166            .map(|c| {
1167                if c.is_ascii_alphanumeric() {
1168                    c.to_ascii_uppercase()
1169                } else {
1170                    '_'
1171                }
1172            })
1173            .collect()
1174    }
1175
1176    /// Resolve S3 credentials following the precedence chain documented at
1177    /// the top of this module. Returns an empty struct when nothing was
1178    /// found — the caller should then leave credential resolution to the
1179    /// engine's default provider chain.
1180    pub fn resolved_creds(&self) -> ResolvedCreds {
1181        let prefix = self.env_prefix();
1182        let from_env = |suffix: &str| {
1183            std::env::var(format!("{prefix}_{suffix}"))
1184                .ok()
1185                .filter(|s| !s.is_empty())
1186        };
1187        let inline = self.s3.as_ref();
1188        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1189
1190        ResolvedCreds {
1191            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1192                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1193                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1194            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1195                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1196                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1197            session_token: from_env("AWS_SESSION_TOKEN")
1198                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1199                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1200        }
1201    }
1202
1203    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1204    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1205    pub fn resolved_region(&self) -> String {
1206        let prefix = self.env_prefix();
1207        std::env::var(format!("{prefix}_AWS_REGION"))
1208            .ok()
1209            .filter(|s| !s.is_empty())
1210            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1211            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1212            .or_else(|| {
1213                std::env::var("AWS_DEFAULT_REGION")
1214                    .ok()
1215                    .filter(|s| !s.is_empty())
1216            })
1217            .unwrap_or_else(|| "us-east-1".to_string())
1218    }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223    use super::*;
1224
1225    #[test]
1226    fn server_defaults() {
1227        let s = ServerConfig::default();
1228        assert_eq!(s.backend, Backend::Datafusion);
1229        assert_eq!(s.port, 8080);
1230        assert!(s.compress);
1231        assert_eq!(s.max_body_bytes, 1024 * 1024);
1232        assert_eq!(s.max_page_size, 100_000);
1233        assert_eq!(s.request_timeout_ms, 30_000);
1234        assert!(!s.quack.enabled);
1235        assert_eq!(s.quack.uri, "quack:localhost");
1236        assert!(s.quack.token.is_none());
1237        assert!(!s.quack.allow_other_hostname);
1238        assert!(s.quack.read_only);
1239        assert_eq!(s.prefix, "");
1240        assert!(s.listen.is_loopback());
1241    }
1242
1243    #[test]
1244    fn server_overrides_from_toml() {
1245        let toml = r#"
1246            [server]
1247            backend = "duckdb"
1248            port = 9000
1249            prefix = "/datapress"
1250            compress = false
1251            max_body_bytes = 4096
1252            max_page_size = 50000
1253            request_timeout_ms = 0
1254
1255            [server.quack]
1256            enabled = true
1257            uri = "quack:localhost:9495"
1258            token = "test-token"
1259            read_only = false
1260            [[dataset]]
1261            name = "x"
1262            source.kind = "parquet"
1263            source.location = "/tmp/missing.parquet"
1264        "#;
1265        let cfg: AppConfig = toml::from_str(toml).unwrap();
1266        assert_eq!(cfg.server.backend, Backend::Duckdb);
1267        assert_eq!(cfg.server.port, 9000);
1268        assert_eq!(cfg.server.prefix, "/datapress");
1269        assert!(!cfg.server.compress);
1270        assert_eq!(cfg.server.max_body_bytes, 4096);
1271        assert_eq!(cfg.server.max_page_size, 50_000);
1272        assert_eq!(cfg.server.request_timeout_ms, 0);
1273        assert!(cfg.server.quack.enabled);
1274        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
1275        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
1276        assert!(!cfg.server.quack.read_only);
1277        assert_eq!(cfg.datasets.len(), 1);
1278        assert_eq!(cfg.datasets[0].name, "x");
1279        assert!(cfg.datasets[0].dict_encode); // default
1280    }
1281
1282    #[test]
1283    fn validate_rejects_bad_prefix() {
1284        let bad = ["no-leading-slash", "/trailing/"];
1285        for p in bad {
1286            let cfg = AppConfig {
1287                server: ServerConfig {
1288                    prefix: p.to_string(),
1289                    ..Default::default()
1290                },
1291                docs: DocsConfig::default(),
1292                swagger: SwaggerConfig::default(),
1293                metrics: MetricsConfig::default(),
1294                explorer: ExplorerConfig::default(),
1295                auth: AuthConfig::default(),
1296                datasets: vec![],
1297            };
1298            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
1299        }
1300    }
1301
1302    #[test]
1303    fn normalize_lowercases_configured_scopes() {
1304        let mut cfg = AppConfig {
1305            server: ServerConfig::default(),
1306            docs: DocsConfig::default(),
1307            swagger: SwaggerConfig::default(),
1308            metrics: MetricsConfig::default(),
1309            explorer: ExplorerConfig::default(),
1310            auth: AuthConfig {
1311                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
1312                reload_scopes: vec!["Datasets:Reload".into()],
1313                ..Default::default()
1314            },
1315            datasets: vec![],
1316        };
1317        cfg.normalize();
1318        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
1319        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
1320    }
1321
1322    #[test]
1323    fn validate_rejects_no_datasets() {
1324        let cfg = AppConfig {
1325            server: ServerConfig::default(),
1326            docs: DocsConfig::default(),
1327            swagger: SwaggerConfig::default(),
1328            metrics: MetricsConfig::default(),
1329            explorer: ExplorerConfig::default(),
1330            auth: AuthConfig::default(),
1331            datasets: vec![],
1332        };
1333        let err = cfg.validate().unwrap_err();
1334        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
1335    }
1336
1337    #[cfg(feature = "auth")]
1338    #[test]
1339    fn validate_accepts_auth_issuer_with_trailing_slash() {
1340        use std::io::Write;
1341
1342        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
1343        let _ = std::fs::remove_dir_all(&dir);
1344        std::fs::create_dir_all(&dir).unwrap();
1345        let file = dir.join("a.parquet");
1346        std::fs::File::create(&file)
1347            .unwrap()
1348            .write_all(b"x")
1349            .unwrap();
1350
1351        let cfg = AppConfig {
1352            server: ServerConfig::default(),
1353            docs: DocsConfig::default(),
1354            swagger: SwaggerConfig::default(),
1355            metrics: MetricsConfig::default(),
1356            explorer: ExplorerConfig::default(),
1357            auth: AuthConfig {
1358                enabled: true,
1359                issuer: "https://tenant.example.com/".into(),
1360                ..Default::default()
1361            },
1362            datasets: vec![DatasetConfig {
1363                name: "x".into(),
1364                source: SourceConfig {
1365                    kind: SourceKind::Parquet,
1366                    location: file.to_string_lossy().into_owned(),
1367                },
1368                s3: None,
1369                index: IndexConfig::default(),
1370                columns: vec![],
1371                dict_encode: true,
1372                lazy: false,
1373            }],
1374        };
1375
1376        assert!(cfg.validate().is_ok());
1377        let _ = std::fs::remove_dir_all(&dir);
1378    }
1379
1380    #[test]
1381    fn validate_rejects_quack_non_local_host_without_override() {
1382        let cfg = AppConfig {
1383            server: ServerConfig {
1384                quack: QuackConfig {
1385                    enabled: true,
1386                    uri: "quack:127.0.0.1".into(),
1387                    token: Some("test-token".into()),
1388                    ..Default::default()
1389                },
1390                ..Default::default()
1391            },
1392            docs: DocsConfig::default(),
1393            swagger: SwaggerConfig::default(),
1394            metrics: MetricsConfig::default(),
1395            explorer: ExplorerConfig::default(),
1396            auth: AuthConfig::default(),
1397            datasets: vec![DatasetConfig {
1398                name: "x".into(),
1399                source: SourceConfig {
1400                    kind: SourceKind::Parquet,
1401                    location: "/tmp/missing.parquet".into(),
1402                },
1403                s3: None,
1404                index: IndexConfig::default(),
1405                columns: vec![],
1406                dict_encode: true,
1407                lazy: false,
1408            }],
1409        };
1410        let err = cfg.validate().unwrap_err();
1411        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
1412    }
1413
1414    #[test]
1415    fn validate_rejects_bad_dataset_name() {
1416        let cfg: AppConfig = toml::from_str(
1417            r#"
1418            [[dataset]]
1419            name = "bad name!"
1420            source.kind = "parquet"
1421            source.location = "/tmp/whatever"
1422        "#,
1423        )
1424        .unwrap();
1425        let err = cfg.validate().unwrap_err();
1426        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
1427    }
1428
1429    #[test]
1430    fn validate_rejects_duplicate_names() {
1431        use std::io::Write;
1432        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
1433        let _ = std::fs::remove_dir_all(&dir);
1434        std::fs::create_dir_all(&dir).unwrap();
1435        let f = dir.join("a.parquet");
1436        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
1437        let path = f.to_str().unwrap();
1438
1439        let cfg: AppConfig = toml::from_str(&format!(
1440            r#"
1441            [[dataset]]
1442            name = "a"
1443            source.kind = "parquet"
1444            source.location = "{path}"
1445            [[dataset]]
1446            name = "a"
1447            source.kind = "parquet"
1448            source.location = "{path}"
1449        "#
1450        ))
1451        .unwrap();
1452        let err = cfg.validate().expect_err("expected error");
1453        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
1454
1455        let _ = std::fs::remove_dir_all(&dir);
1456    }
1457
1458    #[test]
1459    fn s3_bucket_parsing() {
1460        let mk = |loc: &str| SourceConfig {
1461            kind: SourceKind::Parquet,
1462            location: loc.into(),
1463        };
1464        let s1 = mk("s3://bucket/path/key");
1465        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
1466        let s2 = mk("s3://only-bucket");
1467        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
1468        assert!(mk("s3:///nokey").s3_bucket().is_err());
1469        assert!(mk("/local/path").s3_bucket().is_err());
1470    }
1471
1472    #[test]
1473    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
1474        let mk = |loc: &str| SourceConfig {
1475            kind: SourceKind::Parquet,
1476            location: loc.into(),
1477        };
1478        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
1479        assert_eq!(
1480            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
1481            "s3://bucket/logs/**/*.parquet"
1482        );
1483        assert_eq!(
1484            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
1485            "s3://bucket/logs/**/*.parquet"
1486        );
1487        // Already globbed -> unchanged.
1488        assert_eq!(
1489            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
1490            "s3://bucket/logs/*.parquet"
1491        );
1492        // Non-S3 -> unchanged.
1493        assert_eq!(
1494            mk("/local/logs").s3_recursive_parquet_glob(),
1495            "/local/logs"
1496        );
1497    }
1498
1499    #[test]
1500    fn effective_endpoint_folds_bucket_per_mode() {
1501        let virt = S3Config {
1502            endpoint: Some("https://s3.example.com".into()),
1503            addressing_style: AddressingStyle::Virtual,
1504            ..Default::default()
1505        };
1506        // Auto + virtual -> bucket folded into host.
1507        assert_eq!(
1508            virt.effective_endpoint("mybucket").as_deref(),
1509            Some("https://mybucket.s3.example.com")
1510        );
1511        // Idempotent: already-prefixed host is left alone.
1512        let prefixed = S3Config {
1513            endpoint: Some("https://mybucket.s3.example.com".into()),
1514            ..virt.clone()
1515        };
1516        assert_eq!(
1517            prefixed.effective_endpoint("mybucket").as_deref(),
1518            Some("https://mybucket.s3.example.com")
1519        );
1520        // Path style (auto) -> host untouched.
1521        let path = S3Config {
1522            addressing_style: AddressingStyle::Path,
1523            ..virt.clone()
1524        };
1525        assert_eq!(
1526            path.effective_endpoint("mybucket").as_deref(),
1527            Some("https://s3.example.com")
1528        );
1529        // Explicit overrides win over addressing style.
1530        let forced_off = S3Config {
1531            endpoint_bucket_in_host: BucketInHost::False,
1532            ..virt.clone()
1533        };
1534        assert_eq!(
1535            forced_off.effective_endpoint("mybucket").as_deref(),
1536            Some("https://s3.example.com")
1537        );
1538        let forced_on = S3Config {
1539            endpoint_bucket_in_host: BucketInHost::True,
1540            ..path.clone()
1541        };
1542        assert_eq!(
1543            forced_on.effective_endpoint("mybucket").as_deref(),
1544            Some("https://mybucket.s3.example.com")
1545        );
1546        // No endpoint -> None.
1547        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
1548    }
1549
1550    #[test]
1551    fn env_prefix_sanitises_name() {
1552        let mk = |name: &str| DatasetConfig {
1553            name: name.into(),
1554            source: SourceConfig {
1555                kind: SourceKind::Parquet,
1556                location: "x".into(),
1557            },
1558            s3: None,
1559            index: IndexConfig::default(),
1560            columns: vec![],
1561            dict_encode: true,
1562            lazy: false,
1563        };
1564        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
1565        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
1566        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
1567    }
1568
1569    #[test]
1570    fn resolve_local_parquet_single_file_and_dir() {
1571        use std::io::Write;
1572        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
1573        let _ = std::fs::remove_dir_all(&dir);
1574        std::fs::create_dir_all(&dir).unwrap();
1575        let f = dir.join("a.parquet");
1576        let mut fh = std::fs::File::create(&f).unwrap();
1577        fh.write_all(b"not really parquet").unwrap();
1578
1579        let mk = |loc: &str| DatasetConfig {
1580            name: "ds".into(),
1581            source: SourceConfig {
1582                kind: SourceKind::Parquet,
1583                location: loc.into(),
1584            },
1585            s3: None,
1586            index: IndexConfig::default(),
1587            columns: vec![],
1588            dict_encode: true,
1589            lazy: false,
1590        };
1591
1592        // Direct file.
1593        let files = mk(f.to_str().unwrap())
1594            .resolve_local_parquet_files()
1595            .unwrap();
1596        assert_eq!(files, vec![f.clone()]);
1597
1598        // Directory.
1599        let files = mk(dir.to_str().unwrap())
1600            .resolve_local_parquet_files()
1601            .unwrap();
1602        assert_eq!(files, vec![f.clone()]);
1603
1604        // Missing path.
1605        assert!(
1606            mk("/no/such/place.parquet")
1607                .resolve_local_parquet_files()
1608                .is_err()
1609        );
1610
1611        let _ = std::fs::remove_dir_all(&dir);
1612    }
1613}