Skip to main content

datapress_core/
config.rs

//! Runtime configuration loaded from `datasets.toml`.
//!
//! Each instance binds to a list of datasets. A dataset's `[dataset.source]`
//! block selects the format (`parquet` or `delta`) and the location (a
//! local path or an `s3://bucket/key` URL). When the location is on S3,
//! an optional `[dataset.s3]` block carries connection details (region,
//! endpoint, addressing style, …) and, although discouraged, may also
//! carry inline credentials.
//!
//! Credentials are resolved at runtime via [`DatasetConfig::resolved_creds`]
//! in this precedence order:
//!
//! 1. Per-dataset env vars `${PREFIX}_AWS_ACCESS_KEY_ID`,
//!    `${PREFIX}_AWS_SECRET_ACCESS_KEY`, `${PREFIX}_AWS_SESSION_TOKEN`
//!    where `${PREFIX}` is the dataset name uppercased with non-alphanumeric
//!    characters replaced by `_` (e.g. `accidents` → `ACCIDENTS`,
//!    `sales.eu-1` → `SALES_EU_1`).
//! 2. Inline `access_key_id` / `secret_access_key` / `session_token` in the
//!    `[dataset.s3]` block.
//! 3. Plain `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /
//!    `AWS_SESSION_TOKEN`.
//! 4. None — fall back to the engine's own provider chain
//!    (`~/.aws/credentials`, IMDS, …).
23
24use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths owned by the server itself: the root, the API scopes, the
/// probes, and the version / metrics endpoints.
///
/// A user-chosen `[docs].path` or `[swagger].path` equal to one of these
/// would shadow a first-party route, so such values MUST be refused.
const RESERVED_MOUNTS: &[&str] = &[
    "/",
    "/api",
    "/api/v1",
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
39// ---------------------------------------------------------------------------
40// Public types
41// ---------------------------------------------------------------------------
42
43#[derive(Debug, Deserialize)]
44pub struct AppConfig {
45    #[serde(default)]
46    pub server: ServerConfig,
47    #[serde(default)]
48    pub docs: DocsConfig,
49    #[serde(default)]
50    pub swagger: SwaggerConfig,
51    #[serde(default)]
52    pub metrics: MetricsConfig,
53    #[serde(default)]
54    pub auth: AuthConfig,
55    #[serde(rename = "dataset", default)]
56    pub datasets: Vec<DatasetConfig>,
57}
58
59#[derive(Debug, Deserialize)]
60#[serde(default)]
61pub struct ServerConfig {
62    /// Which engine to run. Must match the binary's compile-time feature.
63    pub backend: Backend,
64    /// Listen address. Defaults to loopback (127.0.0.1) — explicitly opt in
65    /// to 0.0.0.0 if you want to expose the port.
66    pub listen: IpAddr,
67    /// TCP port.
68    pub port: u16,
69    /// Number of actix worker threads. `None` (= unset) → one per CPU.
70    pub workers: Option<usize>,
71    /// Optional URL path prefix — useful when sitting behind a reverse
72    /// proxy that rewrites e.g. `/datapress/...` → `/...`. When set, every
73    /// route is mounted under this prefix (so the proxy can pass the URL
74    /// through unchanged). Must start with `/` and not end with `/`; the
75    /// empty string (default) means no prefix.
76    pub prefix: String,
77    /// Negotiate response compression (gzip / brotli / zstd) via the
78    /// `Accept-Encoding` request header. Enabled by default. Disable when
79    /// running behind a proxy that already compresses, or when the extra
80    /// CPU is not worth the bandwidth saving.
81    pub compress: bool,
82    /// Maximum accepted JSON request body size, in bytes. Larger bodies
83    /// are rejected with `413 Payload Too Large` before any handler runs.
84    /// Default `1 MiB`. Most query bodies are well under 10 KiB; this is
85    /// a DoS guard, not a tuning knob.
86    pub max_body_bytes: usize,
87    /// Maximum rows returned by a single `/query` page. Larger
88    /// `page_size` values are clamped before the backend runs.
89    /// Default `100_000`.
90    pub max_page_size: u64,
91    /// Per-request handler timeout, in milliseconds. If a handler hasn't
92    /// produced a response within this budget the request is aborted with
93    /// `504 Gateway Timeout`. Default `30_000` (30 s). Set `0` to disable.
94    pub request_timeout_ms: u64,
95    /// Grace period for in-flight requests after the server has received
96    /// `SIGTERM` / `SIGINT`, in seconds. The listening socket is closed
97    /// immediately; existing connections then have up to this many
98    /// seconds to finish before workers are force-stopped. Default `30`.
99    pub shutdown_timeout_secs: u64,
100    /// Optional DuckDB Quack remote SQL server. Only used by the DuckDB
101    /// backend; ignored by DataFusion.
102    pub quack: QuackConfig,
103}
104
105impl Default for ServerConfig {
106    fn default() -> Self {
107        Self {
108            backend: Backend::default(),
109            listen: IpAddr::from([127, 0, 0, 1]),
110            port: 8080,
111            workers: None,
112            prefix: String::new(),
113            compress: true,
114            max_body_bytes: 1024 * 1024,
115            max_page_size: 100_000,
116            request_timeout_ms: 30_000,
117            shutdown_timeout_secs: 30,
118            quack: QuackConfig::default(),
119        }
120    }
121}
122
123/// Experimental DuckDB Quack remote protocol server.
124///
125/// Quack exposes the DuckDB SQL surface of the in-process database. Keep it
126/// disabled unless you intentionally want DuckDB clients to attach/query this
127/// process directly.
128#[derive(Debug, Clone, Deserialize)]
129#[serde(default)]
130pub struct QuackConfig {
131    /// Install/load the Quack extension and start `quack_serve` after
132    /// datasets are registered.
133    pub enabled: bool,
134    /// Quack URI to listen on. `quack:localhost` uses DuckDB's default
135    /// port 9494.
136    pub uri: String,
137    /// Optional explicit authentication token. If omitted, Quack generates
138    /// one at startup and DataPress logs it once.
139    pub token: Option<String>,
140    /// Allow binding a non-local hostname such as `quack:0.0.0.0:9494`.
141    /// For external exposure, put a TLS-terminating reverse proxy in front.
142    pub allow_other_hostname: bool,
143    /// Install a read-only authorization macro for remote queries. Enabled
144    /// by default to match DataPress' read-oriented HTTP API.
145    pub read_only: bool,
146}
147
148impl Default for QuackConfig {
149    fn default() -> Self {
150        Self {
151            enabled: false,
152            uri: "quack:localhost".into(),
153            token: None,
154            allow_other_hostname: false,
155            read_only: true,
156        }
157    }
158}
159
160impl QuackConfig {
161    /// Validate the enabled Quack configuration against DuckDB's current
162    /// safety rules. The extension treats only the literal `localhost` as
163    /// local unless `allow_other_hostname` is set.
164    pub fn validate_enabled(&self) -> Result<(), AppError> {
165        if self.uri.trim().is_empty() {
166            return Err(AppError::Internal(
167                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
168            ));
169        }
170        if !self.uri.starts_with("quack:") {
171            return Err(AppError::Internal(format!(
172                "server.quack.uri must start with 'quack:' (got '{}')",
173                self.uri
174            )));
175        }
176        if !self.allow_other_hostname {
177            let host = self.hostname().unwrap_or_default();
178            if host != "localhost" {
179                return Err(AppError::Internal(format!(
180                    "server.quack.uri host must be 'localhost' unless \
181                     server.quack.allow_other_hostname = true (got '{}')",
182                    self.uri
183                )));
184            }
185        }
186        if let Some(token) = self.token.as_deref()
187            && token.len() < 4
188        {
189            return Err(AppError::Internal(
190                "server.quack.token must be at least 4 characters".into(),
191            ));
192        }
193        Ok(())
194    }
195
196    fn hostname(&self) -> Option<&str> {
197        let rest = self.uri.strip_prefix("quack:")?;
198        let rest = rest.strip_prefix("//").unwrap_or(rest);
199        let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
200        (!host.is_empty()).then_some(host)
201    }
202}
203
204#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
205#[serde(rename_all = "lowercase")]
206pub enum Backend {
207    #[default]
208    Datafusion,
209    Duckdb,
210}
211
212/// Embedded MkDocs documentation site (`[docs]` block).
213///
214/// Enabled by default — when the binary was built with the `docs`
215/// cargo feature, the site is served at [`DocsConfig::path`] out of
216/// the box. Set `enabled = false` in `datasets.toml` to suppress it
217/// (e.g. in prod). When the binary was built without the feature,
218/// `enabled = true` is harmless: the server logs a warning at startup
219/// and skips the mount. The mount path must be a non-trivial sub-path;
220/// reserved API and probe roots are rejected at startup.
221#[derive(Debug, Clone, Deserialize)]
222#[serde(default, deny_unknown_fields)]
223pub struct DocsConfig {
224    pub enabled: bool,
225    pub path: String,
226}
227
228impl Default for DocsConfig {
229    fn default() -> Self {
230        Self {
231            enabled: true,
232            path: "/mkdocs".into(),
233        }
234    }
235}
236
237/// Swagger UI + embedded OpenAPI spec (`[swagger]` block).
238///
239/// Enabled by default — when the binary was built with the `swagger`
240/// cargo feature, an interactive Swagger UI is served at
241/// [`SwaggerConfig::path`] (default `/docs`) and the raw OpenAPI JSON
242/// at `<path>/openapi.json`. Set `enabled = false` in `datasets.toml`
243/// to suppress it (e.g. in prod). When the binary was built without
244/// the feature, `enabled = true` is harmless: the server logs a
245/// warning at startup and skips the mount.
246///
247/// To let users sign in to the UI itself (Authorization Code + PKCE
248/// against any OIDC provider), populate the optional `[swagger.oauth2]`
249/// sub-block. Acquired tokens are attached as `Authorization: Bearer …`
250/// to every "Try it out" request — useful for exercising auth-protected
251/// endpoints from the docs page. This drives the UI only; it does not
252/// turn on server-side token validation.
253#[derive(Debug, Clone, Deserialize)]
254#[serde(default, deny_unknown_fields)]
255pub struct SwaggerConfig {
256    pub enabled: bool,
257    pub path: String,
258    pub oauth2: Option<SwaggerOAuth2Config>,
259}
260
261impl Default for SwaggerConfig {
262    fn default() -> Self {
263        Self {
264            enabled: true,
265            path: "/docs".into(),
266            oauth2: None,
267        }
268    }
269}
270
271/// OIDC single-sign-on for the Swagger UI (`[swagger.oauth2]`).
272///
273/// Configures the UI to drive an Authorization Code + PKCE flow against
274/// the given OIDC issuer. Swagger UI auto-discovers the authorize /
275/// token endpoints from `<issuer>/.well-known/openid-configuration`,
276/// so we don't need to pin them here.
277///
278/// All fields are required when the block is present — there is no
279/// sensible default for `issuer` or `client_id`.
280#[derive(Debug, Clone, Deserialize)]
281#[serde(deny_unknown_fields)]
282pub struct SwaggerOAuth2Config {
283    /// OIDC issuer URL, e.g.
284    /// `https://login.microsoftonline.com/<tenant>/v2.0` or
285    /// `https://accounts.google.com`. Must not end in `/`.
286    pub issuer: String,
287    /// Public OAuth2 client identifier registered with the IdP. The
288    /// client must be a SPA / public client (no secret) with
289    /// `https://<your-host>{swagger.path}/oauth2-redirect.html` listed
290    /// as an allowed redirect URI.
291    pub client_id: String,
292    /// Scopes to request by default. Will be pre-checked in the Swagger
293    /// UI authorize dialog; users can edit them before signing in.
294    /// `openid` is always added if missing.
295    #[serde(default)]
296    pub scopes: Vec<String>,
297    /// Use PKCE for the authorization code flow. Defaults to `true`;
298    /// disable only if your IdP doesn't support PKCE for public clients.
299    #[serde(default = "default_true")]
300    pub pkce: bool,
301}
302
303/// Prometheus metrics endpoint (`[metrics]` block).
304///
305/// Disabled by default. When `enabled = true` (and the binary was built
306/// with the `metrics` cargo feature), the server installs a middleware
307/// that records per-request HTTP counters and latency histograms, and
308/// exposes them in the Prometheus text exposition format at
309/// [`MetricsConfig::path`] (default `/metrics`).
310///
311/// The endpoint is mounted at a fixed, *unprefixed* path — like the
312/// health probes — so a scrape config doesn't need to know about any
313/// reverse-proxy `server.prefix`. It is **not** behind the `[auth]`
314/// layer: Prometheus scrapers rarely carry bearer tokens, and the
315/// endpoint exposes only aggregate request metrics (no row data). Keep
316/// it on a network the scraper can reach but the public cannot, e.g. by
317/// binding `server.listen` to a private interface.
318///
319/// When the binary was built without the `metrics` feature,
320/// `enabled = true` is harmless: the server logs a warning at startup
321/// and skips the endpoint.
322#[derive(Debug, Clone, Deserialize)]
323#[serde(default, deny_unknown_fields)]
324pub struct MetricsConfig {
325    pub enabled: bool,
326    pub path: String,
327}
328
329impl Default for MetricsConfig {
330    fn default() -> Self {
331        Self {
332            enabled: false,
333            path: "/metrics".into(),
334        }
335    }
336}
337
338/// OIDC bearer-token enforcement for the HTTP API (`[auth]` block).
339///
340/// Disabled by default. When `enabled = true`, the server validates
341/// every request's `Authorization: Bearer …` JWT against the JWKS
342/// discovered from the issuer's OIDC metadata
343/// (`<issuer>/.well-known/openid-configuration` → `jwks_uri`), then
344/// enforces the configured scope requirements per route.
345///
346/// Only compiled in when the binary was built with the `auth` cargo
347/// feature. Without the feature, `enabled = true` is rejected at
348/// startup so a misconfigured production deployment can't silently
349/// fall back to "no auth".
350///
351/// The Swagger UI's SSO support (`[swagger.oauth2]`) is *independent*
352/// of this block — `[swagger.oauth2]` only drives the UI's login
353/// dialog; `[auth]` is what enforces tokens on the API.
354#[derive(Debug, Clone, Deserialize)]
355#[serde(default, deny_unknown_fields)]
356pub struct AuthConfig {
357    /// Master switch. `false` (default) skips all auth processing.
358    pub enabled: bool,
359    /// OIDC issuer URL — must match the `iss` claim of every accepted
360    /// token. Required when `enabled = true`.
361    pub issuer: String,
362    /// Expected `aud` claim. When empty, audience validation is
363    /// skipped (not recommended in production).
364    pub audience: String,
365    /// Scopes a caller must hold to read datasets (GET endpoints +
366    /// POST `…/query` and `…/count`). Empty list means "no scope check,
367    /// just a valid token is enough".
368    pub read_scopes: Vec<String>,
369    /// Scopes required for admin/mutation endpoints (POST `…/reload`).
370    /// Empty list means "no scope check, just a valid token is enough".
371    pub reload_scopes: Vec<String>,
372    /// Allow unauthenticated GETs through. Useful for public datasets
373    /// and demo deployments. Defaults to `false`.
374    pub anonymous_read: bool,
375    /// Continue serving even if the JWKS fetch fails at startup.
376    /// When `true` (default), the server starts in a degraded mode that
377    /// rejects every auth'd request with 503 until JWKS becomes
378    /// reachable. When `false`, startup fails outright.
379    pub start_degraded: bool,
380    /// Allowed signing algorithms. Pinned to RS256 by default; never
381    /// include `HS*` or `none` here unless you really know what you're
382    /// doing.
383    pub algorithms: Vec<String>,
384    /// Clock-skew leeway for `exp`/`nbf` checks, in seconds.
385    pub leeway_secs: u64,
386    /// How often (in seconds) the background refresher re-fetches the
387    /// JWKS. On a `kid` cache miss the JWKS is also refreshed
388    /// out-of-band.
389    pub jwks_refresh_secs: u64,
390    /// Optional JSON-pointer into the JWT claims that extracts a
391    /// tenant identifier — attached to the principal and logged on
392    /// every request. Example: `"/tid"` (Azure AD), `"/org_id"`.
393    /// When empty, no tenant is extracted.
394    pub tenant_claim: String,
395    /// If non-empty, requests whose extracted tenant ID is not in this
396    /// list are rejected with 403. Has no effect when `tenant_claim`
397    /// is empty.
398    pub allowed_tenants: Vec<String>,
399    /// If `true`, `POST …/reload` accepts *either* a valid token with
400    /// `reload_scopes` *or* the legacy `X-Admin-Token` header. Defaults
401    /// to `true` for one-release backwards compatibility — flip to
402    /// `false` once your automation has migrated to OIDC.
403    pub admin_token_fallback: bool,
404}
405
406impl Default for AuthConfig {
407    fn default() -> Self {
408        Self {
409            enabled: false,
410            issuer: String::new(),
411            audience: String::new(),
412            read_scopes: Vec::new(),
413            reload_scopes: Vec::new(),
414            anonymous_read: false,
415            start_degraded: true,
416            algorithms: vec!["RS256".into()],
417            leeway_secs: 60,
418            jwks_refresh_secs: 3600,
419            tenant_claim: String::new(),
420            allowed_tenants: Vec::new(),
421            admin_token_fallback: true,
422        }
423    }
424}
425
426impl Backend {
427    pub fn as_str(self) -> &'static str {
428        match self {
429            Backend::Datafusion => "datafusion",
430            Backend::Duckdb => "duckdb",
431        }
432    }
433}
434
435#[derive(Debug, Clone, Deserialize)]
436pub struct DatasetConfig {
437    pub name: String,
438    pub source: SourceConfig,
439    #[serde(default)]
440    pub s3: Option<S3Config>,
441    #[serde(default)]
442    pub index: IndexConfig,
443    /// Optional column projection applied at load time. When non-empty,
444    /// only the listed columns are read from the parquet/delta source —
445    /// every other column is skipped entirely (no decode, no allocation,
446    /// no resident memory). Empty (default) = read all columns. Names are
447    /// matched case-insensitively against the source schema.
448    #[serde(default)]
449    pub columns: Vec<String>,
450    /// When `true` (default), Utf8 columns that are dictionary-encoded in
451    /// the source parquet are read as Arrow `Dictionary(Int32, Utf8)`
452    /// instead of being expanded to plain Utf8. Massively cheaper in RAM
453    /// for low-cardinality columns. Set to `false` to bypass the override
454    /// — useful as a workaround if you observe null-handling oddities on
455    /// a particular parquet file.
456    #[serde(default = "default_true")]
457    pub dict_encode: bool,
458    /// When `true`, the backend should keep the dataset on disk and stream
459    /// it at query time instead of materialising it into RAM at startup.
460    /// Trades the in-memory hot paths (raw Arrow slice, equality index)
461    /// for bounded memory use on large / multi-file sources. Currently
462    /// honoured by the DataFusion backend for local parquet.
463    #[serde(default)]
464    pub lazy: bool,
465}
466
/// Serde default helper for boolean fields that should be `true` when
/// the key is absent from the config file.
fn default_true() -> bool {
    true
}
470
471#[derive(Debug, Clone, Deserialize)]
472pub struct SourceConfig {
473    pub kind: SourceKind,
474    /// Either a local filesystem path or an `s3://bucket/key` URL.
475    pub location: String,
476}
477
478#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
479#[serde(rename_all = "lowercase")]
480pub enum SourceKind {
481    #[default]
482    Parquet,
483    Delta,
484}
485
486impl SourceKind {
487    pub fn as_str(self) -> &'static str {
488        match self {
489            SourceKind::Parquet => "parquet",
490            SourceKind::Delta => "delta",
491        }
492    }
493}
494
495/// Non-secret S3 connection settings. Credentials are pulled from env / the
496/// AWS credential chain — see [`DatasetConfig::resolved_creds`].
497#[derive(Debug, Clone, Deserialize)]
498#[serde(default)]
499pub struct S3Config {
500    pub region: Option<String>,
501    /// Custom endpoint (MinIO, R2, Wasabi, LocalStack, …). Omit for AWS.
502    pub endpoint: Option<String>,
503    /// `virtual` (default — `bucket.host`) or `path` (`host/bucket/`).
504    /// MinIO and most non-AWS providers require `path`.
505    pub addressing_style: AddressingStyle,
506    /// Allow plain-HTTP endpoints. Required for local MinIO over `http://…`.
507    pub allow_http: bool,
508    /// Inline credentials. Strongly discouraged in production — prefer env
509    /// vars (see module docs).
510    pub access_key_id: Option<String>,
511    pub secret_access_key: Option<String>,
512    pub session_token: Option<String>,
513}
514
515impl Default for S3Config {
516    fn default() -> Self {
517        Self {
518            region: None,
519            endpoint: None,
520            addressing_style: AddressingStyle::Virtual,
521            allow_http: false,
522            access_key_id: None,
523            secret_access_key: None,
524            session_token: None,
525        }
526    }
527}
528
529#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
530#[serde(rename_all = "lowercase")]
531pub enum AddressingStyle {
532    #[default]
533    Virtual,
534    Path,
535}
536
537impl AddressingStyle {
538    pub fn as_str(self) -> &'static str {
539        match self {
540            AddressingStyle::Virtual => "virtual",
541            AddressingStyle::Path => "path",
542        }
543    }
544}
545
546#[derive(Debug, Clone, Deserialize)]
547#[serde(default)]
548pub struct IndexConfig {
549    pub mode: IndexMode,
550    pub columns: Vec<String>,
551    pub max_cardinality: usize,
552}
553
554impl Default for IndexConfig {
555    fn default() -> Self {
556        Self {
557            mode: IndexMode::Auto,
558            columns: Vec::new(),
559            max_cardinality: 100_000,
560        }
561    }
562}
563
564#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
565#[serde(rename_all = "lowercase")]
566pub enum IndexMode {
567    #[default]
568    Auto,
569    None,
570    List,
571}
572
/// Resolved S3 credentials. `None` fields mean "let the engine's default
/// provider chain figure it out".
///
/// `Debug` is implemented by hand so that the secret access key and the
/// session token are never written out when a value is debug-printed.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    /// Access key ID, if one was resolved.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was resolved (redacted in `Debug`).
    pub secret_access_key: Option<String>,
    /// Session token, if one was resolved (redacted in `Debug`).
    pub session_token: Option<String>,
}

