Skip to main content

datapress_core/
config.rs

1//! Runtime configuration loaded from `datasets.toml`.
2//!
3//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
4//! block selects the format (`parquet` or `delta`) and the location (a
5//! local path or an `s3://bucket/key` URL). When the location is on S3,
6//! an optional `[dataset.s3]` block carries non-secret connection details
7//! (region, endpoint, addressing style, …).
8//!
9//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
10//! in this precedence order:
11//!
12//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
13//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
14//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
15//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
16//!    `sales.eu-1` → `SALES_EU_1`).
17//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
18//!    `[dataset.s3]` block.
19//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
20//!    `AWS_SESSION_TOKEN`.
21//! 4. None — fall back to the engine's own provider chain
22//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Mount paths the user MUST NOT pick for `[docs].path` or
/// `[swagger].path` — they would shadow first-party routes (probes,
/// API scopes, root).
///
/// Validation compares each entry against the configured path by exact
/// string equality. `/metrics` is listed so that the docs and Swagger
/// mounts cannot take the metrics endpoint's default path.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
/// Top-level configuration, deserialized from the TOML file handed to
/// [`AppConfig::load`].
///
/// Every section is optional at the TOML level (`#[serde(default)]`), but
/// validation rejects a file that contains no `[[dataset]]` entries.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` block — see [`ServerConfig`].
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` block — see [`DocsConfig`].
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` block — see [`SwaggerConfig`].
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` block — see [`MetricsConfig`].
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[auth]` block — see [`AuthConfig`].
    #[serde(default)]
    pub auth: AuthConfig,
    /// The datasets this instance serves. Spelled `[[dataset]]` (singular)
    /// in the TOML file; exposed here as `datasets`.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
