Skip to main content

datapress_core/
config.rs

1//! Runtime configuration loaded from `datasets.toml`.
2//!
3//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
4//! block selects the format (`parquet` or `delta`) and the location (a
5//! local path or an `s3://bucket/key` URL). When the location is on S3,
6//! an optional `[dataset.s3]` block carries non-secret connection details
7//! (region, endpoint, addressing style, …).
8//!
9//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
10//! in this precedence order:
11//!
12//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
13//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
14//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
15//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
16//!    `sales.eu-1` → `SALES_EU_1`).
17//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
18//!    `[dataset.s3]` block.
19//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
20//!    `AWS_SESSION_TOKEN`.
21//! 4. None — fall back to the engine's own provider chain
22//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Paths that are off-limits as a `[docs].path` or `[swagger].path` mount
/// point: mounting there would shadow a first-party route (root, API
/// scopes, probes).
const RESERVED_MOUNTS: &[&str] = &[
    // Site root.
    "/",
    // API scopes.
    "/api",
    "/api/v1",
    // Probes and other built-in endpoints.
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
/// Root of the configuration file: one optional table per subsystem plus
/// the list of `[[dataset]]` entries.
///
/// Every field carries `#[serde(default)]`, so a table that is absent from
/// the file deserializes to that section's `Default` value.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table — see [`ServerConfig`].
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table — see [`DocsConfig`].
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table — see [`SwaggerConfig`].
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table — see [`MetricsConfig`].
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` table — see [`ExplorerConfig`].
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[auth]` table — see [`AuthConfig`].
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` array of tables (the TOML key is the singular
    /// `dataset`). An absent key deserializes to an empty list, which
    /// validation then rejects.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
60
/// HTTP server settings (`[server]` table).
///
/// The container-level `#[serde(default)]` means any key missing from the
/// table takes its value from the `Default` impl.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Which engine to run. Must match the binary's compile-time feature.
    pub backend: Backend,
    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
    /// to 0.0.0.0 if you want to expose the port.
    pub listen: IpAddr,
    /// TCP port. Default `8080`.
    pub port: u16,
    /// Number of actix worker threads. `None` (= unset) → one per CPU.
    pub workers: Option<usize>,
    /// Optional URL path prefix — useful when sitting behind a reverse
    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
    /// route is mounted under this prefix (so the proxy can pass the URL
    /// through unchanged). Must start with `/` and not end with `/`; the
    /// empty string (default) means no prefix.
    pub prefix: String,
    /// Negotiate response compression (gzip / brotli / zstd) via the
    /// `Accept-Encoding` request header. Enabled by default. Disable when
    /// running behind a proxy that already compresses, or when the extra
    /// CPU is not worth the bandwidth saving.
    pub compress: bool,
    /// Maximum accepted JSON request body size, in bytes. Larger bodies
    /// are rejected with `413 Payload Too Large` before any handler runs.
    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
    /// a DoS guard, not a tuning knob.
    pub max_body_bytes: usize,
    /// Maximum rows returned by a single `/query` page. Larger
    /// `page_size` values are clamped before the backend runs.
    /// Default `100_000`.
    pub max_page_size: u64,
    /// Per-request handler timeout, in milliseconds. If a handler hasn't
    /// produced a response within this budget the request is aborted with
    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
    pub request_timeout_ms: u64,
    /// Grace period for in-flight requests after the server has received
    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
    /// immediately; existing connections then have up to this many
    /// seconds to finish before workers are force-stopped. Default `30`.
    pub shutdown_timeout_secs: u64,
    /// Optional DuckDB Quack remote SQL server (`[server.quack]`). Only
    /// used by the DuckDB backend; ignored by DataFusion.
    pub quack: QuackConfig,
}
106
107impl Default for ServerConfig {
108    fn default() -> Self {
109        Self {
110            backend: Backend::default(),
111            listen: IpAddr::from([127, 0, 0, 1]),
112            port: 8080,
113            workers: None,
114            prefix: String::new(),
115            compress: true,
116            max_body_bytes: 1024 * 1024,
117            max_page_size: 100_000,
118            request_timeout_ms: 30_000,
119            shutdown_timeout_secs: 30,
120            quack: QuackConfig::default(),
121        }
122    }
123}
124
/// Experimental DuckDB Quack remote protocol server (`[server.quack]`).
///
/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
/// disabled unless you intentionally want DuckDB clients to attach/query this
/// process directly.
///
/// Keys missing from the table take their value from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Install/load the Quack extension and start `quack_serve` after
    /// datasets are registered. Default `false`.
    pub enabled: bool,
    /// Quack URI to listen on. `quack:localhost` (the default) uses
    /// DuckDB's default port 9494.
    pub uri: String,
    /// Optional explicit authentication token. If omitted, Quack generates
    /// one at startup and DataPress logs it once. When given, validation
    /// requires a length of at least 4.
    pub token: Option<String>,
    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
    /// For external exposure, put a TLS-terminating reverse proxy in front.
    /// Default `false`.
    pub allow_other_hostname: bool,
    /// Install a read-only authorization macro for remote queries. Enabled
    /// by default to match DataPress' read-oriented HTTP API.
    pub read_only: bool,
}
149
150impl Default for QuackConfig {
151    fn default() -> Self {
152        Self {
153            enabled: false,
154            uri: "quack:localhost".into(),
155            token: None,
156            allow_other_hostname: false,
157            read_only: true,
158        }
159    }
160}
161
162impl QuackConfig {
163    /// Validate the enabled Quack configuration against DuckDB's current
164    /// safety rules. The extension treats only the literal `localhost` as
165    /// local unless `allow_other_hostname` is set.
166    pub fn validate_enabled(&self) -> Result<(), AppError> {
167        if self.uri.trim().is_empty() {
168            return Err(AppError::Internal(
169                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
170            ));
171        }
172        if !self.uri.starts_with("quack:") {
173            return Err(AppError::Internal(format!(
174                "server.quack.uri must start with 'quack:' (got '{}')",
175                self.uri
176            )));
177        }
178        if !self.allow_other_hostname {
179            let host = self.hostname().unwrap_or_default();
180            if host != "localhost" {
181                return Err(AppError::Internal(format!(
182                    "server.quack.uri host must be 'localhost' unless \
183                     server.quack.allow_other_hostname = true (got '{}')",
184                    self.uri
185                )));
186            }
187        }
188        if let Some(token) = self.token.as_deref()
189            && token.len() < 4
190        {
191            return Err(AppError::Internal(
192                "server.quack.token must be at least 4 characters".into(),
193            ));
194        }
195        Ok(())
196    }
197
198    fn hostname(&self) -> Option<&str> {
199        let rest = self.uri.strip_prefix("quack:")?;
200        let rest = rest.strip_prefix("//").unwrap_or(rest);
201        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
202        (!host.is_empty()).then_some(host)
203    }
204}
205
/// Query engine selected by `server.backend`.
///
/// Deserialized from the lowercase variant name (`"datafusion"` or
/// `"duckdb"`); DataFusion is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// DataFusion engine (the default).
    #[default]
    Datafusion,
    /// DuckDB engine.
    Duckdb,
}
213
/// Embedded MkDocs documentation site (`[docs]` block).
///
/// Enabled by default — when the binary was built with the `docs`
/// cargo feature, the site is served at [`DocsConfig::path`] out of
/// the box. Set `enabled = false` in `datasets.toml` to suppress it
/// (e.g. in prod). When the binary was built without the feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the mount. The mount path must be a non-trivial sub-path;
/// reserved API and probe roots are rejected at startup.
///
/// `deny_unknown_fields` makes an unrecognised key in the block a parse
/// error rather than being silently ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Serve the docs site. Default `true`.
    pub enabled: bool,
    /// Mount path. Default `/mkdocs`. Validation requires a leading `/`,
    /// no trailing `/`, and rejects the reserved routes — even when
    /// `enabled = false`.
    pub path: String,
}
229
230impl Default for DocsConfig {
231    fn default() -> Self {
232        Self {
233            enabled: true,
234            path: "/mkdocs".into(),
235        }
236    }
237}
238
/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
///
/// Enabled by default — when the binary was built with the `swagger`
/// cargo feature, an interactive Swagger UI is served at
/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
/// to suppress it (e.g. in prod). When the binary was built without
/// the feature, `enabled = true` is harmless: the server logs a
/// warning at startup and skips the mount.
///
/// To let users sign in to the UI itself (Authorization Code + PKCE
/// against any OIDC provider), populate the optional `[swagger.oauth2]`
/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
/// to every "Try it out" request — useful for exercising auth-protected
/// endpoints from the docs page. This drives the UI only; it does not
/// turn on server-side token validation.
///
/// `deny_unknown_fields` makes an unrecognised key in the block a parse
/// error rather than being silently ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Serve the Swagger UI. Default `true`.
    pub enabled: bool,
    /// Mount path. Default `/docs`. Validation requires a leading `/`,
    /// no trailing `/`, rejects the reserved routes, and requires it to
    /// differ from `docs.path`.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-block; `None` (default) leaves the
    /// UI without a sign-in flow.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