impl std::fmt::Debug for ResolvedCreds {
    /// Same layout as a derived `Debug`, except that a present
    /// `secret_access_key` / `session_token` is shown as
    /// `Some("<redacted>")`. The access key ID is an identifier rather
    /// than a secret and stays visible to aid diagnosis.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reveal only whether each secret is present, not its value.
        let secret_access_key = self.secret_access_key.as_ref().map(|_| "<redacted>");
        let session_token = self.session_token.as_ref().map(|_| "<redacted>");
        f.debug_struct("ResolvedCreds")
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &secret_access_key)
            .field("session_token", &session_token)
            .finish()
    }
}

impl ResolvedCreds {
    /// `true` when both the access key ID and the secret access key are
    /// present; the session token is not considered.
    pub fn has_keypair(&self) -> bool {
        self.access_key_id.is_some() && self.secret_access_key.is_some()
    }
}
587
588// ---------------------------------------------------------------------------
589// Loading + validation
590// ---------------------------------------------------------------------------
591
592impl AppConfig {
593    /// Read and validate a TOML config file.
594    pub fn load(path: &str) -> Result<Self, AppError> {
595        let raw = std::fs::read_to_string(path)
596            .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
597        let mut cfg: AppConfig =
598            toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
599        cfg.normalize();
600        cfg.validate()?;
601        Ok(cfg)
602    }
603
604    /// Canonicalise fields that are compared case-insensitively at runtime.
605    ///
606    /// Token scopes are lowercased when parsed out of a JWT (see `auth.rs`),
607    /// so the configured `read_scopes` / `reload_scopes` are lowercased here
608    /// once at load time. Without this an operator who writes
609    /// `"Datasets:Read"` would silently 403 every caller, since the token
610    /// side would have become `datasets:read`.
611    fn normalize(&mut self) {
612        for s in self
613            .auth
614            .read_scopes
615            .iter_mut()
616            .chain(self.auth.reload_scopes.iter_mut())
617        {
618            *s = s.to_ascii_lowercase();
619        }
620    }
621
622    fn validate(&self) -> Result<(), AppError> {
623        // Server prefix: empty, or must start with '/' and not end with '/'.
624        let p = &self.server.prefix;
625        if !p.is_empty() {
626            if !p.starts_with('/') {
627                return Err(AppError::Internal(format!(
628                    "server.prefix must start with '/' (got '{p}')"
629                )));
630            }
631            if p.ends_with('/') {
632                return Err(AppError::Internal(format!(
633                    "server.prefix must not end with '/' (got '{p}')"
634                )));
635            }
636        }
637
638        if self.datasets.is_empty() {
639            return Err(AppError::Internal(
640                "datasets.toml has no [[dataset]] entries".into(),
641            ));
642        }
643
644        if self.server.quack.enabled {
645            self.server.quack.validate_enabled()?;
646        }
647
648        // Validate the docs mount path even when the section is disabled,
649        // so an inactive config typo can't go unnoticed.
650        {
651            let dp = &self.docs.path;
652            if !dp.starts_with('/') {
653                return Err(AppError::Internal(format!(
654                    "docs.path must start with '/' (got '{dp}')"
655                )));
656            }
657            if dp.len() > 1 && dp.ends_with('/') {
658                return Err(AppError::Internal(format!(
659                    "docs.path must not end with '/' (got '{dp}')"
660                )));
661            }
662            if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
663                return Err(AppError::Internal(format!(
664                    "docs.path '{dp}' collides with a reserved route"
665                )));
666            }
667        }
668
669        // Same for the swagger UI mount.
670        {
671            let sp = &self.swagger.path;
672            if !sp.starts_with('/') {
673                return Err(AppError::Internal(format!(
674                    "swagger.path must start with '/' (got '{sp}')"
675                )));
676            }
677            if sp.len() > 1 && sp.ends_with('/') {
678                return Err(AppError::Internal(format!(
679                    "swagger.path must not end with '/' (got '{sp}')"
680                )));
681            }
682            if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
683                return Err(AppError::Internal(format!(
684                    "swagger.path '{sp}' collides with a reserved route"
685                )));
686            }
687            if sp == &self.docs.path {
688                return Err(AppError::Internal(format!(
689                    "swagger.path and docs.path must differ (both '{sp}')"
690                )));
691            }
692            if let Some(o) = &self.swagger.oauth2 {
693                if o.issuer.trim().is_empty() {
694                    return Err(AppError::Internal(
695                        "swagger.oauth2.issuer must not be empty".into(),
696                    ));
697                }
698                if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
699                    return Err(AppError::Internal(format!(
700                        "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
701                        o.issuer
702                    )));
703                }
704                if o.client_id.trim().is_empty() {
705                    return Err(AppError::Internal(
706                        "swagger.oauth2.client_id must not be empty".into(),
707                    ));
708                }
709            }
710        }
711
712        // Metrics endpoint mount path. Validated even when disabled so an
713        // inactive config typo can't go unnoticed. `/metrics` is itself a
714        // reserved mount (so docs/swagger can't shadow it), so we check the
715        // remaining reserved routes — and the docs/swagger paths — for
716        // collisions rather than the whole list.
717        {
718            let mp = &self.metrics.path;
719            if !mp.starts_with('/') {
720                return Err(AppError::Internal(format!(
721                    "metrics.path must start with '/' (got '{mp}')"
722                )));
723            }
724            if mp.len() > 1 && mp.ends_with('/') {
725                return Err(AppError::Internal(format!(
726                    "metrics.path must not end with '/' (got '{mp}')"
727                )));
728            }
729            if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
730                return Err(AppError::Internal(format!(
731                    "metrics.path '{mp}' collides with a reserved route"
732                )));
733            }
734            if mp == &self.docs.path {
735                return Err(AppError::Internal(format!(
736                    "metrics.path and docs.path must differ (both '{mp}')"
737                )));
738            }
739            if mp == &self.swagger.path {
740                return Err(AppError::Internal(format!(
741                    "metrics.path and swagger.path must differ (both '{mp}')"
742                )));
743            }
744        }
745
746        // Auth block — only meaningful when `enabled = true`. The cargo
747        // feature gate is enforced separately in `server::serve` so a
748        // binary built without `--features auth` and a config with
749        // `auth.enabled = true` aborts with a clear error.
750        if self.auth.enabled {
751            let a = &self.auth;
752            if a.issuer.trim().is_empty() {
753                return Err(AppError::Internal(
754                    "auth.issuer must not be empty when auth.enabled = true".into(),
755                ));
756            }
757            if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
758                return Err(AppError::Internal(format!(
759                    "auth.issuer must be an absolute http(s) URL (got '{}')",
760                    a.issuer
761                )));
762            }
763            for alg in &a.algorithms {
764                match alg.as_str() {
765                    "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
766                    | "PS512" => {}
767                    other => {
768                        return Err(AppError::Internal(format!(
769                            "auth.algorithms[{other}] is not allowed; pick one of \
770                         RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
771                        )));
772                    }
773                }
774            }
775            if a.algorithms.is_empty() {
776                return Err(AppError::Internal(
777                    "auth.algorithms must not be empty".into(),
778                ));
779            }
780            if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
781                return Err(AppError::Internal(format!(
782                    "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
783                    a.tenant_claim
784                )));
785            }
786            if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
787                return Err(AppError::Internal(
788                    "auth.allowed_tenants is set but auth.tenant_claim is empty — \
789                     can't enforce a tenant allow-list without a claim to extract from"
790                        .into(),
791                ));
792            }
793        }
794
795        let mut seen = HashSet::new();
796        for d in &self.datasets {
797            if !seen.insert(d.name.as_str()) {
798                return Err(AppError::Internal(format!(
799                    "duplicate dataset name: {}",
800                    d.name
801                )));
802            }
803            if d.name.is_empty() {
804                return Err(AppError::Internal("dataset name must not be empty".into()));
805            }
806            // URL-safe: alphanum + _ - .
807            if !d
808                .name
809                .chars()
810                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
811            {
812                return Err(AppError::Internal(format!(
813                    "dataset name '{}' must be alphanumeric (plus _ - .)",
814                    d.name
815                )));
816            }
817
818            if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
819                return Err(AppError::Internal(format!(
820                    "dataset '{}': index.mode = 'list' requires non-empty index.columns",
821                    d.name
822                )));
823            }
824
825            // Location-specific checks.
826            if d.source.is_s3() {
827                d.source.s3_bucket()?;
828                if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
829                    && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
830                    && std::env::var("AWS_REGION").is_err()
831                    && std::env::var("AWS_DEFAULT_REGION").is_err()
832                {
833                    log::warn!(
834                        "dataset '{}': S3 source without explicit region — \
835                         relying on AWS_REGION env var",
836                        d.name
837                    );
838                }
839            } else {
840                // Local path. For parquet we can fully resolve to a file
841                // list up front; for delta we only check that the directory
842                // exists (delta has its own layout — _delta_log/, …).
843                match d.source.kind {
844                    SourceKind::Parquet => {
845                        d.resolve_local_parquet_files()?;
846                    }
847                    SourceKind::Delta => {
848                        let p = Path::new(&d.source.location);
849                        if !p.exists() {
850                            return Err(AppError::Internal(format!(
851                                "dataset '{}': delta location does not exist: {}",
852                                d.name, d.source.location
853                            )));
854                        }
855                    }
856                }
857            }
858        }
859        Ok(())
860    }
861}
862
863impl SourceConfig {
864    pub fn is_s3(&self) -> bool {
865        self.location.starts_with("s3://")
866    }
867
868    /// Returns `(bucket, key_prefix_or_empty)` for an `s3://…` location.
869    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
870        let rest = self
871            .location
872            .strip_prefix("s3://")
873            .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
874        let (bucket, key) = match rest.split_once('/') {
875            Some((b, k)) => (b, k),
876            None => (rest, ""),
877        };
878        if bucket.is_empty() {
879            return Err(AppError::Internal(format!(
880                "s3 URL missing bucket: {}",
881                self.location
882            )));
883        }
884        Ok((bucket, key))
885    }
886}
887
888impl DatasetConfig {
889    /// Expand `source.location` to a concrete list of local `.parquet`
890    /// files. Only valid for `kind = parquet` on local paths — S3 and
891    /// Delta sources are resolved by the backend itself.
892    ///
893    /// Accepts three location shapes:
894    ///   * a single `*.parquet` file
895    ///   * a directory (lists every `*.parquet` directly inside, non-recursive)
896    ///   * a glob pattern containing `*`, `?` or `[…]` (e.g.
897    ///     `data/year=2024/*.parquet`, `data/**/*.parquet`)
898    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
899        if self.source.is_s3() {
900            return Err(AppError::Internal(format!(
901                "dataset '{}': resolve_local_parquet_files called on s3 source",
902                self.name
903            )));
904        }
905        let loc = &self.source.location;
906
907        // Glob pattern? Expand and require at least one match.
908        if loc.contains('*') || loc.contains('?') || loc.contains('[') {
909            let mut files: Vec<PathBuf> = glob::glob(loc)
910                .map_err(|e| {
911                    AppError::Internal(format!(
912                        "dataset '{}': bad glob pattern '{loc}': {e}",
913                        self.name
914                    ))
915                })?
916                .filter_map(|r| r.ok())
917                .filter(|p| {
918                    p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
919                })
920                .collect();
921            files.sort();
922            if files.is_empty() {
923                return Err(AppError::Internal(format!(
924                    "dataset '{}': glob '{loc}' matched no .parquet files",
925                    self.name
926                )));
927            }
928            return Ok(files);
929        }
930
931        let path = Path::new(loc);
932        if !path.exists() {
933            return Err(AppError::Internal(format!(
934                "dataset '{}': source path does not exist: {loc}",
935                self.name
936            )));
937        }
938
939        if path.is_file() {
940            if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
941                return Err(AppError::Internal(format!(
942                    "dataset '{}': source must be a .parquet file",
943                    self.name
944                )));
945            }
946            return Ok(vec![path.to_path_buf()]);
947        }
948
949        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
950            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
951            .filter_map(|entry| entry.ok().map(|e| e.path()))
952            .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
953            .collect();
954        files.sort();
955        if files.is_empty() {
956            return Err(AppError::Internal(format!(
957                "dataset '{}': no *.parquet files found in {loc}",
958                self.name
959            )));
960        }
961        Ok(files)
962    }
963
964    /// Env-var prefix derived from the dataset name: uppercase with
965    /// non-alphanumeric chars replaced by `_`. E.g. `sales.eu-1` →
966    /// `SALES_EU_1`.
967    pub fn env_prefix(&self) -> String {
968        self.name
969            .chars()
970            .map(|c| {
971                if c.is_ascii_alphanumeric() {
972                    c.to_ascii_uppercase()
973                } else {
974                    '_'
975                }
976            })
977            .collect()
978    }
979
980    /// Resolve S3 credentials following the precedence chain documented at
981    /// the top of this module. Returns an empty struct when nothing was
982    /// found — the caller should then leave credential resolution to the
983    /// engine's default provider chain.
984    pub fn resolved_creds(&self) -> ResolvedCreds {
985        let prefix = self.env_prefix();
986        let from_env = |suffix: &str| {
987            std::env::var(format!("{prefix}_{suffix}"))
988                .ok()
989                .filter(|s| !s.is_empty())
990        };
991        let inline = self.s3.as_ref();
992        let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
993
994        ResolvedCreds {
995            access_key_id: from_env("AWS_ACCESS_KEY_ID")
996                .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
997                .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
998            secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
999                .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1000                .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1001            session_token: from_env("AWS_SESSION_TOKEN")
1002                .or_else(|| inline.and_then(|s| s.session_token.clone()))
1003                .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1004        }
1005    }
1006
1007    /// Resolved S3 region: per-dataset env (`${PREFIX}_AWS_REGION`)
1008    /// → inline → `AWS_REGION` → `AWS_DEFAULT_REGION` → `us-east-1`.
1009    pub fn resolved_region(&self) -> String {
1010        let prefix = self.env_prefix();
1011        std::env::var(format!("{prefix}_AWS_REGION"))
1012            .ok()
1013            .filter(|s| !s.is_empty())
1014            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1015            .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1016            .or_else(|| {
1017                std::env::var("AWS_DEFAULT_REGION")
1018                    .ok()
1019                    .filter(|s| !s.is_empty())
1020            })
1021            .unwrap_or_else(|| "us-east-1".to_string())
1022    }
1023}
1024
#[cfg(test)]
mod tests {
    use super::*;