58
/// HTTP server settings (`[server]` block).
///
/// Every key is optional in the TOML file: thanks to the container-level
/// `#[serde(default)]`, omitted keys take the values produced by the
/// [`Default`] impl.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Which engine to run. Must match the binary's compile-time feature.
    pub backend: Backend,
    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
    /// to 0.0.0.0 if you want to expose the port.
    pub listen: IpAddr,
    /// TCP port. Defaults to `8080`.
    pub port: u16,
    /// Number of actix worker threads. `None` (= unset) → one per CPU.
    pub workers: Option<usize>,
    /// Optional URL path prefix — useful when sitting behind a reverse
    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
    /// route is mounted under this prefix (so the proxy can pass the URL
    /// through unchanged). Must start with `/` and not end with `/`; the
    /// empty string (default) means no prefix.
    pub prefix: String,
    /// Negotiate response compression (gzip / brotli / zstd) via the
    /// `Accept-Encoding` request header. Enabled by default. Disable when
    /// running behind a proxy that already compresses, or when the extra
    /// CPU is not worth the bandwidth saving.
    pub compress: bool,
    /// Maximum accepted JSON request body size, in bytes. Larger bodies
    /// are rejected with `413 Payload Too Large` before any handler runs.
    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
    /// a DoS guard, not a tuning knob.
    pub max_body_bytes: usize,
    /// Maximum rows returned by a single `/query` page. Larger
    /// `page_size` values are clamped before the backend runs.
    /// Default `100_000`.
    pub max_page_size: u64,
    /// Per-request handler timeout, in milliseconds. If a handler hasn't
    /// produced a response within this budget the request is aborted with
    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
    pub request_timeout_ms: u64,
    /// Grace period for in-flight requests after the server has received
    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
    /// immediately; existing connections then have up to this many
    /// seconds to finish before workers are force-stopped. Default `30`.
    pub shutdown_timeout_secs: u64,
    /// Optional DuckDB Quack remote SQL server (`[server.quack]`). Only
    /// used by the DuckDB backend; ignored by DataFusion.
    pub quack: QuackConfig,
}
104
105impl Default for ServerConfig {
106    fn default() -> Self {
107        Self {
108            backend: Backend::default(),
109            listen: IpAddr::from([127, 0, 0, 1]),
110            port: 8080,
111            workers: None,
112            prefix: String::new(),
113            compress: true,
114            max_body_bytes: 1024 * 1024,
115            max_page_size: 100_000,
116            request_timeout_ms: 30_000,
117            shutdown_timeout_secs: 30,
118            quack: QuackConfig::default(),
119        }
120    }
121}
122
/// Experimental DuckDB Quack remote protocol server (`[server.quack]`).
///
/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
/// disabled unless you intentionally want DuckDB clients to attach/query this
/// process directly.
///
/// When `enabled = true`, the block is checked by
/// [`QuackConfig::validate_enabled`] during config validation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Install/load the Quack extension and start `quack_serve` after
    /// datasets are registered. Defaults to `false`.
    pub enabled: bool,
    /// Quack URI to listen on. `quack:localhost` (the default) uses
    /// DuckDB's default port 9494. Must start with `quack:`.
    pub uri: String,
    /// Optional explicit authentication token. If omitted, Quack generates
    /// one at startup and DataPress logs it once. When set, it must be at
    /// least 4 bytes long.
    pub token: Option<String>,
    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
    /// For external exposure, put a TLS-terminating reverse proxy in front.
    /// Defaults to `false`, in which case the URI host must be the literal
    /// `localhost`.
    pub allow_other_hostname: bool,
    /// Install a read-only authorization macro for remote queries. Enabled
    /// by default to match DataPress' read-oriented HTTP API.
    pub read_only: bool,
}
147
148impl Default for QuackConfig {
149    fn default() -> Self {
150        Self {
151            enabled: false,
152            uri: "quack:localhost".into(),
153            token: None,
154            allow_other_hostname: false,
155            read_only: true,
156        }
157    }
158}
159
160impl QuackConfig {
161    /// Validate the enabled Quack configuration against DuckDB's current
162    /// safety rules. The extension treats only the literal `localhost` as
163    /// local unless `allow_other_hostname` is set.
164    pub fn validate_enabled(&self) -> Result<(), AppError> {
165        if self.uri.trim().is_empty() {
166            return Err(AppError::Internal(
167                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
168            ));
169        }
170        if !self.uri.starts_with("quack:") {
171            return Err(AppError::Internal(format!(
172                "server.quack.uri must start with 'quack:' (got '{}')",
173                self.uri
174            )));
175        }
176        if !self.allow_other_hostname {
177            let host = self.hostname().unwrap_or_default();
178            if host != "localhost" {
179                return Err(AppError::Internal(format!(
180                    "server.quack.uri host must be 'localhost' unless \
181                     server.quack.allow_other_hostname = true (got '{}')",
182                    self.uri
183                )));
184            }
185        }
186        if let Some(token) = self.token.as_deref()
187            && token.len() < 4
188        {
189            return Err(AppError::Internal(
190                "server.quack.token must be at least 4 characters".into(),
191            ));
192        }
193        Ok(())
194    }
195
196    fn hostname(&self) -> Option<&str> {
197        let rest = self.uri.strip_prefix("quack:")?;
198        let rest = rest.strip_prefix("//").unwrap_or(rest);
199        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
200        (!host.is_empty()).then_some(host)
201    }
202}
203
/// Query engine selected by `server.backend`.
///
/// Deserialized from the lowercase variant name (`"datafusion"` or
/// `"duckdb"`); [`Backend::as_str`] returns the same spelling.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// DataFusion engine. This is the default.
    #[default]
    Datafusion,
    /// DuckDB engine.
    Duckdb,
}
211
/// Embedded MkDocs documentation site (`[docs]` block).
///
/// Enabled by default — when the binary was built with the `docs`
/// cargo feature, the site is served at [`DocsConfig::path`] out of
/// the box. Set `enabled = false` in `datasets.toml` to suppress it
/// (e.g. in prod). When the binary was built without the feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the mount. The mount path must be a non-trivial sub-path;
/// reserved API and probe roots are rejected at startup.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Whether to mount the documentation site. Defaults to `true`.
    pub enabled: bool,
    /// URL path the site is mounted at. Defaults to `/mkdocs`. Validation
    /// (run even when `enabled = false`) requires a leading `/`, no
    /// trailing `/`, no collision with a reserved mount, and a value
    /// different from `swagger.path`.
    pub path: String,
}
227
228impl Default for DocsConfig {
229    fn default() -> Self {
230        Self {
231            enabled: true,
232            path: "/mkdocs".into(),
233        }
234    }
235}
236
/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
///
/// Enabled by default — when the binary was built with the `swagger`
/// cargo feature, an interactive Swagger UI is served at
/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
/// to suppress it (e.g. in prod). When the binary was built without
/// the feature, `enabled = true` is harmless: the server logs a
/// warning at startup and skips the mount.
///
/// To let users sign in to the UI itself (Authorization Code + PKCE
/// against any OIDC provider), populate the optional `[swagger.oauth2]`
/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
/// to every "Try it out" request — useful for exercising auth-protected
/// endpoints from the docs page. This drives the UI only; it does not
/// turn on server-side token validation.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Whether to mount the Swagger UI. Defaults to `true`.
    pub enabled: bool,
    /// URL path the UI is mounted at. Defaults to `/docs`. Validation
    /// requires a leading `/`, no trailing `/`, no collision with a
    /// reserved mount, and a value different from `docs.path`.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-block; `None` (default) leaves the
    /// UI without a sign-in flow.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
260
261impl Default for SwaggerConfig {
262    fn default() -> Self {
263        Self {
264            enabled: true,
265            path: "/docs".into(),
266            oauth2: None,
267        }
268    }
269}
270
/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
///
/// Configures the UI to drive an Authorization Code + PKCE flow against
/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
/// token endpoints from `<issuer>/.well-known/openid-configuration`,
/// so we don't need to pin them here.
///
/// `issuer` and `client_id` are required when the block is present —
/// there is no sensible default for either. `scopes` and `pkce` are
/// optional and fall back to an empty list and `true` respectively.
/// Unknown keys are a deserialization error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// OIDC issuer URL, e.g.
    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
    /// `https://accounts.google.com`. Must not end in `/`.
    /// Validation requires a non-blank value starting with `http://` or
    /// `https://`.
    pub issuer: String,
    /// Public OAuth2 client identifier registered with the IdP. The
    /// client must be a SPA / public client (no secret) with
    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
    /// as an allowed redirect URI. Must not be blank.
    pub client_id: String,
    /// Scopes to request by default. Will be pre-checked in the Swagger
    /// UI authorize dialog; users can edit them before signing in.
    /// `openid` is always added if missing.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Use PKCE for the authorization code flow. Defaults to `true`;
    /// disable only if your IdP doesn't support PKCE for public clients.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
