Skip to main content

datapress_core/
config.rs

//! Runtime configuration loaded from `datasets.toml`.
//!
//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
//! block selects the format (`parquet` or `delta`) and the location (a
//! local path or an `s3://bucket/key` URL). When the location is on S3,
//! an optional `[dataset.s3]` block carries the connection details
//! (region, endpoint, addressing style, …) and, optionally, inline
//! credentials.
//!
//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
//! in this precedence order:
//!
//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`,
//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
//!    `sales.eu-1` → `SALES_EU_1`).
//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
//!    `[dataset.s3]` block (strongly discouraged in production).
//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
//!    `AWS_SESSION_TOKEN`.
//! 4. None — fall back to the engine's own provider chain
//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route roots owned by the server itself. User-configurable mount paths
/// such as `[docs].path` or `[swagger].path` must not take one of these,
/// otherwise they would shadow the site root, the API scopes, or the
/// probe / operational endpoints.
const RESERVED_MOUNTS: &[&str] = &[
    // Site root.
    "/",
    // API scopes.
    "/api",
    "/api/v1",
    // Probes and operational endpoints.
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
43#[derive(Debug, Deserialize)]
44pub struct AppConfig {
45    #[serde(default)]
46    pub server: ServerConfig,
47    #[serde(default)]
48    pub docs: DocsConfig,
49    #[serde(default)]
50    pub swagger: SwaggerConfig,
51    #[serde(default)]
52    pub metrics: MetricsConfig,
53    #[serde(default)]
54    pub explorer: ExplorerConfig,
55    #[serde(default)]
56    pub sql: SqlConfig,
57    #[serde(default)]
58    pub datafusion: DataFusionConfig,
59    #[serde(default)]
60    pub auth: AuthConfig,
61    #[serde(rename = "dataset", default)]
62    pub datasets: Vec<DatasetConfig>,
63}
64
65#[derive(Debug, Deserialize)]
66#[serde(default)]
67pub struct ServerConfig {
68    /// Which engine to run. Must match the binary's compile-time feature.
69    pub backend: Backend,
70    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
71    /// to 0.0.0.0 if you want to expose the port.
72    pub listen: IpAddr,
73    /// TCP port.
74    pub port: u16,
75    /// Number of actix worker threads. `None` (= unset) → one per CPU.
76    pub workers: Option<usize>,
77    /// Optional URL path prefix — useful when sitting behind a reverse
78    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
79    /// route is mounted under this prefix (so the proxy can pass the URL
80    /// through unchanged). Must start with `/` and not end with `/`; the
81    /// empty string (default) means no prefix.
82    pub prefix: String,
83    /// Negotiate response compression (gzip / brotli / zstd) via the
84    /// `Accept-Encoding` request header. Enabled by default. Disable when
85    /// running behind a proxy that already compresses, or when the extra
86    /// CPU is not worth the bandwidth saving.
87    pub compress: bool,
88    /// Maximum accepted JSON request body size, in bytes. Larger bodies
89    /// are rejected with `413 Payload Too Large` before any handler runs.
90    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
91    /// a DoS guard, not a tuning knob.
92    pub max_body_bytes: usize,
93    /// Maximum rows returned by a single `/query` page. Larger
94    /// `page_size` values are clamped before the backend runs.
95    /// Default `100_000`.
96    pub max_page_size: u64,
97    /// When > 0, any dataset whose backing files exceed this many
98    /// megabytes is forced into `lazy` mode at startup (streamed from
99    /// disk instead of materialised into RAM), even if `lazy` was not set
100    /// on the dataset. `0` (default) disables the size check. Local
101    /// sources are sized with a filesystem stat; on the DataFusion backend
102    /// S3 sources are sized by listing the object store under their prefix
103    /// (the DuckDB backend only sizes local sources — S3 datasets there
104    /// must opt in with an explicit `lazy = true`). Delta tables are
105    /// measured by summing their parquet data files.
106    pub force_lazy_above_mb: u64,
107    /// Per-request handler timeout, in milliseconds. If a handler hasn't
108    /// produced a response within this budget the request is aborted with
109    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
110    pub request_timeout_ms: u64,
111    /// Grace period for in-flight requests after the server has received
112    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
113    /// immediately; existing connections then have up to this many
114    /// seconds to finish before workers are force-stopped. Default `30`.
115    pub shutdown_timeout_secs: u64,
116    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
117    /// backend; ignored by DataFusion.
118    pub quack: QuackConfig,
119}
120
121impl Default for ServerConfig {
122    fn default() -> Self {
123        Self {
124            backend: Backend::default(),
125            listen: IpAddr::from([127, 0, 0, 1]),
126            port: 8080,
127            workers: None,
128            prefix: String::new(),
129            compress: true,
130            max_body_bytes: 1024 * 1024,
131            max_page_size: 100_000,
132            force_lazy_above_mb: 0,
133            request_timeout_ms: 30_000,
134            shutdown_timeout_secs: 30,
135            quack: QuackConfig::default(),
136        }
137    }
138}
139
140/// Experimental DuckDB Quack remote protocol server.
141///
142/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
143/// disabled unless you intentionally want DuckDB clients to attach/query this
144/// process directly.
145#[derive(Debug, Clone, Deserialize)]
146#[serde(default)]
147pub struct QuackConfig {
148    /// Install/load the Quack extension and start `quack_serve` after
149    /// datasets are registered.
150    pub enabled: bool,
151    /// Quack URI to listen on. `quack:localhost` uses DuckDB's default
152    /// port 9494.
153    pub uri: String,
154    /// Optional explicit authentication token. If omitted, Quack generates
155    /// one at startup and DataPress logs it once.
156    pub token: Option<String>,
157    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
158    /// For external exposure, put a TLS-terminating reverse proxy in front.
159    pub allow_other_hostname: bool,
160    /// Install a read-only authorization macro for remote queries. Enabled
161    /// by default to match DataPress' read-oriented HTTP API.
162    pub read_only: bool,
163}
164
165impl Default for QuackConfig {
166    fn default() -> Self {
167        Self {
168            enabled: false,
169            uri: "quack:localhost".into(),
170            token: None,
171            allow_other_hostname: false,
172            read_only: true,
173        }
174    }
175}
176
177impl QuackConfig {
178    /// Validate the enabled Quack configuration against DuckDB's current
179    /// safety rules. The extension treats only the literal `localhost` as
180    /// local unless `allow_other_hostname` is set.
181    pub fn validate_enabled(&self) -> Result<(), AppError> {
182        if self.uri.trim().is_empty() {
183            return Err(AppError::Internal(
184                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
185            ));
186        }
187        if !self.uri.starts_with("quack:") {
188            return Err(AppError::Internal(format!(
189                "server.quack.uri must start with 'quack:' (got '{}')",
190                self.uri
191            )));
192        }
193        if !self.allow_other_hostname {
194            let host = self.hostname().unwrap_or_default();
195            if host != "localhost" {
196                return Err(AppError::Internal(format!(
197                    "server.quack.uri host must be 'localhost' unless \
198                     server.quack.allow_other_hostname = true (got '{}')",
199                    self.uri
200                )));
201            }
202        }
203        if let Some(token) = self.token.as_deref()
204            && token.len() < 4
205        {
206            return Err(AppError::Internal(
207                "server.quack.token must be at least 4 characters".into(),
208            ));
209        }
210        Ok(())
211    }
212
213    fn hostname(&self) -> Option<&str> {
214        let rest = self.uri.strip_prefix("quack:")?;
215        let rest = rest.strip_prefix("//").unwrap_or(rest);
216        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
217        (!host.is_empty()).then_some(host)
218    }
219}
220
221#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
222#[serde(rename_all = "lowercase")]
223pub enum Backend {
224    #[default]
225    Datafusion,
226    Duckdb,
227}
228
229/// Embedded MkDocs documentation site (`[docs]` block).
230///
231/// Enabled by default — when the binary was built with the `docs`
232/// cargo feature, the site is served at [`DocsConfig::path`] out of
233/// the box. Set `enabled = false` in `datasets.toml` to suppress it
234/// (e.g. in prod). When the binary was built without the feature,
235/// `enabled = true` is harmless: the server logs a warning at startup
236/// and skips the mount. The mount path must be a non-trivial sub-path;
237/// reserved API and probe roots are rejected at startup.
238#[derive(Debug, Clone, Deserialize)]
239#[serde(default, deny_unknown_fields)]
240pub struct DocsConfig {
241    pub enabled: bool,
242    pub path: String,
243}
244
245impl Default for DocsConfig {
246    fn default() -> Self {
247        Self {
248            enabled: true,
249            path: "/mkdocs".into(),
250        }
251    }
252}
253
254/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
255///
256/// Enabled by default — when the binary was built with the `swagger`
257/// cargo feature, an interactive Swagger UI is served at
258/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
259/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
260/// to suppress it (e.g. in prod). When the binary was built without
261/// the feature, `enabled = true` is harmless: the server logs a
262/// warning at startup and skips the mount.
263///
264/// To let users sign in to the UI itself (Authorization Code + PKCE
265/// against any OIDC provider), populate the optional `[swagger.oauth2]`
266/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
267/// to every "Try it out" request — useful for exercising auth-protected
268/// endpoints from the docs page. This drives the UI only; it does not
269/// turn on server-side token validation.
270#[derive(Debug, Clone, Deserialize)]
271#[serde(default, deny_unknown_fields)]
272pub struct SwaggerConfig {
273    pub enabled: bool,
274    pub path: String,
275    pub oauth2: Option<SwaggerOAuth2Config>,
276}
277
278impl Default for SwaggerConfig {
279    fn default() -> Self {
280        Self {
281            enabled: true,
282            path: "/docs".into(),
283            oauth2: None,
284        }
285    }
286}
287
288/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
289///
290/// Configures the UI to drive an Authorization Code + PKCE flow against
291/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
292/// token endpoints from `<issuer>/.well-known/openid-configuration`,
293/// so we don't need to pin them here.
294///
295/// All fields are required when the block is present — there is no
296/// sensible default for `issuer` or `client_id`.
297#[derive(Debug, Clone, Deserialize)]
298#[serde(deny_unknown_fields)]
299pub struct SwaggerOAuth2Config {
300    /// OIDC issuer URL, e.g.
301    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
302    /// `https://accounts.google.com`. Must not end in `/`.
303    pub issuer: String,
304    /// Public OAuth2 client identifier registered with the IdP. The
305    /// client must be a SPA / public client (no secret) with
306    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
307    /// as an allowed redirect URI.
308    pub client_id: String,
309    /// Scopes to request by default. Will be pre-checked in the Swagger
310    /// UI authorize dialog; users can edit them before signing in.
311    /// `openid` is always added if missing.
312    #[serde(default)]
313    pub scopes: Vec<String>,
314    /// Use PKCE for the authorization code flow. Defaults to `true`;
315    /// disable only if your IdP doesn't support PKCE for public clients.
316    #[serde(default = "default_true")]
317    pub pkce: bool,
318}
319
320/// Prometheus metrics endpoint (`[metrics]` block).
321///
322/// Disabled by default. When `enabled = true` (and the binary was built
323/// with the `metrics` cargo feature), the server installs a middleware
324/// that records per-request HTTP counters and latency histograms, and
325/// exposes them in the Prometheus text exposition format at
326/// [`MetricsConfig::path`] (default `/metrics`).
327///
328/// The endpoint is mounted at a fixed, *unprefixed* path — like the
329/// health probes — so a scrape config doesn't need to know about any
330/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
331/// layer: Prometheus scrapers rarely carry bearer tokens, and the
332/// endpoint exposes only aggregate request metrics (no row data). Keep
333/// it on a network the scraper can reach but the public cannot, e.g. by
334/// binding `server.listen` to a private interface.
335///
336/// When the binary was built without the `metrics` feature,
337/// `enabled = true` is harmless: the server logs a warning at startup
338/// and skips the endpoint.
339#[derive(Debug, Clone, Deserialize)]
340#[serde(default, deny_unknown_fields)]
341pub struct MetricsConfig {
342    pub enabled: bool,
343    pub path: String,
344}
345
346impl Default for MetricsConfig {
347    fn default() -> Self {
348        Self {
349            enabled: false,
350            path: "/metrics".into(),
351        }
352    }
353}
354
355/// Embedded dataset explorer UI (`[explorer]` block).
356///
357/// A server-rendered web app (Actix + Askama templates + htmx +
358/// Bootstrap) served at [`ExplorerConfig::path`] (default `/explore`).
359/// It offers a *discovery* view — per-dataset stats, schema, index and
360/// source configuration — and an in-browser *DuckDB* console (DuckDB-WASM)
361/// that queries each dataset's Parquet export directly.
362///
363/// Enabled by default. When the binary was built without the `explorer`
364/// cargo feature, `enabled = true` is harmless: the server logs a warning
365/// at startup and skips the mount. Set `enabled = false` to suppress it at
366/// runtime even when the feature is compiled in.
367#[derive(Debug, Clone, Deserialize)]
368#[serde(default, deny_unknown_fields)]
369pub struct ExplorerConfig {
370    pub enabled: bool,
371    pub path: String,
372}
373
374impl Default for ExplorerConfig {
375    fn default() -> Self {
376        Self {
377            enabled: true,
378            path: "/explore".into(),
379        }
380    }
381}
382
383/// Raw-SQL query endpoint (`[sql]` block).
384///
385/// Exposes `POST /api/v1/sql`, which accepts an arbitrary read-only
386/// `SELECT` in the request body and runs it against the engine. **Off by
387/// default** — raw SQL is a larger attack surface than the structured
388/// `/query` endpoint, so it must be opted into explicitly.
389///
390/// Phase 1 is scoped to a *single* dataset per query: the statement may
391/// reference at most one registered dataset (and no others / no files),
392/// enforced by a parse-time table allowlist. Cross-dataset joins are a
393/// future extension.
394///
395/// Safety rails applied to every accepted statement:
396/// - exactly one statement, and it must be a read-only `SELECT` / `WITH`,
397/// - every referenced table must be a registered dataset (no file
398///   functions, no `ATTACH`/`COPY`/`PRAGMA`/DDL/DML),
399/// - the result is hard-capped at [`SqlConfig::max_rows`] rows.
400#[derive(Debug, Clone, Deserialize)]
401#[serde(default, deny_unknown_fields)]
402pub struct SqlConfig {
403    /// Enable the `POST /api/v1/sql` endpoint. Default `false`.
404    pub enabled: bool,
405    /// Hard cap on the number of rows a single SQL query may return.
406    /// The query result is wrapped in an outer `LIMIT` so this bound is
407    /// enforced regardless of the user's own `LIMIT`. Default `100_000`.
408    pub max_rows: u64,
409}
410
411impl Default for SqlConfig {
412    fn default() -> Self {
413        Self {
414            enabled: false,
415            max_rows: 100_000,
416        }
417    }
418}
419
420/// DataFusion backend performance tuning (`[datafusion]` block).
421///
422/// Every knob is **off / stock by default**, so the backend behaves exactly
423/// like DataFusion out of the box unless you opt in. These mainly help lazy
424/// (`lazy = true`) parquet datasets, especially on object storage. Ignored by
425/// the DuckDB backend.
426#[derive(Debug, Clone, Deserialize)]
427#[serde(default, deny_unknown_fields)]
428pub struct DataFusionConfig {
429    /// Push row-level filters down into the parquet decoder so rows that fail
430    /// a predicate are never materialised (in addition to the row-group /
431    /// page-index pruning that always happens). DataFusion default is `false`
432    /// because for some workloads the extra per-row evaluation is not worth
433    /// it; turn it on for selective filters over large row groups.
434    pub pushdown_filters: bool,
435    /// Let the parquet scan reorder pushed-down predicates by estimated
436    /// selectivity. Only has an effect together with `pushdown_filters`.
437    /// DataFusion default is `false`.
438    pub reorder_filters: bool,
439    /// Cache object-store file listings on the shared runtime so repeated
440    /// lazy queries reuse `LIST` results instead of re-listing the source
441    /// prefix every time — the dominant per-query cost on S3. Default `false`.
442    pub list_files_cache: bool,
443    /// Memory budget for the file-listing cache, in MiB. Only used when
444    /// `list_files_cache = true`. Default `64`.
445    pub list_files_cache_mb: usize,
446    /// How long a cached listing stays valid, in seconds. Bounds how long it
447    /// takes for newly written files to become visible without an explicit
448    /// reload. `0` means no expiry (infinite). Default `60`.
449    pub list_files_cache_ttl_secs: u64,
450}
451
452impl Default for DataFusionConfig {
453    fn default() -> Self {
454        Self {
455            pushdown_filters: false,
456            reorder_filters: false,
457            list_files_cache: false,
458            list_files_cache_mb: 64,
459            list_files_cache_ttl_secs: 60,
460        }
461    }
462}
463
464/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
465///
466/// Disabled by default. When `enabled = true`, the server validates
467/// every request's `Authorization: Bearer …` JWT against the JWKS
468/// discovered from the issuer's OIDC metadata
469/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
470/// enforces the configured scope requirements per route.
471///
472/// Only compiled in when the binary was built with the `auth` cargo
473/// feature. Without the feature, `enabled = true` is rejected at
474/// startup so a misconfigured production deployment can't silently
475/// fall back to "no auth".
476///
477/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
478/// of this block — `[swagger.oauth2]` only drives the UI's login
479/// dialog; `[auth]` is what enforces tokens on the API.
480#[derive(Debug, Clone, Deserialize)]
481#[serde(default, deny_unknown_fields)]
482pub struct AuthConfig {
483    /// Master switch. `false` (default) skips all auth processing.
484    pub enabled: bool,
485    /// OIDC issuer URL — must match the `iss` claim of every accepted
486    /// token. Required when `enabled = true`.
487    pub issuer: String,
488    /// Expected `aud` claim. When empty, audience validation is
489    /// skipped (not recommended in production).
490    pub audience: String,
491    /// Scopes a caller must hold to read datasets (GET endpoints +
492    /// POST `…/query` and `…/count`). Empty list means "no scope check,
493    /// just a valid token is enough".
494    pub read_scopes: Vec<String>,
495    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
496    /// Empty list means "no scope check, just a valid token is enough".
497    pub reload_scopes: Vec<String>,
498    /// Allow unauthenticated GETs through. Useful for public datasets
499    /// and demo deployments. Defaults to `false`.
500    pub anonymous_read: bool,
501    /// Continue serving even if the JWKS fetch fails at startup.
502    /// When `true` (default), the server starts in a degraded mode that
503    /// rejects every auth'd request with 503 until JWKS becomes
504    /// reachable. When `false`, startup fails outright.
505    pub start_degraded: bool,
506    /// Allowed signing algorithms. Pinned to RS256 by default; never
507    /// include `HS*` or `none` here unless you really know what you're
508    /// doing.
509    pub algorithms: Vec<String>,
510    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds.
511    pub leeway_secs: u64,
512    /// How often (in seconds) the background refresher re-fetches the
513    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
514    /// out-of-band.
515    pub jwks_refresh_secs: u64,
516    /// Optional JSON-pointer into the JWT claims that extracts a
517    /// tenant identifier — attached to the principal and logged on
518    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
519    /// When empty, no tenant is extracted.
520    pub tenant_claim: String,
521    /// If non-empty, requests whose extracted tenant ID is not in this
522    /// list are rejected with 403. Has no effect when `tenant_claim`
523    /// is empty.
524    pub allowed_tenants: Vec<String>,
525    /// If `true`, `POST …/reload` accepts *either* a valid token with
526    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
527    /// to `true` for one-release backwards compatibility — flip to
528    /// `false` once your automation has migrated to OIDC.
529    pub admin_token_fallback: bool,
530}
531
532impl Default for AuthConfig {
533    fn default() -> Self {
534        Self {
535            enabled: false,
536            issuer: String::new(),
537            audience: String::new(),
538            read_scopes: Vec::new(),
539            reload_scopes: Vec::new(),
540            anonymous_read: false,
541            start_degraded: true,
542            algorithms: vec!["RS256".into()],
543            leeway_secs: 60,
544            jwks_refresh_secs: 3600,
545            tenant_claim: String::new(),
546            allowed_tenants: Vec::new(),
547            admin_token_fallback: true,
548        }
549    }
550}
551
552impl Backend {
553    pub fn as_str(self) -> &'static str {
554        match self {
555            Backend::Datafusion => "datafusion",
556            Backend::Duckdb => "duckdb",
557        }
558    }
559}
560
561#[derive(Debug, Clone, Deserialize)]
562pub struct DatasetConfig {
563    pub name: String,
564    pub source: SourceConfig,
565    #[serde(default)]
566    pub s3: Option<S3Config>,
567    #[serde(default)]
568    pub index: IndexConfig,
569    /// Optional column projection applied at load time. When non-empty,
570    /// only the listed columns are read from the parquet/delta source —
571    /// every other column is skipped entirely (no decode, no allocation,
572    /// no resident memory). Empty (default) = read all columns. Names are
573    /// matched case-insensitively against the source schema.
574    #[serde(default)]
575    pub columns: Vec<String>,
576    /// When `true` (default), Utf8 columns that are dictionary-encoded in
577    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
578    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
579    /// for low-cardinality columns. Set to `false` to bypass the override
580    /// — useful as a workaround if you observe null-handling oddities on
581    /// a particular parquet file.
582    #[serde(default = "default_true")]
583    pub dict_encode: bool,
584    /// When `true`, the backend should keep the dataset on disk and stream
585    /// it at query time instead of materialising it into RAM at startup.
586    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
587    /// for bounded memory use on large / multi-file sources. Honoured by
588    /// the DataFusion backend (local + S3 parquet) and by the DuckDB
589    /// backend, which registers the dataset as a view over the source scan
590    /// (local + S3 parquet, and delta) rather than materialising a table.
591    #[serde(default)]
592    pub lazy: bool,
593}
594
/// Serde `default = "default_true"` hook for boolean fields that stay on
/// unless the config explicitly turns them off.
fn default_true() -> bool {
    true
}
598
599#[derive(Debug, Clone, Deserialize)]
600pub struct SourceConfig {
601    pub kind: SourceKind,
602    /// Either a local filesystem path or an `s3://bucket/key` URL.
603    pub location: String,
604}
605
606#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
607#[serde(rename_all = "lowercase")]
608pub enum SourceKind {
609    #[default]
610    Parquet,
611    Delta,
612}
613
614impl SourceKind {
615    pub fn as_str(self) -> &'static str {
616        match self {
617            SourceKind::Parquet => "parquet",
618            SourceKind::Delta => "delta",
619        }
620    }
621}
622
623/// Non-secret S3 connection settings. Credentials are pulled from env / the
624/// AWS credential chain — see [`DatasetConfig::resolved_creds`].
625#[derive(Debug, Clone, Deserialize)]
626#[serde(default)]
627pub struct S3Config {
628    pub region: Option<String>,
629    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
630    pub endpoint: Option<String>,
631    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
632    /// MinIO and most non-AWS providers require `path`.
633    pub addressing_style: AddressingStyle,
634    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
635    pub allow_http: bool,
636    /// Inline credentials. Strongly discouraged in production — prefer env
637    /// vars (see module docs).
638    pub access_key_id: Option<String>,
639    pub secret_access_key: Option<String>,
640    pub session_token: Option<String>,
641    /// Hive partition-column handling for parquet sources. Defaults to
642    /// `auto` (detect from the path). See [`Partitioning`].
643    pub partitioning: Partitioning,
644    /// Whether the bucket is folded into a custom `endpoint` host for
645    /// virtual-hosted-style requests. Defaults to `auto`. See
646    /// [`BucketInHost`].
647    pub endpoint_bucket_in_host: BucketInHost,
648}
649
650impl Default for S3Config {
651    fn default() -> Self {
652        Self {
653            region: None,
654            endpoint: None,
655            addressing_style: AddressingStyle::Virtual,
656            allow_http: false,
657            access_key_id: None,
658            secret_access_key: None,
659            session_token: None,
660            partitioning: Partitioning::Auto,
661            endpoint_bucket_in_host: BucketInHost::Auto,
662        }
663    }
664}
665
666impl S3Config {
667    /// Resolve the endpoint URL to hand to the object store, optionally
668    /// folding `bucket` into the host for virtual-hosted-style requests so a
669    /// plain `endpoint` works the same way it does on DuckDB.
670    ///
671    /// Returns `None` when no custom endpoint is configured (AWS default).
672    /// The bucket is only prepended when it isn't already the leading host
673    /// label, so re-running this (or a config that already embeds the
674    /// bucket) never produces `bucket.bucket.host`.
675    pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
676        let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
677
678        let fold = match self.endpoint_bucket_in_host {
679            BucketInHost::False => false,
680            BucketInHost::True => true,
681            BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
682        };
683        if !fold {
684            return Some(ep.to_string());
685        }
686
687        let (scheme, host_and_path) = match ep.split_once("://") {
688            Some((s, rest)) => (Some(s), rest),
689            None => (None, ep),
690        };
691        // Split host from any trailing path so we prefix the host label only.
692        let (host, path) = match host_and_path.split_once('/') {
693            Some((h, p)) => (h, Some(p)),
694            None => (host_and_path, None),
695        };
696        // Guard against double-prefixing.
697        if host == bucket || host.starts_with(&format!("{bucket}.")) {
698            return Some(ep.to_string());
699        }
700        let new_host = format!("{bucket}.{host}");
701        let rebuilt = match (scheme, path) {
702            (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
703            (Some(s), None) => format!("{s}://{new_host}"),
704            (None, Some(p)) => format!("{new_host}/{p}"),
705            (None, None) => new_host,
706        };
707        Some(rebuilt)
708    }
709}
710
711#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
712#[serde(rename_all = "lowercase")]
713pub enum AddressingStyle {
714    #[default]
715    Virtual,
716    Path,
717}
718
719impl AddressingStyle {
720    pub fn as_str(self) -> &'static str {
721        match self {
722            AddressingStyle::Virtual => "virtual",
723            AddressingStyle::Path => "path",
724        }
725    }
726}
727
728/// How hive-style partition columns (`key=value/` path segments) are handled
729/// for an S3 parquet source. Local parquet always auto-detects; this option
730/// brings S3 in line and lets you force or disable the behaviour.
731#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
732#[serde(rename_all = "lowercase")]
733pub enum Partitioning {
734    /// Detect `key=value` segments from the location glob or by listing the
735    /// prefix. No partition columns are added when none are found.
736    #[default]
737    Auto,
738    /// Force hive partitioning. Partition keys are taken from the location
739    /// glob, or discovered by listing the prefix.
740    Hive,
741    /// Treat the source as a flat set of parquet files — never add partition
742    /// columns even if the path looks hive-partitioned.
743    None,
744}
745
746impl Partitioning {
747    pub fn as_str(self) -> &'static str {
748        match self {
749            Partitioning::Auto => "auto",
750            Partitioning::Hive => "hive",
751            Partitioning::None => "none",
752        }
753    }
754}
755
756/// Whether the bucket name is folded into the endpoint hostname for
757/// virtual-hosted-style requests against a custom endpoint. This aligns the
758/// DataFusion object-store path with DuckDB, which builds the virtual host
759/// itself — so the same plain `endpoint` works on both backends.
760#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
761#[serde(rename_all = "lowercase")]
762pub enum BucketInHost {
763    /// Fold the bucket into the host when `addressing_style = "virtual"` and
764    /// a custom `endpoint` is set (guarded against double-prefixing).
765    #[default]
766    Auto,
767    /// Always fold the bucket into the endpoint host.
768    True,
769    /// Never rewrite the endpoint — pass it through verbatim.
770    False,
771}
772
773impl BucketInHost {
774    pub fn as_str(self) -> &'static str {
775        match self {
776            BucketInHost::Auto => "auto",
777            BucketInHost::True => "true",
778            BucketInHost::False => "false",
779        }
780    }
781}
782
783#[derive(Debug, Clone, Deserialize)]
784#[serde(default)]
785pub struct IndexConfig {
786    pub mode: IndexMode,
787    pub columns: Vec<String>,
788    pub max_cardinality: usize,
789}
790
791impl Default for IndexConfig {
792    fn default() -> Self {
793        Self {
794            mode: IndexMode::Auto,
795            columns: Vec::new(),
796            max_cardinality: 100_000,
797        }
798    }
799}
800
801#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
802#[serde(rename_all = "lowercase")]
803pub enum IndexMode {
804    #[default]
805    Auto,
806    None,
807    List,
808}
809
/// S3 credentials produced by the dataset credential precedence chain.
/// A field that is `None` was not supplied by any level of that chain; the
/// engine's default provider chain should decide it instead.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id, if one was found.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was found.
    pub secret_access_key: Option<String>,
    /// Session token (temporary credentials), if one was found.
    pub session_token: Option<String>,
}
818
819impl ResolvedCreds {
820    pub fn has_keypair(&self) -> bool {
821        self.access_key_id.is_some() && self.secret_access_key.is_some()
822    }
823}
824
825// ---------------------------------------------------------------------------
826// Loading + validation
827// ---------------------------------------------------------------------------
828
829impl AppConfig {
830    /// Read and validate a TOML config file.
831    pub fn load(path: &str) -> Result<Self, AppError> {
832        let raw = std::fs::read_to_string(path)
833            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
834        let mut cfg: AppConfig =
835            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
836        cfg.normalize();
837        cfg.validate()?;
838        Ok(cfg)
839    }
840
841    /// Canonicalise fields that are compared case-insensitively at runtime.
842    ///
843    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
844    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
845    /// once at load time. Without this an operator who writes
846    /// `"Datasets:Read"` would silently 403 every caller, since the token
847    /// side would have become `datasets:read`.
848    fn normalize(&mut self) {
849        for s in self
850            .auth
851            .read_scopes
852            .iter_mut()
853            .chain(self.auth.reload_scopes.iter_mut())
854        {
855            *s = s.to_ascii_lowercase();
856        }
857    }
858
859    fn validate(&self) -> Result<(), AppError> {
860        // Server prefix: empty, or must start with '/' and not end with '/'.
861        let p = &self.server.prefix;
862        if !p.is_empty() {
863            if !p.starts_with('/') {
864                return Err(AppError::Internal(format!(
865                    "server.prefix must start with '/' (got '{p}')"
866                )));
867            }
868            if p.ends_with('/') {
869                return Err(AppError::Internal(format!(
870                    "server.prefix must not end with '/' (got '{p}')"
871                )));
872            }
873        }
874
875        if self.datasets.is_empty() {
876            return Err(AppError::Internal(
877                "datasets.toml has no [[dataset]] entries".into(),
878            ));
879        }
880
881        if self.server.quack.enabled {
882            self.server.quack.validate_enabled()?;
883        }
884
885        // Validate the docs mount path even when the section is disabled,
886        // so an inactive config typo can't go unnoticed.
887        {
888            let dp = &self.docs.path;
889            if !dp.starts_with('/') {
890                return Err(AppError::Internal(format!(
891                    "docs.path must start with '/' (got '{dp}')"
892                )));
893            }
894            if dp.len() > 1 && dp.ends_with('/') {
895                return Err(AppError::Internal(format!(
896                    "docs.path must not end with '/' (got '{dp}')"
897                )));
898            }
899            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
900                return Err(AppError::Internal(format!(
901                    "docs.path '{dp}' collides with a reserved route"
902                )));
903            }
904        }
905
906        // Same for the swagger UI mount.
907        {
908            let sp = &self.swagger.path;
909            if !sp.starts_with('/') {
910                return Err(AppError::Internal(format!(
911                    "swagger.path must start with '/' (got '{sp}')"
912                )));
913            }
914            if sp.len() > 1 && sp.ends_with('/') {
915                return Err(AppError::Internal(format!(
916                    "swagger.path must not end with '/' (got '{sp}')"
917                )));
918            }
919            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
920                return Err(AppError::Internal(format!(
921                    "swagger.path '{sp}' collides with a reserved route"
922                )));
923            }
924            if sp == &self.docs.path {
925                return Err(AppError::Internal(format!(
926                    "swagger.path and docs.path must differ (both '{sp}')"
927                )));
928            }
929            if let Some(o) = &self.swagger.oauth2 {
930                if o.issuer.trim().is_empty() {
931                    return Err(AppError::Internal(
932                        "swagger.oauth2.issuer must not be empty".into(),
933                    ));
934                }
935                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
936                    return Err(AppError::Internal(format!(
937                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
938                        o.issuer
939                    )));
940                }
941                if o.client_id.trim().is_empty() {
942                    return Err(AppError::Internal(
943                        "swagger.oauth2.client_id must not be empty".into(),
944                    ));
945                }
946            }
947        }
948
949        // Metrics endpoint mount path. Validated even when disabled so an
950        // inactive config typo can't go unnoticed. `/metrics` is itself a
951        // reserved mount (so docs/swagger can't shadow it), so we check the
952        // remaining reserved routes — and the docs/swagger paths — for
953        // collisions rather than the whole list.
954        {
955            let mp = &self.metrics.path;
956            if !mp.starts_with('/') {
957                return Err(AppError::Internal(format!(
958                    "metrics.path must start with '/' (got '{mp}')"
959                )));
960            }
961            if mp.len() > 1 && mp.ends_with('/') {
962                return Err(AppError::Internal(format!(
963                    "metrics.path must not end with '/' (got '{mp}')"
964                )));
965            }
966            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
967                return Err(AppError::Internal(format!(
968                    "metrics.path '{mp}' collides with a reserved route"
969                )));
970            }
971            if mp == &self.docs.path {
972                return Err(AppError::Internal(format!(
973                    "metrics.path and docs.path must differ (both '{mp}')"
974                )));
975            }
976            if mp == &self.swagger.path {
977                return Err(AppError::Internal(format!(
978                    "metrics.path and swagger.path must differ (both '{mp}')"
979                )));
980            }
981        }
982
983        // Explorer UI mount path. Validated even when disabled so an
984        // inactive config typo can't go unnoticed.
985        {
986            let ep = &self.explorer.path;
987            if !ep.starts_with('/') {
988                return Err(AppError::Internal(format!(
989                    "explorer.path must start with '/' (got '{ep}')"
990                )));
991            }
992            if ep.len() > 1 && ep.ends_with('/') {
993                return Err(AppError::Internal(format!(
994                    "explorer.path must not end with '/' (got '{ep}')"
995                )));
996            }
997            if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
998                return Err(AppError::Internal(format!(
999                    "explorer.path '{ep}' collides with a reserved route"
1000                )));
1001            }
1002            if ep == &self.docs.path {
1003                return Err(AppError::Internal(format!(
1004                    "explorer.path and docs.path must differ (both '{ep}')"
1005                )));
1006            }
1007            if ep == &self.swagger.path {
1008                return Err(AppError::Internal(format!(
1009                    "explorer.path and swagger.path must differ (both '{ep}')"
1010                )));
1011            }
1012            if ep == &self.metrics.path {
1013                return Err(AppError::Internal(format!(
1014                    "explorer.path and metrics.path must differ (both '{ep}')"
1015                )));
1016            }
1017        }
1018
1019        // Auth block — only meaningful when `enabled = true`. The cargo
1020        // feature gate is enforced separately in `server::serve` so a
1021        // binary built without `--features auth` and a config with
1022        // `auth.enabled = true` aborts with a clear error.
1023        if self.auth.enabled {
1024            let a = &self.auth;
1025            if a.issuer.trim().is_empty() {
1026                return Err(AppError::Internal(
1027                    "auth.issuer must not be empty when auth.enabled = true".into(),
1028                ));
1029            }
1030            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1031                return Err(AppError::Internal(format!(
1032                    "auth.issuer must be an absolute http(s) URL (got '{}')",
1033                    a.issuer
1034                )));
1035            }
1036            for alg in &a.algorithms {
1037                match alg.as_str() {
1038                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1039                    | "PS512" => {}
1040                    other => {
1041                        return Err(AppError::Internal(format!(
1042                            "auth.algorithms[{other}] is not allowed; pick one of \
1043                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1044                        )));
1045                    }
1046                }
1047            }
1048            if a.algorithms.is_empty() {
1049                return Err(AppError::Internal(
1050                    "auth.algorithms must not be empty".into(),
1051                ));
1052            }
1053            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1054                return Err(AppError::Internal(format!(
1055                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1056                    a.tenant_claim
1057                )));
1058            }
1059            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1060                return Err(AppError::Internal(
1061                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1062                     can't enforce a tenant allow-list without a claim to extract from"
1063                        .into(),
1064                ));
1065            }
1066        }
1067
1068        let mut seen = HashSet::new();
1069        for d in &self.datasets {
1070            if !seen.insert(d.name.as_str()) {
1071                return Err(AppError::Internal(format!(
1072                    "duplicate dataset name: {}",
1073                    d.name
1074                )));
1075            }
1076            if d.name.is_empty() {
1077                return Err(AppError::Internal("dataset name must not be empty".into()));
1078            }
1079            // URL-safe: alphanum + _ - .
1080            if !d
1081                .name
1082                .chars()
1083                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1084            {
1085                return Err(AppError::Internal(format!(
1086                    "dataset name '{}' must be alphanumeric (plus _ - .)",
1087                    d.name
1088                )));
1089            }
1090
1091            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1092                return Err(AppError::Internal(format!(
1093                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1094                    d.name
1095                )));
1096            }
1097
1098            // Location-specific checks.
1099            if d.source.is_s3() {
1100                d.source.s3_bucket()?;
1101                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1102                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1103                    && std::env::var("AWS_REGION").is_err()
1104                    && std::env::var("AWS_DEFAULT_REGION").is_err()
1105                {
1106                    log::warn!(
1107                        "dataset '{}': S3 source without explicit region — \
1108                         relying on AWS_REGION env var",
1109                        d.name
1110                    );
1111                }
1112            } else {
1113                // Local path. For parquet we can fully resolve to a file
1114                // list up front; for delta we only check that the directory
1115                // exists (delta has its own layout — _delta_log/, …).
1116                match d.source.kind {
1117                    SourceKind::Parquet => {
1118                        d.resolve_local_parquet_files()?;
1119                    }
1120                    SourceKind::Delta => {
1121                        let p = Path::new(&d.source.location);
1122                        if !p.exists() {
1123                            return Err(AppError::Internal(format!(
1124                                "dataset '{}': delta location does not exist: {}",
1125                                d.name, d.source.location
1126                            )));
1127                        }
1128                    }
1129                }
1130            }
1131        }
1132        Ok(())
1133    }
1134}
1135
1136impl SourceConfig {
1137    pub fn is_s3(&self) -> bool {
1138        self.location.starts_with("s3://")
1139    }
1140
1141    /// True when the location already contains a glob metacharacter
1142    /// (`*`, `?`, or `[`).
1143    pub fn has_glob(&self) -> bool {
1144        self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1145    }
1146
1147    /// Location to hand to a backend that needs an explicit parquet glob
1148    /// (DuckDB). When the location is a plain S3 prefix with no glob, append
1149    /// a recursive `**/*.parquet` so DuckDB lists the prefix the same way
1150    /// DataFusion's object-store listing does. Globbed or non-S3 locations
1151    /// are returned unchanged.
1152    pub fn s3_recursive_parquet_glob(&self) -> String {
1153        if !self.is_s3() || self.has_glob() {
1154            return self.location.clone();
1155        }
1156        let trimmed = self.location.trim_end_matches('/');
1157        format!("{trimmed}/**/*.parquet")
1158    }
1159
1160    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
1161    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1162        let rest = self
1163            .location
1164            .strip_prefix("s3://")
1165            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1166        let (bucket, key) = match rest.split_once('/') {
1167            Some((b, k)) => (b, k),
1168            None => (rest, ""),
1169        };
1170        if bucket.is_empty() {
1171            return Err(AppError::Internal(format!(
1172                "s3 URL missing bucket: {}",
1173                self.location
1174            )));
1175        }
1176        Ok((bucket, key))
1177    }
1178}
1179
1180impl DatasetConfig {
1181    /// Expand `source.location` to a concrete list of local `.parquet`
1182    /// files. Only valid for `kind = parquet` on local paths — S3 and
1183    /// Delta sources are resolved by the backend itself.
1184    ///
1185    /// Accepts three location shapes:
1186    ///   * a single `*.parquet` file
1187    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
1188    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
1189    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
1190    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1191        if self.source.is_s3() {
1192            return Err(AppError::Internal(format!(
1193                "dataset '{}': resolve_local_parquet_files called on s3 source",
1194                self.name
1195            )));
1196        }
1197        let loc = &self.source.location;
1198
1199        // Glob pattern? Expand and require at least one match.
1200        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1201            let mut files: Vec<PathBuf> = glob::glob(loc)
1202                .map_err(|e| {
1203                    AppError::Internal(format!(
1204                        "dataset '{}': bad glob pattern '{loc}': {e}",
1205                        self.name
1206                    ))
1207                })?
1208                .filter_map(|r| r.ok())
1209                .filter(|p| {
1210                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1211                })
1212                .collect();
1213            files.sort();
1214            if files.is_empty() {
1215                return Err(AppError::EmptyDataset(format!(
1216                    "dataset '{}': glob '{loc}' matched no .parquet files",
1217                    self.name
1218                )));
1219            }
1220            return Ok(files);
1221        }
1222
1223        let path = Path::new(loc);
1224        if !path.exists() {
1225            return Err(AppError::Internal(format!(
1226                "dataset '{}': source path does not exist: {loc}",
1227                self.name
1228            )));
1229        }
1230
1231        if path.is_file() {
1232            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1233                return Err(AppError::Internal(format!(
1234                    "dataset '{}': source must be a .parquet file",
1235                    self.name
1236                )));
1237            }
1238            return Ok(vec![path.to_path_buf()]);
1239        }
1240
1241        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1242            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1243            .filter_map(|entry| entry.ok().map(|e| e.path()))
1244            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1245            .collect();
1246        files.sort();
1247        if files.is_empty() {
1248            return Err(AppError::EmptyDataset(format!(
1249                "dataset '{}': no *.parquet files found in {loc}",
1250                self.name
1251            )));
1252        }
1253        Ok(files)
1254    }
1255
1256    /// Estimate the on-disk byte size of this dataset's local backing
1257    /// files. Returns `None` for S3 sources (sizing would require a
1258    /// network round-trip) or when nothing can be measured.
1259    ///
1260    /// * `parquet` sums the resolved `.parquet` files (single file,
1261    ///   directory, or glob).
1262    /// * `delta` sums every `*.parquet` data file under the table root.
1263    ///   This slightly over-counts when stale files haven't been vacuumed,
1264    ///   which is fine for a coarse force-lazy threshold.
1265    pub fn estimate_local_bytes(&self) -> Option<u64> {
1266        if self.source.is_s3() {
1267            return None;
1268        }
1269        match self.source.kind {
1270            SourceKind::Parquet => {
1271                let files = self.resolve_local_parquet_files().ok()?;
1272                Some(
1273                    files
1274                        .iter()
1275                        .filter_map(|p| std::fs::metadata(p).ok())
1276                        .map(|m| m.len())
1277                        .sum(),
1278                )
1279            }
1280            SourceKind::Delta => {
1281                let root = self.source.location.trim_end_matches('/');
1282                let pattern = format!("{root}/**/*.parquet");
1283                let paths = glob::glob(&pattern).ok()?;
1284                Some(
1285                    paths
1286                        .filter_map(Result::ok)
1287                        .filter_map(|p| std::fs::metadata(&p).ok())
1288                        .filter(|m| m.is_file())
1289                        .map(|m| m.len())
1290                        .sum(),
1291                )
1292            }
1293        }
1294    }
1295
1296    /// Decide whether this dataset should be forced into lazy mode given
1297    /// the server's `force_lazy_above_mb` threshold. Returns `Some(bytes)`
1298    /// (the measured size) when it should be forced, so the caller can log
1299    /// it. Returns `None` when the dataset is already lazy, the threshold
1300    /// is disabled, the source is S3, or the measured size is unknown or at
1301    /// or below the threshold.
1302    pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1303        if self.lazy || server.force_lazy_above_mb == 0 {
1304            return None;
1305        }
1306        let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1307        match self.estimate_local_bytes() {
1308            Some(bytes) if bytes > threshold => Some(bytes),
1309            _ => None,
1310        }
1311    }
1312
1313    /// Env-var prefix derived from the dataset name: uppercase with
1314    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
1315    /// `SALES_EU_1`.
1316    pub fn env_prefix(&self) -> String {
1317        self.name
1318            .chars()
1319            .map(|c| {
1320                if c.is_ascii_alphanumeric() {
1321                    c.to_ascii_uppercase()
1322                } else {
1323                    '_'
1324                }
1325            })
1326            .collect()
1327    }
1328
1329    /// Resolve S3 credentials following the precedence chain documented at
1330    /// the top of this module. Returns an empty struct when nothing was
1331    /// found — the caller should then leave credential resolution to the
1332    /// engine's default provider chain.
1333    pub fn resolved_creds(&self) -> ResolvedCreds {
1334        let prefix = self.env_prefix();
1335        let from_env = |suffix: &str| {
1336            std::env::var(format!("{prefix}_{suffix}"))
1337                .ok()
1338                .filter(|s| !s.is_empty())
1339        };
1340        let inline = self.s3.as_ref();
1341        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1342
1343        ResolvedCreds {
1344            access_key_id: from_env("AWS_ACCESS_KEY_ID")
1345                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1346                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1347            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1348                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1349                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1350            session_token: from_env("AWS_SESSION_TOKEN")
1351                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1352                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1353        }
1354    }
1355
1356    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1357    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1358    pub fn resolved_region(&self) -> String {
1359        let prefix = self.env_prefix();
1360        std::env::var(format!("{prefix}_AWS_REGION"))
1361            .ok()
1362            .filter(|s| !s.is_empty())
1363            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1364            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1365            .or_else(|| {
1366                std::env::var("AWS_DEFAULT_REGION")
1367                    .ok()
1368                    .filter(|s| !s.is_empty())
1369            })
1370            .unwrap_or_else(|| "us-east-1".to_string())
1371    }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376    use super::*;
1377
1378    #[test]
1379    fn server_defaults() {
1380        let s = ServerConfig::default();
1381        assert_eq!(s.backend, Backend::Datafusion);
1382        assert_eq!(s.port, 8080);
1383        assert!(s.compress);
1384        assert_eq!(s.max_body_bytes, 1024 * 1024);
1385        assert_eq!(s.max_page_size, 100_000);
1386        assert_eq!(s.force_lazy_above_mb, 0);
1387        assert_eq!(s.request_timeout_ms, 30_000);
1388        assert!(!s.quack.enabled);
1389        assert_eq!(s.quack.uri, "quack:localhost");
1390        assert!(s.quack.token.is_none());
1391        assert!(!s.quack.allow_other_hostname);
1392        assert!(s.quack.read_only);
1393        assert_eq!(s.prefix, "");
1394        assert!(s.listen.is_loopback());
1395    }
1396
1397    #[test]
1398    fn server_overrides_from_toml() {
1399        let toml = r#"
1400            [server]
1401            backend = "duckdb"
1402            port = 9000
1403            prefix = "/datapress"
1404            compress = false
1405            max_body_bytes = 4096
1406            max_page_size = 50000
1407            force_lazy_above_mb = 256
1408            request_timeout_ms = 0
1409
1410            [server.quack]
1411            enabled = true
1412            uri = "quack:localhost:9495"
1413            token = "test-token"
1414            read_only = false
1415            [[dataset]]
1416            name = "x"
1417            source.kind = "parquet"
1418            source.location = "/tmp/missing.parquet"
1419        "#;
1420        let cfg: AppConfig = toml::from_str(toml).unwrap();
1421        assert_eq!(cfg.server.backend, Backend::Duckdb);
1422        assert_eq!(cfg.server.port, 9000);
1423        assert_eq!(cfg.server.prefix, "/datapress");
1424        assert!(!cfg.server.compress);
1425        assert_eq!(cfg.server.max_body_bytes, 4096);
1426        assert_eq!(cfg.server.max_page_size, 50_000);
1427        assert_eq!(cfg.server.force_lazy_above_mb, 256);
1428        assert_eq!(cfg.server.request_timeout_ms, 0);
1429        assert!(cfg.server.quack.enabled);
1430        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
1431        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
1432        assert!(!cfg.server.quack.read_only);
1433        assert_eq!(cfg.datasets.len(), 1);
1434        assert_eq!(cfg.datasets[0].name, "x");
1435        assert!(cfg.datasets[0].dict_encode); // default
1436    }
1437
1438    #[test]
1439    fn force_lazy_bytes_logic() {
1440        // A unique temp dir with one 2 MiB "parquet" file (contents are
1441        // irrelevant — estimate_local_bytes only stats file lengths).
1442        let dir = std::env::temp_dir().join(format!(
1443            "dp-force-lazy-{}-{}",
1444            std::process::id(),
1445            std::time::SystemTime::now()
1446                .duration_since(std::time::UNIX_EPOCH)
1447                .unwrap()
1448                .as_nanos()
1449        ));
1450        std::fs::create_dir_all(&dir).unwrap();
1451        let two_mib = 2 * 1024 * 1024;
1452        let file = dir.join("data.parquet");
1453        std::fs::write(&file, vec![0u8; two_mib]).unwrap();
1454
1455        let mk = |kind: SourceKind, location: &str, lazy: bool| DatasetConfig {
1456            name: "t".into(),
1457            source: SourceConfig {
1458                kind,
1459                location: location.to_string(),
1460            },
1461            s3: None,
1462            index: IndexConfig::default(),
1463            columns: vec![],
1464            dict_encode: true,
1465            lazy,
1466        };
1467        let server = |mb: u64| ServerConfig {
1468            force_lazy_above_mb: mb,
1469            ..ServerConfig::default()
1470        };
1471
1472        // Sizing: single file, directory walk, and delta dir walk all see 2 MiB.
1473        let file_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), false);
1474        assert_eq!(file_ds.estimate_local_bytes(), Some(two_mib as u64));
1475        let dir_ds = mk(SourceKind::Parquet, dir.to_str().unwrap(), false);
1476        assert_eq!(dir_ds.estimate_local_bytes(), Some(two_mib as u64));
1477        let delta_ds = mk(SourceKind::Delta, dir.to_str().unwrap(), false);
1478        assert_eq!(delta_ds.estimate_local_bytes(), Some(two_mib as u64));
1479
1480        // Threshold disabled → never force.
1481        assert_eq!(file_ds.force_lazy_bytes(&server(0)), None);
1482        // 1 MiB threshold, 2 MiB file → forced (returns measured size).
1483        assert_eq!(file_ds.force_lazy_bytes(&server(1)), Some(two_mib as u64));
1484        // 4 MiB threshold → not forced.
1485        assert_eq!(file_ds.force_lazy_bytes(&server(4)), None);
1486
1487        // Already-lazy datasets are never re-flagged.
1488        let lazy_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), true);
1489        assert_eq!(lazy_ds.force_lazy_bytes(&server(1)), None);
1490
1491        // S3 sources can't be measured → never auto-forced.
1492        let s3_ds = mk(SourceKind::Parquet, "s3://bucket/data.parquet", false);
1493        assert_eq!(s3_ds.estimate_local_bytes(), None);
1494        assert_eq!(s3_ds.force_lazy_bytes(&server(1)), None);
1495
1496        std::fs::remove_dir_all(&dir).ok();
1497    }
1498
1499    #[test]
1500    fn validate_rejects_bad_prefix() {
1501        let bad = ["no-leading-slash", "/trailing/"];
1502        for p in bad {
1503            let cfg = AppConfig {
1504                server: ServerConfig {
1505                    prefix: p.to_string(),
1506                    ..Default::default()
1507                },
1508                docs: DocsConfig::default(),
1509                swagger: SwaggerConfig::default(),
1510                metrics: MetricsConfig::default(),
1511                explorer: ExplorerConfig::default(),
1512                sql: SqlConfig::default(),
1513                datafusion: DataFusionConfig::default(),
1514                auth: AuthConfig::default(),
1515                datasets: vec![],
1516            };
1517            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
1518        }
1519    }
1520
1521    #[test]
1522    fn normalize_lowercases_configured_scopes() {
1523        let mut cfg = AppConfig {
1524            server: ServerConfig::default(),
1525            docs: DocsConfig::default(),
1526            swagger: SwaggerConfig::default(),
1527            metrics: MetricsConfig::default(),
1528            explorer: ExplorerConfig::default(),
1529            sql: SqlConfig::default(),
1530            datafusion: DataFusionConfig::default(),
1531            auth: AuthConfig {
1532                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
1533                reload_scopes: vec!["Datasets:Reload".into()],
1534                ..Default::default()
1535            },
1536            datasets: vec![],
1537        };
1538        cfg.normalize();
1539        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
1540        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
1541    }
1542
1543    #[test]
1544    fn validate_rejects_no_datasets() {
1545        let cfg = AppConfig {
1546            server: ServerConfig::default(),
1547            docs: DocsConfig::default(),
1548            swagger: SwaggerConfig::default(),
1549            metrics: MetricsConfig::default(),
1550            explorer: ExplorerConfig::default(),
1551            sql: SqlConfig::default(),
1552            datafusion: DataFusionConfig::default(),
1553            auth: AuthConfig::default(),
1554            datasets: vec![],
1555        };
1556        let err = cfg.validate().unwrap_err();
1557        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
1558    }
1559
1560    #[cfg(feature = "auth")]
1561    #[test]
1562    fn validate_accepts_auth_issuer_with_trailing_slash() {
1563        use std::io::Write;
1564
1565        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
1566        let _ = std::fs::remove_dir_all(&dir);
1567        std::fs::create_dir_all(&dir).unwrap();
1568        let file = dir.join("a.parquet");
1569        std::fs::File::create(&file)
1570            .unwrap()
1571            .write_all(b"x")
1572            .unwrap();
1573
1574        let cfg = AppConfig {
1575            server: ServerConfig::default(),
1576            docs: DocsConfig::default(),
1577            swagger: SwaggerConfig::default(),
1578            metrics: MetricsConfig::default(),
1579            explorer: ExplorerConfig::default(),
1580            sql: SqlConfig::default(),
1581            datafusion: DataFusionConfig::default(),
1582            auth: AuthConfig {
1583                enabled: true,
1584                issuer: "https://tenant.example.com/".into(),
1585                ..Default::default()
1586            },
1587            datasets: vec![DatasetConfig {
1588                name: "x".into(),
1589                source: SourceConfig {
1590                    kind: SourceKind::Parquet,
1591                    location: file.to_string_lossy().into_owned(),
1592                },
1593                s3: None,
1594                index: IndexConfig::default(),
1595                columns: vec![],
1596                dict_encode: true,
1597                lazy: false,
1598            }],
1599        };
1600
1601        assert!(cfg.validate().is_ok());
1602        let _ = std::fs::remove_dir_all(&dir);
1603    }
1604
1605    #[test]
1606    fn validate_rejects_quack_non_local_host_without_override() {
1607        let cfg = AppConfig {
1608            server: ServerConfig {
1609                quack: QuackConfig {
1610                    enabled: true,
1611                    uri: "quack:127.0.0.1".into(),
1612                    token: Some("test-token".into()),
1613                    ..Default::default()
1614                },
1615                ..Default::default()
1616            },
1617            docs: DocsConfig::default(),
1618            swagger: SwaggerConfig::default(),
1619            metrics: MetricsConfig::default(),
1620            explorer: ExplorerConfig::default(),
1621            sql: SqlConfig::default(),
1622            datafusion: DataFusionConfig::default(),
1623            auth: AuthConfig::default(),
1624            datasets: vec![DatasetConfig {
1625                name: "x".into(),
1626                source: SourceConfig {
1627                    kind: SourceKind::Parquet,
1628                    location: "/tmp/missing.parquet".into(),
1629                },
1630                s3: None,
1631                index: IndexConfig::default(),
1632                columns: vec![],
1633                dict_encode: true,
1634                lazy: false,
1635            }],
1636        };
1637        let err = cfg.validate().unwrap_err();
1638        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
1639    }
1640
1641    #[test]
1642    fn validate_rejects_bad_dataset_name() {
1643        let cfg: AppConfig = toml::from_str(
1644            r#"
1645            [[dataset]]
1646            name = "bad name!"
1647            source.kind = "parquet"
1648            source.location = "/tmp/whatever"
1649        "#,
1650        )
1651        .unwrap();
1652        let err = cfg.validate().unwrap_err();
1653        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
1654    }
1655
1656    #[test]
1657    fn validate_rejects_duplicate_names() {
1658        use std::io::Write;
1659        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
1660        let _ = std::fs::remove_dir_all(&dir);
1661        std::fs::create_dir_all(&dir).unwrap();
1662        let f = dir.join("a.parquet");
1663        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
1664        let path = f.to_str().unwrap();
1665
1666        let cfg: AppConfig = toml::from_str(&format!(
1667            r#"
1668            [[dataset]]
1669            name = "a"
1670            source.kind = "parquet"
1671            source.location = "{path}"
1672            [[dataset]]
1673            name = "a"
1674            source.kind = "parquet"
1675            source.location = "{path}"
1676        "#
1677        ))
1678        .unwrap();
1679        let err = cfg.validate().expect_err("expected error");
1680        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
1681
1682        let _ = std::fs::remove_dir_all(&dir);
1683    }
1684
1685    #[test]
1686    fn s3_bucket_parsing() {
1687        let mk = |loc: &str| SourceConfig {
1688            kind: SourceKind::Parquet,
1689            location: loc.into(),
1690        };
1691        let s1 = mk("s3://bucket/path/key");
1692        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
1693        let s2 = mk("s3://only-bucket");
1694        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
1695        assert!(mk("s3:///nokey").s3_bucket().is_err());
1696        assert!(mk("/local/path").s3_bucket().is_err());
1697    }
1698
1699    #[test]
1700    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
1701        let mk = |loc: &str| SourceConfig {
1702            kind: SourceKind::Parquet,
1703            location: loc.into(),
1704        };
1705        // Plain prefix -> recursive parquet glob (trailing slash trimmed).
1706        assert_eq!(
1707            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
1708            "s3://bucket/logs/**/*.parquet"
1709        );
1710        assert_eq!(
1711            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
1712            "s3://bucket/logs/**/*.parquet"
1713        );
1714        // Already globbed -> unchanged.
1715        assert_eq!(
1716            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
1717            "s3://bucket/logs/*.parquet"
1718        );
1719        // Non-S3 -> unchanged.
1720        assert_eq!(
1721            mk("/local/logs").s3_recursive_parquet_glob(),
1722            "/local/logs"
1723        );
1724    }
1725
1726    #[test]
1727    fn effective_endpoint_folds_bucket_per_mode() {
1728        let virt = S3Config {
1729            endpoint: Some("https://s3.example.com".into()),
1730            addressing_style: AddressingStyle::Virtual,
1731            ..Default::default()
1732        };
1733        // Auto + virtual -> bucket folded into host.
1734        assert_eq!(
1735            virt.effective_endpoint("mybucket").as_deref(),
1736            Some("https://mybucket.s3.example.com")
1737        );
1738        // Idempotent: already-prefixed host is left alone.
1739        let prefixed = S3Config {
1740            endpoint: Some("https://mybucket.s3.example.com".into()),
1741            ..virt.clone()
1742        };
1743        assert_eq!(
1744            prefixed.effective_endpoint("mybucket").as_deref(),
1745            Some("https://mybucket.s3.example.com")
1746        );
1747        // Path style (auto) -> host untouched.
1748        let path = S3Config {
1749            addressing_style: AddressingStyle::Path,
1750            ..virt.clone()
1751        };
1752        assert_eq!(
1753            path.effective_endpoint("mybucket").as_deref(),
1754            Some("https://s3.example.com")
1755        );
1756        // Explicit overrides win over addressing style.
1757        let forced_off = S3Config {
1758            endpoint_bucket_in_host: BucketInHost::False,
1759            ..virt.clone()
1760        };
1761        assert_eq!(
1762            forced_off.effective_endpoint("mybucket").as_deref(),
1763            Some("https://s3.example.com")
1764        );
1765        let forced_on = S3Config {
1766            endpoint_bucket_in_host: BucketInHost::True,
1767            ..path.clone()
1768        };
1769        assert_eq!(
1770            forced_on.effective_endpoint("mybucket").as_deref(),
1771            Some("https://mybucket.s3.example.com")
1772        );
1773        // No endpoint -> None.
1774        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
1775    }
1776
1777    #[test]
1778    fn env_prefix_sanitises_name() {
1779        let mk = |name: &str| DatasetConfig {
1780            name: name.into(),
1781            source: SourceConfig {
1782                kind: SourceKind::Parquet,
1783                location: "x".into(),
1784            },
1785            s3: None,
1786            index: IndexConfig::default(),
1787            columns: vec![],
1788            dict_encode: true,
1789            lazy: false,
1790        };
1791        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
1792        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
1793        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
1794    }
1795
1796    #[test]
1797    fn resolve_local_parquet_single_file_and_dir() {
1798        use std::io::Write;
1799        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
1800        let _ = std::fs::remove_dir_all(&dir);
1801        std::fs::create_dir_all(&dir).unwrap();
1802        let f = dir.join("a.parquet");
1803        let mut fh = std::fs::File::create(&f).unwrap();
1804        fh.write_all(b"not really parquet").unwrap();
1805
1806        let mk = |loc: &str| DatasetConfig {
1807            name: "ds".into(),
1808            source: SourceConfig {
1809                kind: SourceKind::Parquet,
1810                location: loc.into(),
1811            },
1812            s3: None,
1813            index: IndexConfig::default(),
1814            columns: vec![],
1815            dict_encode: true,
1816            lazy: false,
1817        };
1818
1819        // Direct file.
1820        let files = mk(f.to_str().unwrap())
1821            .resolve_local_parquet_files()
1822            .unwrap();
1823        assert_eq!(files, vec![f.clone()]);
1824
1825        // Directory.
1826        let files = mk(dir.to_str().unwrap())
1827            .resolve_local_parquet_files()
1828            .unwrap();
1829        assert_eq!(files, vec![f.clone()]);
1830
1831        // Missing path.
1832        assert!(
1833            mk("/no/such/place.parquet")
1834                .resolve_local_parquet_files()
1835                .is_err()
1836        );
1837
1838        let _ = std::fs::remove_dir_all(&dir);
1839    }
1840}