262
263impl Default for SwaggerConfig {
264    fn default() -> Self {
265        Self {
266            enabled: true,
267            path: "/docs".into(),
268            oauth2: None,
269        }
270    }
271}
272
/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
///
/// Configures the UI to drive an Authorization Code + PKCE flow against
/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
/// token endpoints from `<issuer>/.well-known/openid-configuration`,
/// so we don't need to pin them here.
///
/// `issuer` and `client_id` are required when the block is present —
/// there is no sensible default for either. `scopes` defaults to an
/// empty list and `pkce` to `true`. Unknown keys are rejected
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// OIDC issuer URL, e.g.
    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
    /// `https://accounts.google.com`. Must not end in `/`.
    pub issuer: String,
    /// Public OAuth2 client identifier registered with the IdP. The
    /// client must be a SPA / public client (no secret) with
    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
    /// as an allowed redirect URI.
    pub client_id: String,
    /// Scopes to request by default. Will be pre-checked in the Swagger
    /// UI authorize dialog; users can edit them before signing in.
    /// `openid` is always added if missing.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Use PKCE for the authorization code flow. Defaults to `true`;
    /// disable only if your IdP doesn't support PKCE for public clients.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
304
/// Prometheus metrics endpoint (`[metrics]` block).
///
/// Disabled by default. When `enabled = true` (and the binary was built
/// with the `metrics` cargo feature), the server installs a middleware
/// that records per-request HTTP counters and latency histograms, and
/// exposes them in the Prometheus text exposition format at
/// [`MetricsConfig::path`] (default `/metrics`).
///
/// The endpoint is mounted at a fixed, *unprefixed* path — like the
/// health probes — so a scrape config doesn't need to know about any
/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
/// layer: Prometheus scrapers rarely carry bearer tokens, and the
/// endpoint exposes only aggregate request metrics (no row data). Keep
/// it on a network the scraper can reach but the public cannot, e.g. by
/// binding `server.listen` to a private interface.
///
/// When the binary was built without the `metrics` feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the endpoint.
///
/// `deny_unknown_fields` makes an unrecognised key in the block a parse
/// error rather than being silently ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Expose the metrics endpoint. Default `false`.
    pub enabled: bool,
    /// Path the endpoint is served at. Default `/metrics`.
    pub path: String,
}
330
331impl Default for MetricsConfig {
332    fn default() -> Self {
333        Self {
334            enabled: false,
335            path: "/metrics".into(),
336        }
337    }
338}
339
/// Embedded dataset explorer UI (`[explorer]` block).
///
/// A server-rendered web app (Actix + Askama templates + htmx +
/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
/// It offers a *discovery* view — per-dataset stats, schema, index and
/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
/// that queries each dataset's Parquet export directly.
///
/// Enabled by default. When the binary was built without the `explorer`
/// cargo feature, `enabled = true` is harmless: the server logs a warning
/// at startup and skips the mount. Set `enabled = false` to suppress it at
/// runtime even when the feature is compiled in.
///
/// `deny_unknown_fields` makes an unrecognised key in the block a parse
/// error rather than being silently ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Serve the explorer UI. Default `true`.
    pub enabled: bool,
    /// Mount path. Default `/explore`.
    pub path: String,
}
358
359impl Default for ExplorerConfig {
360    fn default() -> Self {
361        Self {
362            enabled: true,
363            path: "/explore".into(),
364        }
365    }
366}
367
/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
///
/// Disabled by default. When `enabled = true`, the server validates
/// every request's `Authorization: Bearer …` JWT against the JWKS
/// discovered from the issuer's OIDC metadata
/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
/// enforces the configured scope requirements per route.
///
/// Only compiled in when the binary was built with the `auth` cargo
/// feature. Without the feature, `enabled = true` is rejected at
/// startup so a misconfigured production deployment can't silently
/// fall back to "no auth".
///
/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
/// of this block — `[swagger.oauth2]` only drives the UI's login
/// dialog; `[auth]` is what enforces tokens on the API.
///
/// Keys missing from the block take their value from the `Default` impl;
/// unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch. `false` (default) skips all auth processing.
    pub enabled: bool,
    /// OIDC issuer URL — must match the `iss` claim of every accepted
    /// token. Required when `enabled = true`. Default: empty.
    pub issuer: String,
    /// Expected `aud` claim. When empty (default), audience validation is
    /// skipped (not recommended in production).
    pub audience: String,
    /// Scopes a caller must hold to read datasets (GET endpoints +
    /// POST `…/query` and `…/count`). Empty list (default) means "no scope
    /// check, just a valid token is enough". Lowercased once at load time.
    pub read_scopes: Vec<String>,
    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
    /// Empty list (default) means "no scope check, just a valid token is
    /// enough". Lowercased once at load time.
    pub reload_scopes: Vec<String>,
    /// Allow unauthenticated GETs through. Useful for public datasets
    /// and demo deployments. Defaults to `false`.
    pub anonymous_read: bool,
    /// Continue serving even if the JWKS fetch fails at startup.
    /// When `true` (default), the server starts in a degraded mode that
    /// rejects every auth'd request with 503 until JWKS becomes
    /// reachable. When `false`, startup fails outright.
    pub start_degraded: bool,
    /// Allowed signing algorithms. Pinned to RS256 by default; never
    /// include `HS*` or `none` here unless you really know what you're
    /// doing.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds. Default `60`.
    pub leeway_secs: u64,
    /// How often (in seconds) the background refresher re-fetches the
    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
    /// out-of-band. Default `3600`.
    pub jwks_refresh_secs: u64,
    /// Optional JSON-pointer into the JWT claims that extracts a
    /// tenant identifier — attached to the principal and logged on
    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
    /// When empty (default), no tenant is extracted.
    pub tenant_claim: String,
    /// If non-empty, requests whose extracted tenant ID is not in this
    /// list are rejected with 403. Has no effect when `tenant_claim`
    /// is empty. Default: empty.
    pub allowed_tenants: Vec<String>,
    /// If `true`, `POST …/reload` accepts *either* a valid token with
    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
    /// to `true` for one-release backwards compatibility — flip to
    /// `false` once your automation has migrated to OIDC.
    pub admin_token_fallback: bool,
}
435
436impl Default for AuthConfig {
437    fn default() -> Self {
438        Self {
439            enabled: false,
440            issuer: String::new(),
441            audience: String::new(),
442            read_scopes: Vec::new(),
443            reload_scopes: Vec::new(),
444            anonymous_read: false,
445            start_degraded: true,
446            algorithms: vec!["RS256".into()],
447            leeway_secs: 60,
448            jwks_refresh_secs: 3600,
449            tenant_claim: String::new(),
450            allowed_tenants: Vec::new(),
451            admin_token_fallback: true,
452        }
453    }
454}
455
456impl Backend {
457    pub fn as_str(self) -> &'static str {
458        match self {
459            Backend::Datafusion => "datafusion",
460            Backend::Duckdb => "duckdb",
461        }
462    }
463}
464
/// One `[[dataset]]` entry: where the data lives and how it is loaded.
///
/// `name` and `source` are required; every other key has a default.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name. Uppercased, with non-alphanumeric characters replaced
    /// by `_`, it also forms the prefix of the per-dataset credential env
    /// vars (see the module docs).
    pub name: String,
    /// `[dataset.source]` block: format and location of the data.
    pub source: SourceConfig,
    /// Optional `[dataset.s3]` block with S3 connection details; `None`
    /// when the block is absent.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; defaults to [`IndexConfig::default`] when absent.
    #[serde(default)]
    pub index: IndexConfig,
    /// Optional column projection applied at load time. When non-empty,
    /// only the listed columns are read from the parquet/delta source —
    /// every other column is skipped entirely (no decode, no allocation,
    /// no resident memory). Empty (default) = read all columns. Names are
    /// matched case-insensitively against the source schema.
    #[serde(default)]
    pub columns: Vec<String>,
    /// When `true` (default), Utf8 columns that are dictionary-encoded in
    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
    /// for low-cardinality columns. Set to `false` to bypass the override
    /// — useful as a workaround if you observe null-handling oddities on
    /// a particular parquet file.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, the backend should keep the dataset on disk and stream
    /// it at query time instead of materialising it into RAM at startup.
    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
    /// for bounded memory use on large / multi-file sources. Currently
    /// honoured by the DataFusion backend for local parquet.
    /// Default `false`.
    #[serde(default)]
    pub lazy: bool,
}
496
/// Serde default helper for boolean fields that should be `true` when the
/// key is absent (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
500
/// `[dataset.source]` block: the format of a dataset and where to find it.
/// Both keys are required.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Storage format (`parquet` or `delta`).
    pub kind: SourceKind,
    /// Either a local filesystem path or an `s3://bucket/key` URL.
    pub location: String,
}
507
/// Storage format of a dataset source.
///
/// Deserialized from the lowercase variant name (`"parquet"` or
/// `"delta"`); `Parquet` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Parquet source.
    #[default]
    Parquet,
    /// Delta source.
    Delta,
}
515
516impl SourceKind {
517    pub fn as_str(self) -> &'static str {
518        match self {
519            SourceKind::Parquet => "parquet",
520            SourceKind::Delta => "delta",
521        }
522    }
523}
524
/// S3 connection settings (`[dataset.s3]` block).
///
/// Intended for non-secret details: credentials are normally pulled from
/// env / the AWS credential chain — see [`DatasetConfig::resolved_creds`].
/// The inline credential fields below exist as a discouraged fallback.
///
/// Keys missing from the block take their value from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Region name. `None` (default) when unset.
    pub region: Option<String>,
    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
    pub endpoint: Option<String>,
    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
    /// MinIO and most non-AWS providers require `path`.
    pub addressing_style: AddressingStyle,
    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
    /// Default `false`.
    pub allow_http: bool,
    /// Inline credentials. Strongly discouraged in production — prefer env
    /// vars (see module docs).
    pub access_key_id: Option<String>,
    /// Inline secret key; same caveat as `access_key_id`.
    pub secret_access_key: Option<String>,
    /// Inline session token; same caveat as `access_key_id`.
    pub session_token: Option<String>,
    /// Hive partition-column handling for parquet sources. Defaults to
    /// `auto` (detect from the path). See [`Partitioning`].
    pub partitioning: Partitioning,
    /// Whether the bucket is folded into a custom `endpoint` host for
    /// virtual-hosted-style requests. Defaults to `auto`. See
    /// [`BucketInHost`].
    pub endpoint_bucket_in_host: BucketInHost,
}
551
552impl Default for S3Config {
553    fn default() -> Self {
554        Self {
555            region: None,
556            endpoint: None,
557            addressing_style: AddressingStyle::Virtual,
558            allow_http: false,
559            access_key_id: None,
560            secret_access_key: None,
561            session_token: None,
562            partitioning: Partitioning::Auto,
563            endpoint_bucket_in_host: BucketInHost::Auto,
564        }
565    }
566}
567
568impl S3Config {
569    /// Resolve the endpoint URL to hand to the object store, optionally
570    /// folding `bucket` into the host for virtual-hosted-style requests so a
571    /// plain `endpoint` works the same way it does on DuckDB.
572    ///
573    /// Returns `None` when no custom endpoint is configured (AWS default).
574    /// The bucket is only prepended when it isn't already the leading host
575    /// label, so re-running this (or a config that already embeds the
576    /// bucket) never produces `bucket.bucket.host`.
577    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
578        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
579
580        let fold = match self.endpoint_bucket_in_host {
581            BucketInHost::False => false,
582            BucketInHost::True => true,
583            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
584        };
585        if !fold {
586            return Some(ep.to_string());
587        }
588
589        let (scheme, host_and_path) = match ep.split_once("://") {
590            Some((s, rest)) => (Some(s), rest),
591            None => (None, ep),
592        };
593        // Split host from any trailing path so we prefix the host label only.
594        let (host, path) = match host_and_path.split_once('/') {
595            Some((h, p)) => (h, Some(p)),
596            None => (host_and_path, None),
597        };
598        // Guard against double-prefixing.
599        if host == bucket || host.starts_with(&format!("{bucket}.")) {
600            return Some(ep.to_string());
601        }
602        let new_host = format!("{bucket}.{host}");
603        let rebuilt = match (scheme, path) {
604            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
605            (Some(s), None) => format!("{s}://{new_host}"),
606            (None, Some(p)) => format!("{new_host}/{p}"),
607            (None, None) => new_host,
608        };
609        Some(rebuilt)
610    }
611}
612
/// S3 request addressing style (`addressing_style` in `[dataset.s3]`).
///
/// Deserialized from the lowercase variant name (`"virtual"` or `"path"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Virtual-hosted style (`bucket.host`). The default.
    #[default]
    Virtual,
    /// Path style (`host/bucket/`).
    Path,
}
620
621impl AddressingStyle {
622    pub fn as_str(self) -> &'static str {
623        match self {
624            AddressingStyle::Virtual => "virtual",
625            AddressingStyle::Path => "path",
626        }
627    }
628}
629
/// How hive-style partition columns (`key=value/` path segments) are handled
/// for an S3 parquet source. Local parquet always auto-detects; this option
/// brings S3 in line and lets you force or disable the behaviour.
///
/// Deserialized from the lowercase variant name (`"auto"`, `"hive"` or
/// `"none"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Detect `key=value` segments from the location glob or by listing the
    /// prefix. No partition columns are added when none are found.
    /// This is the default.
    #[default]
    Auto,
    /// Force hive partitioning. Partition keys are taken from the location
    /// glob, or discovered by listing the prefix.
    Hive,
    /// Treat the source as a flat set of parquet files — never add partition
    /// columns even if the path looks hive-partitioned.
    None,
}
647
648impl Partitioning {
649    pub fn as_str(self) -> &'static str {
650        match self {
651            Partitioning::Auto => "auto",
652            Partitioning::Hive => "hive",
653            Partitioning::None => "none",
654        }
655    }
656}
657
/// Whether the bucket name is folded into the endpoint hostname for
/// virtual-hosted-style requests against a custom endpoint. This aligns the
/// DataFusion object-store path with DuckDB, which builds the virtual host
/// itself — so the same plain `endpoint` works on both backends.
///
/// Deserialized from the lowercase variant name: `"auto"`, `"true"` or
/// `"false"`.
// NOTE(review): as a derived unit-variant enum these are expected as
// *strings*, so a bare TOML boolean (`endpoint_bucket_in_host = true`)
// presumably fails to parse — confirm, and document for operators if so.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold the bucket into the host when `addressing_style = "virtual"` and
    /// a custom `endpoint` is set (guarded against double-prefixing).
    /// This is the default.
    #[default]
    Auto,
    /// Always fold the bucket into the endpoint host.
    True,
    /// Never rewrite the endpoint — pass it through verbatim.
    False,
}
674
675impl BucketInHost {
676    pub fn as_str(self) -> &'static str {
677        match self {
678            BucketInHost::Auto => "auto",
679            BucketInHost::True => "true",
680            BucketInHost::False => "false",
681        }
682    }
683}
684
/// Per-dataset index settings (the `index` field of [`DatasetConfig`]).
///
/// Keys missing from the block take their value from the `Default` impl.
// NOTE(review): how these fields are consumed is not visible in this part
// of the file; the field notes below are assumptions to confirm.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Indexing mode. Default [`IndexMode::Auto`].
    pub mode: IndexMode,
    /// Column names; presumably the columns to index when `mode = "list"`
    /// — TODO confirm. Default: empty.
    pub columns: Vec<String>,
    /// Cardinality limit; presumably columns with more distinct values
    /// than this are not indexed — TODO confirm. Default `100_000`.
    pub max_cardinality: usize,
}
692
693impl Default for IndexConfig {
694    fn default() -> Self {
695        Self {
696            mode: IndexMode::Auto,
697            columns: Vec::new(),
698            max_cardinality: 100_000,
699        }
700    }
701}
702
/// Indexing mode for a dataset (`mode` in [`IndexConfig`]).
///
/// Deserialized from the lowercase variant name (`"auto"`, `"none"` or
/// `"list"`).
// NOTE(review): variant semantics are inferred from the names only; the
// code that interprets them is not visible here — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// The default mode; presumably lets the backend choose what to index.
    #[default]
    Auto,
    /// Presumably disables indexing.
    None,
    /// Presumably indexes only the columns listed in `IndexConfig::columns`.
    List,
}
711
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// The derived `Default` is all-`None`, i.e. no explicit credentials.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id, if one was resolved.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was resolved.
    pub secret_access_key: Option<String>,
    /// Session token, if one was resolved.
    pub session_token: Option<String>,
}
720
721impl ResolvedCreds {
722    pub fn has_keypair(&self) -> bool {
723        self.access_key_id.is_some() && self.secret_access_key.is_some()
724    }
725}
726
727// ---------------------------------------------------------------------------
728// Loading + validation
729// ---------------------------------------------------------------------------
730
731impl AppConfig {
732    /// Read and validate a TOML config file.
733    pub fn load(path: &str) -> Result<Self, AppError> {
734        let raw = std::fs::read_to_string(path)
735            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
736        let mut cfg: AppConfig =
737            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
738        cfg.normalize();
739        cfg.validate()?;
740        Ok(cfg)
741    }
742
743    /// Canonicalise fields that are compared case-insensitively at runtime.
744    ///
745    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
746    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
747    /// once at load time. Without this an operator who writes
748    /// `"Datasets:Read"` would silently 403 every caller, since the token
749    /// side would have become `datasets:read`.
750    fn normalize(&mut self) {
751        for s in self
752            .auth
753            .read_scopes
754            .iter_mut()
755            .chain(self.auth.reload_scopes.iter_mut())
756        {
757            *s = s.to_ascii_lowercase();
758        }
759    }
760
761    fn validate(&self) -> Result<(), AppError> {
762        // Server prefix: empty, or must start with '/' and not end with '/'.
763        let p = &self.server.prefix;
764        if !p.is_empty() {
765            if !p.starts_with('/') {
766                return Err(AppError::Internal(format!(
767                    "server.prefix must start with '/' (got '{p}')"
768                )));
769            }
770            if p.ends_with('/') {
771                return Err(AppError::Internal(format!(
772                    "server.prefix must not end with '/' (got '{p}')"
773                )));
774            }
775        }
776
777        if self.datasets.is_empty() {
778            return Err(AppError::Internal(
779                "datasets.toml has no [[dataset]] entries".into(),
780            ));
781        }
782
783        if self.server.quack.enabled {
784            self.server.quack.validate_enabled()?;
785        }
786
787        // Validate the docs mount path even when the section is disabled,
788        // so an inactive config typo can't go unnoticed.
789        {
790            let dp = &self.docs.path;
791            if !dp.starts_with('/') {
792                return Err(AppError::Internal(format!(
793                    "docs.path must start with '/' (got '{dp}')"
794                )));
795            }
796            if dp.len() > 1 && dp.ends_with('/') {
797                return Err(AppError::Internal(format!(
798                    "docs.path must not end with '/' (got '{dp}')"
799                )));
800            }
801            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
802                return Err(AppError::Internal(format!(
803                    "docs.path '{dp}' collides with a reserved route"
804                )));
805            }
806        }
807
808        // Same for the swagger UI mount.
809        {
810            let sp = &self.swagger.path;
811            if !sp.starts_with('/') {
812                return Err(AppError::Internal(format!(
813                    "swagger.path must start with '/' (got '{sp}')"
814                )));
815            }
816            if sp.len() > 1 && sp.ends_with('/') {
817                return Err(AppError::Internal(format!(
818                    "swagger.path must not end with '/' (got '{sp}')"
819                )));
820            }
821            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
822                return Err(AppError::Internal(format!(
823                    "swagger.path '{sp}' collides with a reserved route"
824                )));
825            }
826            if sp == &self.docs.path {
827                return Err(AppError::Internal(format!(
828                    "swagger.path and docs.path must differ (both '{sp}')"
829                )));
830            }
831            if let Some(o) = &self.swagger.oauth2 {
832                if o.issuer.trim().is_empty() {
833                    return Err(AppError::Internal(
834                        "swagger.oauth2.issuer must not be empty".into(),
835                    ));
836                }
837                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
838                    return Err(AppError::Internal(format!(
839                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
840                        o.issuer
841                    )));
842                }
843                if o.client_id.trim().is_empty() {
844                    return Err(AppError::Internal(
845                        "swagger.oauth2.client_id must not be empty".into(),
846                    ));
847                }
848            }
849        }
850
851        // Metrics endpoint mount path. Validated even when disabled so an
852        // inactive config typo can't go unnoticed. `/metrics` is itself a
853        // reserved mount (so docs/swagger can't shadow it), so we check the
854        // remaining reserved routes — and the docs/swagger paths — for
855        // collisions rather than the whole list.
856        {
857            let mp = &self.metrics.path;
858            if !mp.starts_with('/') {
859                return Err(AppError::Internal(format!(
860                    "metrics.path must start with '/' (got '{mp}')"
861                )));
862            }
863            if mp.len() > 1 && mp.ends_with('/') {
864                return Err(AppError::Internal(format!(
865                    "metrics.path must not end with '/' (got '{mp}')"
866                )));
867            }
868            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
869                return Err(AppError::Internal(format!(
870                    "metrics.path '{mp}' collides with a reserved route"
871                )));
872            }
873            if mp == &self.docs.path {
874                return Err(AppError::Internal(format!(
875                    "metrics.path and docs.path must differ (both '{mp}')"
876                )));
877            }
878            if mp == &self.swagger.path {
879                return Err(AppError::Internal(format!(
880                    "metrics.path and swagger.path must differ (both '{mp}')"
881                )));
882            }
883        }
884
885        // Explorer UI mount path. Validated even when disabled so an
886        // inactive config typo can't go unnoticed.
887        {
888            let ep = &self.explorer.path;
889            if !ep.starts_with('/') {
890                return Err(AppError::Internal(format!(
891                    "explorer.path must start with '/' (got '{ep}')"
892                )));
893            }
894            if ep.len() > 1 && ep.ends_with('/') {
895                return Err(AppError::Internal(format!(
896                    "explorer.path must not end with '/' (got '{ep}')"
897                )));
898            }
899            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
900                return Err(AppError::Internal(format!(
901                    "explorer.path '{ep}' collides with a reserved route"
902                )));
903            }
904            if ep == &self.docs.path {
905                return Err(AppError::Internal(format!(
906                    "explorer.path and docs.path must differ (both '{ep}')"
907                )));
908            }
909            if ep == &self.swagger.path {
910                return Err(AppError::Internal(format!(
911                    "explorer.path and swagger.path must differ (both '{ep}')"
912                )));
913            }
914            if ep == &self.metrics.path {
915                return Err(AppError::Internal(format!(
916                    "explorer.path and metrics.path must differ (both '{ep}')"
917                )));
918            }
919        }
920
921        // Auth block — only meaningful when `enabled = true`. The cargo
922        // feature gate is enforced separately in `server::serve` so a
923        // binary built without `--features auth` and a config with
924        // `auth.enabled = true` aborts with a clear error.
925        if self.auth.enabled {
926            let a = &self.auth;
927            if a.issuer.trim().is_empty() {
928                return Err(AppError::Internal(
929                    "auth.issuer must not be empty when auth.enabled = true".into(),
930                ));
931            }
932            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
933                return Err(AppError::Internal(format!(
934                    "auth.issuer must be an absolute http(s) URL (got '{}')",
935                    a.issuer
936                )));
937            }
938            for alg in &a.algorithms {
939                match alg.as_str() {
940                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
941                    | "PS512" => {}
942                    other => {
943                        return Err(AppError::Internal(format!(
944                            "auth.algorithms[{other}] is not allowed; pick one of \
945                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
946                        )));
947                    }
948                }
949            }
950            if a.algorithms.is_empty() {
951                return Err(AppError::Internal(
952                    "auth.algorithms must not be empty".into(),
953                ));
954            }
955            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
956                return Err(AppError::Internal(format!(
957                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
958                    a.tenant_claim
959                )));
960            }
961            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
962                return Err(AppError::Internal(
963                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
964                     can't enforce a tenant allow-list without a claim to extract from"
965                        .into(),
966                ));
967            }
968        }
969
970        let mut seen = HashSet::new();
971        for d in &self.datasets {
972            if !seen.insert(d.name.as_str()) {
973                return Err(AppError::Internal(format!(
974                    "duplicate dataset name: {}",
975                    d.name
976                )));
977            }
978            if d.name.is_empty() {
979                return Err(AppError::Internal("dataset name must not be empty".into()));
980            }
981            // URL-safe: alphanum + _ - .
982            if !d
983                .name
984                .chars()
985                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
986            {
987                return Err(AppError::Internal(format!(
988                    "dataset name '{}' must be alphanumeric (plus _ - .)",
989                    d.name
990                )));
991            }
992
993            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
994                return Err(AppError::Internal(format!(
995                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
996                    d.name
997                )));
998            }
999
1000            // Location-specific checks.
1001            if d.source.is_s3() {
1002                d.source.s3_bucket()?;
1003                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1004                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1005                    && std::env::var("AWS_REGION").is_err()
1006                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1007                {
1008                    log::warn!(
1009                        "dataset '{}': S3 source without explicit region — \
1010                         relying on AWS_REGION env var",
1011                        d.name
1012                    );
1013                }
1014            } else {
1015                // Local path. For parquet we can fully resolve to a file
1016                // list up front; for delta we only check that the directory
1017                // exists (delta has its own layout — _delta_log/, …).
1018                match d.source.kind {
1019                    SourceKind::Parquet => {
1020                        d.resolve_local_parquet_files()?;
1021                    }
1022                    SourceKind::Delta => {
1023                        let p = Path::new(&d.source.location);
1024                        if !p.exists() {
1025                            return Err(AppError::Internal(format!(
1026                                "dataset '{}': delta location does not exist: {}",
1027                                d.name, d.source.location
1028                            )));
1029                        }
1030                    }
1031                }
1032            }
1033        }
1034        Ok(())
1035    }
1036}
1037
1038impl SourceConfig {
1039    pub fn is_s3(&self) -> bool {
1040        self.location.starts_with("s3://")
1041    }
1042
1043    /// True when the location already contains a glob metacharacter
1044    /// (`*`, `?`, or `[`).
1045    pub fn has_glob(&self) -> bool {
1046        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1047    }
1048
1049    /// Location to hand to a backend that needs an explicit parquet glob
1050    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1051    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1052    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1053    /// are returned unchanged.
1054    pub fn s3_recursive_parquet_glob(&self) -> String {
1055        if !self.is_s3() || self.has_glob() {
1056            return self.location.clone();
1057        }
1058        let trimmed = self.location.trim_end_matches('/');
1059        format!("{trimmed}/**/*.parquet")
1060    }
1061
1062    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1063    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1064        let rest = self
1065            .location
1066            .strip_prefix("s3://")
1067            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1068        let (bucket, key) = match rest.split_once('/') {
1069            Some((b, k)) => (b, k),
1070            None => (rest, ""),
1071        };
1072        if bucket.is_empty() {
1073            return Err(AppError::Internal(format!(
1074                "s3 URL missing bucket: {}",
1075                self.location
1076            )));
1077        }
1078        Ok((bucket, key))
1079    }
1080}
1081
1082impl DatasetConfig {
1083    /// Expand `source.location` to a concrete list of local `.parquet`
1084    /// files. Only valid for `kind = parquet` on local paths — S3 and
1085    /// Delta sources are resolved by the backend itself.
1086    ///
1087    /// Accepts three location shapes:
1088    ///   * a single `*.parquet` file
1089    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1090    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1091    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1092    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1093        if self.source.is_s3() {
1094            return Err(AppError::Internal(format!(
1095                "dataset '{}': resolve_local_parquet_files called on s3 source",
1096                self.name
1097            )));
1098        }
1099        let loc = &self.source.location;
1100
1101        // Glob pattern? Expand and require at least one match.
1102        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1103            let mut files: Vec<PathBuf> = glob::glob(loc)
1104                .map_err(|e| {
1105                    AppError::Internal(format!(
1106                        "dataset '{}': bad glob pattern '{loc}': {e}",
1107                        self.name
1108                    ))
1109                })?
1110                .filter_map(|r| r.ok())
1111                .filter(|p| {
1112                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1113                })
1114                .collect();
1115            files.sort();
1116            if files.is_empty() {
1117                return Err(AppError::Internal(format!(
1118                    "dataset '{}': glob '{loc}' matched no .parquet files",
1119                    self.name
1120                )));
1121            }
1122            return Ok(files);
1123        }
1124
1125        let path = Path::new(loc);
1126        if !path.exists() {
1127            return Err(AppError::Internal(format!(
1128                "dataset '{}': source path does not exist: {loc}",
1129                self.name
1130            )));
1131        }
1132
1133        if path.is_file() {
1134            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1135                return Err(AppError::Internal(format!(
1136                    "dataset '{}': source must be a .parquet file",
1137                    self.name
1138                )));
1139            }
1140            return Ok(vec![path.to_path_buf()]);
1141        }
1142
1143        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1144            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1145            .filter_map(|entry| entry.ok().map(|e| e.path()))
1146            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1147            .collect();
1148        files.sort();
1149        if files.is_empty() {
1150            return Err(AppError::Internal(format!(
1151                "dataset '{}': no *.parquet files found in {loc}",
1152                self.name
1153            )));
1154        }
1155        Ok(files)
1156    }
1157
1158    /// Env-var prefix derived from the dataset name: uppercase with
1159    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1160    /// `SALES_EU_1`.
1161    pub fn env_prefix(&self) -> String {
1162        self.name
1163            .chars()
1164            .map(|c| {
1165                if c.is_ascii_alphanumeric() {
1166                    c.to_ascii_uppercase()
1167                } else {
1168                    '_'
1169                }
1170            })
1171            .collect()
1172    }
1173
1174    /// Resolve S3 credentials following the precedence chain documented at
1175    /// the top of this module. Returns an empty struct when nothing was
1176    /// found — the caller should then leave credential resolution to the
1177    /// engine's default provider chain.
1178    pub fn resolved_creds(&self) -> ResolvedCreds {
1179        let prefix = self.env_prefix();
1180        let from_env = |suffix: &str| {
1181            std::env::var(format!("{prefix}_{suffix}"))
1182                .ok()
1183                .filter(|s| !s.is_empty())
1184        };
1185        let inline = self.s3.as_ref();
1186        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1187
1188        ResolvedCreds {
1189            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1190                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1191                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1192            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1193                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1194                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1195            session_token: from_env("AWS_SESSION_TOKEN")
1196                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1197                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1198        }
1199    }
1200
1201    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1202    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1203    pub fn resolved_region(&self) -> String {
1204        let prefix = self.env_prefix();
1205        std::env::var(format!("{prefix}_AWS_REGION"))
1206            .ok()
1207            .filter(|s| !s.is_empty())
1208            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1209            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1210            .or_else(|| {
1211                std::env::var("AWS_DEFAULT_REGION")
1212                    .ok()
1213                    .filter(|s| !s.is_empty())
1214            })
1215            .unwrap_or_else(|| "us-east-1".to_string())
1216    }
1217}
1218
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a parquet dataset with no `[dataset.s3]` block and the same
    /// per-dataset settings every test in this module uses.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind: SourceKind::Parquet,
                location: location.into(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Build an `AppConfig` with every section at its default and the given
    /// datasets. Tests override single sections with struct-update syntax.
    fn config_with(datasets: Vec<DatasetConfig>) -> AppConfig {
        AppConfig {
            server: ServerConfig::default(),
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            auth: AuthConfig::default(),
            datasets,
        }
    }

    /// Create a fresh scratch directory (named after `tag` and the process
    /// id) containing one placeholder `a.parquet`. Returns `(dir, file)`;
    /// the caller removes `dir` when done.
    fn scratch_parquet(tag: &str) -> (PathBuf, PathBuf) {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::write(&file, b"not really parquet").unwrap();
        (dir, file)
    }

    /// `ServerConfig::default()` carries the documented defaults.
    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    /// Every `[server]` / `[server.quack]` key in the TOML overrides its
    /// default; unspecified dataset keys keep theirs.
    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        assert!(cfg.datasets[0].dict_encode); // default
    }

    /// A prefix without a leading slash, or with a trailing one, is rejected
    /// by the prefix rule itself.
    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let cfg = AppConfig {
                server: ServerConfig {
                    prefix: p.to_string(),
                    ..Default::default()
                },
                ..config_with(vec![])
            };
            let err = cfg.validate().expect_err("bad prefix must be rejected");
            // The dataset list is empty, so `validate` would fail regardless;
            // pin the message to make sure the prefix rule is what fired.
            assert!(
                matches!(&err, AppError::Internal(m) if m.contains("server.prefix")),
                "prefix {p:?} should fail on the server.prefix rule"
            );
        }
    }

    /// `normalize` lowercases both scope lists.
    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = AppConfig {
            auth: AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            ..config_with(vec![])
        };
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    /// A config without any `[[dataset]]` entry is rejected.
    #[test]
    fn validate_rejects_no_datasets() {
        let err = config_with(vec![]).validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    /// An issuer URL ending in `/` passes validation.
    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let (dir, file) = scratch_parquet("dp-auth-issuer-test");

        let cfg = AppConfig {
            auth: AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            ..config_with(vec![parquet_dataset("x", &file.to_string_lossy())])
        };

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// With quack enabled, a host other than `localhost` is rejected unless
    /// the override is set.
    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = AppConfig {
            server: ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            ..config_with(vec![parquet_dataset("x", "/tmp/missing.parquet")])
        };
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    /// Dataset names with characters outside `[A-Za-z0-9_.-]` are rejected.
    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    /// Two datasets sharing a name are rejected.
    #[test]
    fn validate_rejects_duplicate_names() {
        let (dir, file) = scratch_parquet("dp-dup-test");
        let path = file.to_str().unwrap();

        // The path goes into a TOML *literal* string ('…'): a basic string
        // ("…") would interpret the backslashes of a Windows path as escapes.
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
        "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// `s3_bucket` splits bucket and key, and rejects empty buckets and
    /// non-S3 locations.
    #[test]
    fn s3_bucket_parsing() {
        let mk = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        let s1 = mk("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = mk("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(mk("s3:///nokey").s3_bucket().is_err());
        assert!(mk("/local/path").s3_bucket().is_err());
    }

    /// Only glob-free S3 prefixes get the recursive parquet suffix.
    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        let mk = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
        assert_eq!(
            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        assert_eq!(
            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        // Already globbed -> unchanged.
        assert_eq!(
            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
            "s3://bucket/logs/*.parquet"
        );
        // Non-S3 -> unchanged.
        assert_eq!(
            mk("/local/logs").s3_recursive_parquet_glob(),
            "/local/logs"
        );
    }

    /// `effective_endpoint` folds the bucket into the host depending on the
    /// addressing style and the explicit `endpoint_bucket_in_host` override.
    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // Default override + virtual style -> bucket folded into host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Idempotent: already-prefixed host is left alone.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Default override + path style -> host untouched.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        // Explicit overrides win over addressing style.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // No endpoint -> None.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    /// `env_prefix` uppercases and replaces every non-alphanumeric char.
    #[test]
    fn env_prefix_sanitises_name() {
        let mk = |name: &str| parquet_dataset(name, "x");
        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
    }

    /// A single file and its parent directory both resolve to that file;
    /// a missing path is an error.
    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let (dir, f) = scratch_parquet("dp-cfg-test");
        let mk = |loc: &str| parquet_dataset("ds", loc);

        // Direct file.
        let files = mk(f.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Directory.
        let files = mk(dir.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Missing path.
        assert!(
            mk("/no/such/place.parquet")
                .resolve_local_parquet_files()
                .is_err()
        );

        let _ = std::fs::remove_dir_all(&dir);
    }
}