    // -----------------------------------------------------------------
    // Fixtures shared by the tests below
    // -----------------------------------------------------------------

    /// Builds an `AppConfig` from the given server, auth and dataset
    /// sections; `docs`, `swagger` and `metrics` take their defaults.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            auth,
            datasets,
        }
    }

    /// A parquet dataset named `name` pointing at `location`, with no S3
    /// block, default index, no column list, `dict_encode` on and `lazy`
    /// off.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind: SourceKind::Parquet,
                location: location.into(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Creates a fresh scratch directory `<tmp>/<dir_prefix>-<pid>` that
    /// contains a single `a.parquet` holding `contents`. Returns
    /// `(dir, file)`; the caller is responsible for removing `dir`.
    fn scratch_parquet(
        dir_prefix: &str,
        contents: &[u8],
    ) -> (std::path::PathBuf, std::path::PathBuf) {
        use std::io::Write;

        let dir = std::env::temp_dir().join(format!("{dir_prefix}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::File::create(&file)
            .unwrap()
            .write_all(contents)
            .unwrap();
        (dir, file)
    }

    // -----------------------------------------------------------------
    // Server section
    // -----------------------------------------------------------------

    #[test]
    fn server_defaults() {
        let defaults = ServerConfig::default();
        assert_eq!(defaults.backend, Backend::Datafusion);
        assert_eq!(defaults.port, 8080);
        assert!(defaults.compress);
        assert_eq!(defaults.max_body_bytes, 1024 * 1024);
        assert_eq!(defaults.max_page_size, 100_000);
        assert_eq!(defaults.request_timeout_ms, 30_000);

        let quack = &defaults.quack;
        assert!(!quack.enabled);
        assert_eq!(quack.uri, "quack:localhost");
        assert!(quack.token.is_none());
        assert!(!quack.allow_other_hostname);
        assert!(quack.read_only);

        assert_eq!(defaults.prefix, "");
        assert!(defaults.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let raw = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(raw).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.request_timeout_ms, 0);
        assert!(server.quack.enabled);
        assert_eq!(server.quack.uri, "quack:localhost:9495");
        assert_eq!(server.quack.token.as_deref(), Some("test-token"));
        assert!(!server.quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        let first = &cfg.datasets[0];
        assert_eq!(first.name, "x");
        assert!(first.dict_encode); // default
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for prefix in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: prefix.to_string(),
                ..Default::default()
            };
            let cfg = app_config(server, AuthConfig::default(), vec![]);
            assert!(cfg.validate().is_err(), "prefix {prefix:?} should fail");
        }
    }

    // -----------------------------------------------------------------
    // Auth section
    // -----------------------------------------------------------------

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = app_config(ServerConfig::default(), auth, vec![]);

        cfg.normalize();

        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let (dir, file) = scratch_parquet("dp-auth-issuer-test", b"x");

        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let location = file.to_string_lossy().into_owned();
        let cfg = app_config(
            ServerConfig::default(),
            auth,
            vec![parquet_dataset("x", &location)],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    // -----------------------------------------------------------------
    // Whole-config validation
    // -----------------------------------------------------------------

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        let cfg = app_config(
            server,
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );

        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let raw = r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#;
        let cfg: AppConfig = toml::from_str(raw).unwrap();

        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let (dir, file) = scratch_parquet("dp-dup-test", b"x");
        let path = file.to_str().unwrap();

        let raw = format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
        "#
        );
        let cfg: AppConfig = toml::from_str(&raw).unwrap();

        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    // -----------------------------------------------------------------
    // SourceConfig / DatasetConfig helpers
    // -----------------------------------------------------------------

    #[test]
    fn s3_bucket_parsing() {
        let source = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };

        let with_key = source("s3://bucket/path/key");
        assert_eq!(with_key.s3_bucket().unwrap(), ("bucket", "path/key"));

        let bucket_only = source("s3://only-bucket");
        assert_eq!(bucket_only.s3_bucket().unwrap(), ("only-bucket", ""));

        assert!(source("s3:///nokey").s3_bucket().is_err());
        assert!(source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let expectations = [
            ("accidents", "ACCIDENTS"),
            ("sales.eu-1", "SALES_EU_1"),
            ("a_b.c-d", "A_B_C_D"),
        ];
        for (name, expected) in expectations {
            assert_eq!(parquet_dataset(name, "x").env_prefix(), expected);
        }
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let (dir, file) = scratch_parquet("dp-cfg-test", b"not really parquet");
        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // Direct file.
        assert_eq!(resolve(file.to_str().unwrap()).unwrap(), vec![file.clone()]);

        // Directory.
        assert_eq!(resolve(dir.to_str().unwrap()).unwrap(), vec![file.clone()]);

        // Missing path.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}