302
/// Prometheus metrics endpoint (`[metrics]` block).
///
/// Disabled by default. When `enabled = true` (and the binary was built
/// with the `metrics` cargo feature), the server installs a middleware
/// that records per-request HTTP counters and latency histograms, and
/// exposes them in the Prometheus text exposition format at
/// [`MetricsConfig::path`] (default `/metrics`).
///
/// The endpoint is mounted at a fixed, *unprefixed* path — like the
/// health probes — so a scrape config doesn't need to know about any
/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
/// layer: Prometheus scrapers rarely carry bearer tokens, and the
/// endpoint exposes only aggregate request metrics (no row data). Keep
/// it on a network the scraper can reach but the public cannot, e.g. by
/// binding `server.listen` to a private interface.
///
/// When the binary was built without the `metrics` feature,
/// `enabled = true` is harmless: the server logs a warning at startup
/// and skips the endpoint.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Whether to expose the metrics endpoint. Defaults to `false`.
    pub enabled: bool,
    /// URL path of the endpoint. Defaults to `/metrics`. Validation (run
    /// even when disabled) requires a leading `/` and no trailing `/`.
    pub path: String,
}
328
329impl Default for MetricsConfig {
330    fn default() -> Self {
331        Self {
332            enabled: false,
333            path: "/metrics".into(),
334        }
335    }
336}
337
/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
///
/// Disabled by default. When `enabled = true`, the server validates
/// every request's `Authorization: Bearer …` JWT against the JWKS
/// discovered from the issuer's OIDC metadata
/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
/// enforces the configured scope requirements per route.
///
/// Only compiled in when the binary was built with the `auth` cargo
/// feature. Without the feature, `enabled = true` is rejected at
/// startup so a misconfigured production deployment can't silently
/// fall back to "no auth".
///
/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
/// of this block — `[swagger.oauth2]` only drives the UI's login
/// dialog; `[auth]` is what enforces tokens on the API.
///
/// Unknown keys inside the block are a deserialization error
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch. `false` (default) skips all auth processing.
    pub enabled: bool,
    /// OIDC issuer URL — must match the `iss` claim of every accepted
    /// token. Required when `enabled = true`.
    pub issuer: String,
    /// Expected `aud` claim. When empty, audience validation is
    /// skipped (not recommended in production).
    pub audience: String,
    /// Scopes a caller must hold to read datasets (GET endpoints +
    /// POST `…/query` and `…/count`). Empty list means "no scope check,
    /// just a valid token is enough". Entries are ASCII-lowercased once
    /// when the config is loaded.
    pub read_scopes: Vec<String>,
    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
    /// Empty list means "no scope check, just a valid token is enough".
    /// Entries are ASCII-lowercased once when the config is loaded.
    pub reload_scopes: Vec<String>,
    /// Allow unauthenticated GETs through. Useful for public datasets
    /// and demo deployments. Defaults to `false`.
    pub anonymous_read: bool,
    /// Continue serving even if the JWKS fetch fails at startup.
    /// When `true` (default), the server starts in a degraded mode that
    /// rejects every auth'd request with 503 until JWKS becomes
    /// reachable. When `false`, startup fails outright.
    pub start_degraded: bool,
    /// Allowed signing algorithms. Pinned to RS256 by default; never
    /// include `HS*` or `none` here unless you really know what you're
    /// doing.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds.
    /// Defaults to `60`.
    pub leeway_secs: u64,
    /// How often (in seconds) the background refresher re-fetches the
    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
    /// out-of-band. Defaults to `3600`.
    pub jwks_refresh_secs: u64,
    /// Optional JSON-pointer into the JWT claims that extracts a
    /// tenant identifier — attached to the principal and logged on
    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
    /// When empty, no tenant is extracted.
    pub tenant_claim: String,
    /// If non-empty, requests whose extracted tenant ID is not in this
    /// list are rejected with 403. Has no effect when `tenant_claim`
    /// is empty.
    pub allowed_tenants: Vec<String>,
    /// If `true`, `POST …/reload` accepts *either* a valid token with
    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
    /// to `true` for one-release backwards compatibility — flip to
    /// `false` once your automation has migrated to OIDC.
    pub admin_token_fallback: bool,
}
405
406impl Default for AuthConfig {
407    fn default() -> Self {
408        Self {
409            enabled: false,
410            issuer: String::new(),
411            audience: String::new(),
412            read_scopes: Vec::new(),
413            reload_scopes: Vec::new(),
414            anonymous_read: false,
415            start_degraded: true,
416            algorithms: vec!["RS256".into()],
417            leeway_secs: 60,
418            jwks_refresh_secs: 3600,
419            tenant_claim: String::new(),
420            allowed_tenants: Vec::new(),
421            admin_token_fallback: true,
422        }
423    }
424}
425
426impl Backend {
427    pub fn as_str(self) -> &'static str {
428        match self {
429            Backend::Datafusion => "datafusion",
430            Backend::Duckdb => "duckdb",
431        }
432    }
433}
434
/// One `[[dataset]]` entry of the config file.
///
/// `name` and `source` are required; every other key has a default.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name. Per the module docs it also determines the prefix of
    /// the per-dataset credential environment variables.
    pub name: String,
    /// Format and location of the data (`[dataset.source]`).
    pub source: SourceConfig,
    /// Optional S3 connection details (`[dataset.s3]`). `None` when the
    /// block is absent.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; [`IndexConfig::default`] when the block is absent.
    #[serde(default)]
    pub index: IndexConfig,
    /// Optional column projection applied at load time. When non-empty,
    /// only the listed columns are read from the parquet/delta source —
    /// every other column is skipped entirely (no decode, no allocation,
    /// no resident memory). Empty (default) = read all columns. Names are
    /// matched case-insensitively against the source schema.
    #[serde(default)]
    pub columns: Vec<String>,
    /// When `true` (default), Utf8 columns that are dictionary-encoded in
    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
    /// for low-cardinality columns. Set to `false` to bypass the override
    /// — useful as a workaround if you observe null-handling oddities on
    /// a particular parquet file.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, the backend should keep the dataset on disk and stream
    /// it at query time instead of materialising it into RAM at startup.
    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
    /// for bounded memory use on large / multi-file sources. Currently
    /// honoured by the DataFusion backend for local parquet.
    /// Defaults to `false`.
    #[serde(default)]
    pub lazy: bool,
}
466
/// Serde default helper for boolean fields that are on unless the config
/// file says otherwise (used via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
470
/// `[dataset.source]` block: the data format plus where to find it.
/// Both keys are required.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Source format (`parquet` or `delta`).
    pub kind: SourceKind,
    /// Either a local filesystem path or an `s3://bucket/key` URL.
    pub location: String,
}
477
/// Storage format of a dataset source.
///
/// Deserialized from the lowercase variant name (`"parquet"` or
/// `"delta"`); [`SourceKind::as_str`] returns the same spelling.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Parquet source. This is the `Default` variant.
    #[default]
    Parquet,
    /// Delta source.
    Delta,
}
485
486impl SourceKind {
487    pub fn as_str(self) -> &'static str {
488        match self {
489            SourceKind::Parquet => "parquet",
490            SourceKind::Delta => "delta",
491        }
492    }
493}
494
/// S3 connection settings (`[dataset.s3]` block).
///
/// Intended for non-secret details (region, endpoint, addressing style).
/// Credentials are normally pulled from env / the AWS credential chain —
/// see [`DatasetConfig::resolved_creds`] — although the block also accepts
/// optional inline credentials.
///
/// Every key is optional; omitted keys take the values produced by the
/// [`Default`] impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Region name. `None` (default) leaves it unset here.
    pub region: Option<String>,
    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
    pub endpoint: Option<String>,
    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
    /// MinIO and most non-AWS providers require `path`.
    pub addressing_style: AddressingStyle,
    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
    /// Defaults to `false`.
    pub allow_http: bool,
    /// Inline credentials. Strongly discouraged in production — prefer env
    /// vars (see module docs).
    pub access_key_id: Option<String>,
    /// Inline secret key; see `access_key_id`.
    pub secret_access_key: Option<String>,
    /// Inline session token; see `access_key_id`.
    pub session_token: Option<String>,
    /// Hive partition-column handling for parquet sources. Defaults to
    /// `auto` (detect from the path). See [`Partitioning`].
    pub partitioning: Partitioning,
    /// Whether the bucket is folded into a custom `endpoint` host for
    /// virtual-hosted-style requests. Defaults to `auto`. See
    /// [`BucketInHost`].
    pub endpoint_bucket_in_host: BucketInHost,
}
521
522impl Default for S3Config {
523    fn default() -> Self {
524        Self {
525            region: None,
526            endpoint: None,
527            addressing_style: AddressingStyle::Virtual,
528            allow_http: false,
529            access_key_id: None,
530            secret_access_key: None,
531            session_token: None,
532            partitioning: Partitioning::Auto,
533            endpoint_bucket_in_host: BucketInHost::Auto,
534        }
535    }
536}
537
538impl S3Config {
539    /// Resolve the endpoint URL to hand to the object store, optionally
540    /// folding `bucket` into the host for virtual-hosted-style requests so a
541    /// plain `endpoint` works the same way it does on DuckDB.
542    ///
543    /// Returns `None` when no custom endpoint is configured (AWS default).
544    /// The bucket is only prepended when it isn't already the leading host
545    /// label, so re-running this (or a config that already embeds the
546    /// bucket) never produces `bucket.bucket.host`.
547    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
548        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
549
550        let fold = match self.endpoint_bucket_in_host {
551            BucketInHost::False => false,
552            BucketInHost::True => true,
553            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
554        };
555        if !fold {
556            return Some(ep.to_string());
557        }
558
559        let (scheme, host_and_path) = match ep.split_once("://") {
560            Some((s, rest)) => (Some(s), rest),
561            None => (None, ep),
562        };
563        // Split host from any trailing path so we prefix the host label only.
564        let (host, path) = match host_and_path.split_once('/') {
565            Some((h, p)) => (h, Some(p)),
566            None => (host_and_path, None),
567        };
568        // Guard against double-prefixing.
569        if host == bucket || host.starts_with(&format!("{bucket}.")) {
570            return Some(ep.to_string());
571        }
572        let new_host = format!("{bucket}.{host}");
573        let rebuilt = match (scheme, path) {
574            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
575            (Some(s), None) => format!("{s}://{new_host}"),
576            (None, Some(p)) => format!("{new_host}/{p}"),
577            (None, None) => new_host,
578        };
579        Some(rebuilt)
580    }
581}
582
/// S3 request addressing style.
///
/// Deserialized from the lowercase variant name (`"virtual"` or
/// `"path"`); [`AddressingStyle::as_str`] returns the same spelling.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Virtual-hosted style (`bucket.host`). This is the default.
    #[default]
    Virtual,
    /// Path style (`host/bucket/`).
    Path,
}
590
591impl AddressingStyle {
592    pub fn as_str(self) -> &'static str {
593        match self {
594            AddressingStyle::Virtual => "virtual",
595            AddressingStyle::Path => "path",
596        }
597    }
598}
599
/// How hive-style partition columns (`key=value/` path segments) are handled
/// for an S3 parquet source. Local parquet always auto-detects; this option
/// brings S3 in line and lets you force or disable the behaviour.
///
/// Deserialized from the lowercase variant name (`"auto"`, `"hive"` or
/// `"none"`); [`Partitioning::as_str`] returns the same spelling.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Detect `key=value` segments from the location glob or by listing the
    /// prefix. No partition columns are added when none are found.
    /// This is the default.
    #[default]
    Auto,
    /// Force hive partitioning. Partition keys are taken from the location
    /// glob, or discovered by listing the prefix.
    Hive,
    /// Treat the source as a flat set of parquet files — never add partition
    /// columns even if the path looks hive-partitioned.
    None,
}
617
618impl Partitioning {
619    pub fn as_str(self) -> &'static str {
620        match self {
621            Partitioning::Auto => "auto",
622            Partitioning::Hive => "hive",
623            Partitioning::None => "none",
624        }
625    }
626}
627
/// Whether the bucket name is folded into the endpoint hostname for
/// virtual-hosted-style requests against a custom endpoint. This aligns the
/// DataFusion object-store path with DuckDB, which builds the virtual host
/// itself — so the same plain `endpoint` works on both backends.
///
/// Deserialized from the lowercase variant name; [`BucketInHost::as_str`]
/// returns the same spelling.
///
/// NOTE(review): as a derived unit-variant enum this presumably expects the
/// TOML *strings* `"true"` / `"false"` rather than bare booleans — confirm
/// against the deserializer before documenting it for users.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold the bucket into the host when `addressing_style = "virtual"` and
    /// a custom `endpoint` is set (guarded against double-prefixing).
    /// This is the default.
    #[default]
    Auto,
    /// Always fold the bucket into the endpoint host.
    True,
    /// Never rewrite the endpoint — pass it through verbatim.
    False,
}
644
645impl BucketInHost {
646    pub fn as_str(self) -> &'static str {
647        match self {
648            BucketInHost::Auto => "auto",
649            BucketInHost::True => "true",
650            BucketInHost::False => "false",
651        }
652    }
653}
654
/// Per-dataset index settings (the `index` key of a `[[dataset]]` entry).
///
/// Every key is optional; omitted keys take the values produced by the
/// [`Default`] impl (`mode = auto`, no columns, `max_cardinality = 100_000`).
///
/// NOTE(review): the consumer of these settings is not in this part of the
/// file; the field notes below are inferred from names — confirm against
/// the index builder.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Index selection mode; see [`IndexMode`].
    pub mode: IndexMode,
    /// Column names; presumably the explicit list used with
    /// `mode = "list"` — TODO confirm.
    pub columns: Vec<String>,
    /// Presumably the upper bound on distinct values for a column to be
    /// indexed — TODO confirm.
    pub max_cardinality: usize,
}
662
663impl Default for IndexConfig {
664    fn default() -> Self {
665        Self {
666            mode: IndexMode::Auto,
667            columns: Vec::new(),
668            max_cardinality: 100_000,
669        }
670    }
671}
672
/// Index selection mode for a dataset.
///
/// Deserialized from the lowercase variant name (`"auto"`, `"none"` or
/// `"list"`).
///
/// NOTE(review): variant semantics are inferred from their names; the code
/// that interprets them is not in this part of the file — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// Default mode; presumably lets the backend choose which columns to
    /// index.
    #[default]
    Auto,
    /// Presumably disables indexing.
    None,
    /// Presumably indexes exactly the columns in `IndexConfig::columns`.
    List,
}
681
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// The `Default` value has every field set to `None`.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Access key ID, if one was resolved.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was resolved.
    pub secret_access_key: Option<String>,
    /// Session token, if one was resolved.
    pub session_token: Option<String>,
}
690
691impl ResolvedCreds {
692    pub fn has_keypair(&self) -> bool {
693        self.access_key_id.is_some() && self.secret_access_key.is_some()
694    }
695}
696
697// ---------------------------------------------------------------------------
698// Loading + validation
699// ---------------------------------------------------------------------------
700
701impl AppConfig {
702    /// Read and validate a TOML config file.
703    pub fn load(path: &str) -> Result<Self, AppError> {
704        let raw = std::fs::read_to_string(path)
705            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
706        let mut cfg: AppConfig =
707            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
708        cfg.normalize();
709        cfg.validate()?;
710        Ok(cfg)
711    }
712
713    /// Canonicalise fields that are compared case-insensitively at runtime.
714    ///
715    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
716    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
717    /// once at load time. Without this an operator who writes
718    /// `"Datasets:Read"` would silently 403 every caller, since the token
719    /// side would have become `datasets:read`.
720    fn normalize(&mut self) {
721        for s in self
722            .auth
723            .read_scopes
724            .iter_mut()
725            .chain(self.auth.reload_scopes.iter_mut())
726        {
727            *s = s.to_ascii_lowercase();
728        }
729    }
730
731    fn validate(&self) -> Result<(), AppError> {
732        // Server prefix: empty, or must start with '/' and not end with '/'.
733        let p = &self.server.prefix;
734        if !p.is_empty() {
735            if !p.starts_with('/') {
736                return Err(AppError::Internal(format!(
737                    "server.prefix must start with '/' (got '{p}')"
738                )));
739            }
740            if p.ends_with('/') {
741                return Err(AppError::Internal(format!(
742                    "server.prefix must not end with '/' (got '{p}')"
743                )));
744            }
745        }
746
747        if self.datasets.is_empty() {
748            return Err(AppError::Internal(
749                "datasets.toml has no [[dataset]] entries".into(),
750            ));
751        }
752
753        if self.server.quack.enabled {
754            self.server.quack.validate_enabled()?;
755        }
756
757        // Validate the docs mount path even when the section is disabled,
758        // so an inactive config typo can't go unnoticed.
759        {
760            let dp = &self.docs.path;
761            if !dp.starts_with('/') {
762                return Err(AppError::Internal(format!(
763                    "docs.path must start with '/' (got '{dp}')"
764                )));
765            }
766            if dp.len() > 1 && dp.ends_with('/') {
767                return Err(AppError::Internal(format!(
768                    "docs.path must not end with '/' (got '{dp}')"
769                )));
770            }
771            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
772                return Err(AppError::Internal(format!(
773                    "docs.path '{dp}' collides with a reserved route"
774                )));
775            }
776        }
777
778        // Same for the swagger UI mount.
779        {
780            let sp = &self.swagger.path;
781            if !sp.starts_with('/') {
782                return Err(AppError::Internal(format!(
783                    "swagger.path must start with '/' (got '{sp}')"
784                )));
785            }
786            if sp.len() > 1 && sp.ends_with('/') {
787                return Err(AppError::Internal(format!(
788                    "swagger.path must not end with '/' (got '{sp}')"
789                )));
790            }
791            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
792                return Err(AppError::Internal(format!(
793                    "swagger.path '{sp}' collides with a reserved route"
794                )));
795            }
796            if sp == &self.docs.path {
797                return Err(AppError::Internal(format!(
798                    "swagger.path and docs.path must differ (both '{sp}')"
799                )));
800            }
801            if let Some(o) = &self.swagger.oauth2 {
802                if o.issuer.trim().is_empty() {
803                    return Err(AppError::Internal(
804                        "swagger.oauth2.issuer must not be empty".into(),
805                    ));
806                }
807                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
808                    return Err(AppError::Internal(format!(
809                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
810                        o.issuer
811                    )));
812                }
813                if o.client_id.trim().is_empty() {
814                    return Err(AppError::Internal(
815                        "swagger.oauth2.client_id must not be empty".into(),
816                    ));
817                }
818            }
819        }
820
821        // Metrics endpoint mount path. Validated even when disabled so an
822        // inactive config typo can't go unnoticed. `/metrics` is itself a
823        // reserved mount (so docs/swagger can't shadow it), so we check the
824        // remaining reserved routes — and the docs/swagger paths — for
825        // collisions rather than the whole list.
826        {
827            let mp = &self.metrics.path;
828            if !mp.starts_with('/') {
829                return Err(AppError::Internal(format!(
830                    "metrics.path must start with '/' (got '{mp}')"
831                )));
832            }
833            if mp.len() > 1 && mp.ends_with('/') {
834                return Err(AppError::Internal(format!(
835                    "metrics.path must not end with '/' (got '{mp}')"
836                )));
837            }
838            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
839                return Err(AppError::Internal(format!(
840                    "metrics.path '{mp}' collides with a reserved route"
841                )));
842            }
843            if mp == &self.docs.path {
844                return Err(AppError::Internal(format!(
845                    "metrics.path and docs.path must differ (both '{mp}')"
846                )));
847            }
848            if mp == &self.swagger.path {
849                return Err(AppError::Internal(format!(
850                    "metrics.path and swagger.path must differ (both '{mp}')"
851                )));
852            }
853        }
854
855        // Auth block — only meaningful when `enabled = true`. The cargo
856        // feature gate is enforced separately in `server::serve` so a
857        // binary built without `--features auth` and a config with
858        // `auth.enabled = true` aborts with a clear error.
859        if self.auth.enabled {
860            let a = &self.auth;
861            if a.issuer.trim().is_empty() {
862                return Err(AppError::Internal(
863                    "auth.issuer must not be empty when auth.enabled = true".into(),
864                ));
865            }
866            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
867                return Err(AppError::Internal(format!(
868                    "auth.issuer must be an absolute http(s) URL (got '{}')",
869                    a.issuer
870                )));
871            }
872            for alg in &a.algorithms {
873                match alg.as_str() {
874                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
875                    | "PS512" => {}
876                    other => {
877                        return Err(AppError::Internal(format!(
878                            "auth.algorithms[{other}] is not allowed; pick one of \
879                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
880                        )));
881                    }
882                }
883            }
884            if a.algorithms.is_empty() {
885                return Err(AppError::Internal(
886                    "auth.algorithms must not be empty".into(),
887                ));
888            }
889            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
890                return Err(AppError::Internal(format!(
891                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
892                    a.tenant_claim
893                )));
894            }
895            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
896                return Err(AppError::Internal(
897                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
898                     can't enforce a tenant allow-list without a claim to extract from"
899                        .into(),
900                ));
901            }
902        }
903
904        let mut seen = HashSet::new();
905        for d in &self.datasets {
906            if !seen.insert(d.name.as_str()) {
907                return Err(AppError::Internal(format!(
908                    "duplicate dataset name: {}",
909                    d.name
910                )));
911            }
912            if d.name.is_empty() {
913                return Err(AppError::Internal("dataset name must not be empty".into()));
914            }
915            // URL-safe: alphanum + _ - .
916            if !d
917                .name
918                .chars()
919                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
920            {
921                return Err(AppError::Internal(format!(
922                    "dataset name '{}' must be alphanumeric (plus _ - .)",
923                    d.name
924                )));
925            }
926
927            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
928                return Err(AppError::Internal(format!(
929                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
930                    d.name
931                )));
932            }
933
934            // Location-specific checks.
935            if d.source.is_s3() {
936                d.source.s3_bucket()?;
937                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
938                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
939                    && std::env::var("AWS_REGION").is_err()
940                    && std::env::var("AWS_DEFAULT_REGION").is_err()
941                {
942                    log::warn!(
943                        "dataset '{}': S3 source without explicit region — \
944                         relying on AWS_REGION env var",
945                        d.name
946                    );
947                }
948            } else {
949                // Local path. For parquet we can fully resolve to a file
950                // list up front; for delta we only check that the directory
951                // exists (delta has its own layout — _delta_log/, …).
952                match d.source.kind {
953                    SourceKind::Parquet => {
954                        d.resolve_local_parquet_files()?;
955                    }
956                    SourceKind::Delta => {
957                        let p = Path::new(&d.source.location);
958                        if !p.exists() {
959                            return Err(AppError::Internal(format!(
960                                "dataset '{}': delta location does not exist: {}",
961                                d.name, d.source.location
962                            )));
963                        }
964                    }
965                }
966            }
967        }
968        Ok(())
969    }
970}
971
972impl SourceConfig {
973    pub fn is_s3(&self) -> bool {
974        self.location.starts_with("s3://")
975    }
976
977    /// True when the location already contains a glob metacharacter
978    /// (`*`, `?`, or `[`).
979    pub fn has_glob(&self) -> bool {
980        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
981    }
982
983    /// Location to hand to a backend that needs an explicit parquet glob
984    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
985    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
986    /// DataFusion's object-store listing does. Globbed or non-S3 locations
987    /// are returned unchanged.
988    pub fn s3_recursive_parquet_glob(&self) -> String {
989        if !self.is_s3() || self.has_glob() {
990            return self.location.clone();
991        }
992        let trimmed = self.location.trim_end_matches('/');
993        format!("{trimmed}/**/*.parquet")
994    }
995
996    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
997    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
998        let rest = self
999            .location
1000            .strip_prefix("s3://")
1001            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1002        let (bucket, key) = match rest.split_once('/') {
1003            Some((b, k)) => (b, k),
1004            None => (rest, ""),
1005        };
1006        if bucket.is_empty() {
1007            return Err(AppError::Internal(format!(
1008                "s3 URL missing bucket: {}",
1009                self.location
1010            )));
1011        }
1012        Ok((bucket, key))
1013    }
1014}
1015
1016impl DatasetConfig {
1017    /// Expand `source.location` to a concrete list of local `.parquet`
1018    /// files. Only valid for `kind = parquet` on local paths — S3 and
1019    /// Delta sources are resolved by the backend itself.
1020    ///
1021    /// Accepts three location shapes:
1022    ///   * a single `*.parquet` file
1023    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1024    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1025    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1026    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1027        if self.source.is_s3() {
1028            return Err(AppError::Internal(format!(
1029                "dataset '{}': resolve_local_parquet_files called on s3 source",
1030                self.name
1031            )));
1032        }
1033        let loc = &self.source.location;
1034
1035        // Glob pattern? Expand and require at least one match.
1036        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1037            let mut files: Vec<PathBuf> = glob::glob(loc)
1038                .map_err(|e| {
1039                    AppError::Internal(format!(
1040                        "dataset '{}': bad glob pattern '{loc}': {e}",
1041                        self.name
1042                    ))
1043                })?
1044                .filter_map(|r| r.ok())
1045                .filter(|p| {
1046                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1047                })
1048                .collect();
1049            files.sort();
1050            if files.is_empty() {
1051                return Err(AppError::Internal(format!(
1052                    "dataset '{}': glob '{loc}' matched no .parquet files",
1053                    self.name
1054                )));
1055            }
1056            return Ok(files);
1057        }
1058
1059        let path = Path::new(loc);
1060        if !path.exists() {
1061            return Err(AppError::Internal(format!(
1062                "dataset '{}': source path does not exist: {loc}",
1063                self.name
1064            )));
1065        }
1066
1067        if path.is_file() {
1068            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1069                return Err(AppError::Internal(format!(
1070                    "dataset '{}': source must be a .parquet file",
1071                    self.name
1072                )));
1073            }
1074            return Ok(vec![path.to_path_buf()]);
1075        }
1076
1077        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1078            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1079            .filter_map(|entry| entry.ok().map(|e| e.path()))
1080            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1081            .collect();
1082        files.sort();
1083        if files.is_empty() {
1084            return Err(AppError::Internal(format!(
1085                "dataset '{}': no *.parquet files found in {loc}",
1086                self.name
1087            )));
1088        }
1089        Ok(files)
1090    }
1091
1092    /// Env-var prefix derived from the dataset name: uppercase with
1093    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1094    /// `SALES_EU_1`.
1095    pub fn env_prefix(&self) -> String {
1096        self.name
1097            .chars()
1098            .map(|c| {
1099                if c.is_ascii_alphanumeric() {
1100                    c.to_ascii_uppercase()
1101                } else {
1102                    '_'
1103                }
1104            })
1105            .collect()
1106    }
1107
1108    /// Resolve S3 credentials following the precedence chain documented at
1109    /// the top of this module. Returns an empty struct when nothing was
1110    /// found — the caller should then leave credential resolution to the
1111    /// engine's default provider chain.
1112    pub fn resolved_creds(&self) -> ResolvedCreds {
1113        let prefix = self.env_prefix();
1114        let from_env = |suffix: &str| {
1115            std::env::var(format!("{prefix}_{suffix}"))
1116                .ok()
1117                .filter(|s| !s.is_empty())
1118        };
1119        let inline = self.s3.as_ref();
1120        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1121
1122        ResolvedCreds {
1123            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1124                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1125                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1126            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1127                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1128                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1129            session_token: from_env("AWS_SESSION_TOKEN")
1130                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1131                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1132        }
1133    }
1134
1135    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1136    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1137    pub fn resolved_region(&self) -> String {
1138        let prefix = self.env_prefix();
1139        std::env::var(format!("{prefix}_AWS_REGION"))
1140            .ok()
1141            .filter(|s| !s.is_empty())
1142            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1143            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1144            .or_else(|| {
1145                std::env::var("AWS_DEFAULT_REGION")
1146                    .ok()
1147                    .filter(|s| !s.is_empty())
1148            })
1149            .unwrap_or_else(|| "us-east-1".to_string())
1150    }
1151}
1152
/// Unit tests for config defaults, TOML overrides, `AppConfig::validate`,
/// and the `SourceConfig` / `DatasetConfig` / `S3Config` helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // -- fixtures -----------------------------------------------------------

    /// Parquet `SourceConfig` pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        }
    }

    /// Minimal parquet dataset with every optional knob at the value the
    /// tests have always used (`dict_encode = true`, `lazy = false`).
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: parquet_source(location),
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// `AppConfig` with default docs/swagger/metrics sections and the given
    /// server, auth and dataset configuration.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            auth,
            datasets,
        }
    }

    // -- server section -----------------------------------------------------

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        assert!(cfg.datasets[0].dict_encode); // default
    }

    // -- AppConfig::validate / normalize ------------------------------------

    #[test]
    fn validate_rejects_bad_prefix() {
        let bad = ["no-leading-slash", "/trailing/"];
        for p in bad {
            let cfg = app_config(
                ServerConfig {
                    prefix: p.to_string(),
                    ..Default::default()
                },
                AuthConfig::default(),
                vec![],
            );
            let err = match cfg.validate() {
                Err(e) => e,
                Ok(()) => panic!("prefix {p:?} should fail"),
            };
            // `datasets` is empty, so validate() would fail anyway with the
            // "no [[dataset]] entries" error. Make sure the rejection came
            // from an earlier check, otherwise this test passes vacuously.
            assert!(
                matches!(&err, AppError::Internal(m) if !m.contains("[[dataset]]")),
                "prefix {p:?} was not rejected before the empty-datasets check"
            );
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            vec![],
        );
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        use std::io::Write;

        // validate() resolves local parquet sources, so the file must exist.
        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::File::create(&file)
            .unwrap()
            .write_all(b"x")
            .unwrap();

        let cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            vec![parquet_dataset("x", &file.to_string_lossy())],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = app_config(
            ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        use std::io::Write;
        // The first dataset must pass its own checks (the file exists) so the
        // second one is what trips the duplicate-name error.
        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let f = dir.join("a.parquet");
        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
        let path = f.to_str().unwrap();

        // TOML *literal* strings ('…') take the path verbatim; a basic string
        // ("…") would interpret the backslashes of a Windows path as escapes.
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
        "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    // -- SourceConfig -------------------------------------------------------

    #[test]
    fn has_glob_detects_metacharacters() {
        assert!(parquet_source("data/*.parquet").has_glob());
        assert!(parquet_source("data/part-?.parquet").has_glob());
        assert!(parquet_source("data/part-[0-9].parquet").has_glob());
        assert!(!parquet_source("data/plain.parquet").has_glob());
        assert!(!parquet_source("s3://bucket/prefix/").has_glob());
    }

    #[test]
    fn s3_bucket_parsing() {
        let s1 = parquet_source("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = parquet_source("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
        assert_eq!(
            parquet_source("s3://bucket/logs/").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        assert_eq!(
            parquet_source("s3://bucket/logs").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        // Already globbed -> unchanged.
        assert_eq!(
            parquet_source("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
            "s3://bucket/logs/*.parquet"
        );
        // Non-S3 -> unchanged.
        assert_eq!(
            parquet_source("/local/logs").s3_recursive_parquet_glob(),
            "/local/logs"
        );
    }

    // -- S3Config -----------------------------------------------------------

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // Auto + virtual -> bucket folded into host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Idempotent: already-prefixed host is left alone.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Path style (auto) -> host untouched.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        // Explicit overrides win over addressing style.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // No endpoint -> None.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    // -- DatasetConfig ------------------------------------------------------

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        use std::io::Write;
        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let f = dir.join("a.parquet");
        let mut fh = std::fs::File::create(&f).unwrap();
        // Only the extension is inspected; the content need not be parquet.
        fh.write_all(b"not really parquet").unwrap();

        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // Direct file.
        let files = resolve(f.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Directory.
        let files = resolve(dir.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Missing path.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}