use std::collections::HashSet;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use serde::Deserialize;
use crate::errors::AppError;
/// Mount paths that `AppConfig::validate` refuses for `docs.path` and
/// `swagger.path`. `metrics.path` is checked against the same list, except
/// that `/metrics` itself is permitted there.
const RESERVED_MOUNTS: &[&str] = &[
"/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
/// Root of the application configuration, deserialized from TOML.
///
/// Every section is optional in the input: an absent table is filled in from
/// the section type's `Default` implementation, and an absent `dataset` array
/// becomes an empty list (which `AppConfig::validate` then rejects).
#[derive(Debug, Deserialize)]
pub struct AppConfig {
/// `[server]` section.
#[serde(default)]
pub server: ServerConfig,
/// `[docs]` section.
#[serde(default)]
pub docs: DocsConfig,
/// `[swagger]` section.
#[serde(default)]
pub swagger: SwaggerConfig,
/// `[metrics]` section.
#[serde(default)]
pub metrics: MetricsConfig,
/// `[auth]` section.
#[serde(default)]
pub auth: AuthConfig,
/// `[[dataset]]` entries; note that the TOML key is the singular `dataset`.
#[serde(rename = "dataset", default)]
pub datasets: Vec<DatasetConfig>,
}
/// `[server]` section. Fields missing from the input take the values from
/// `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
/// Selected `Backend`; defaults to `Backend::Datafusion`.
pub backend: Backend,
/// Listen address; defaults to `127.0.0.1`.
pub listen: IpAddr,
/// Listen port; defaults to `8080`.
pub port: u16,
/// Optional worker count; unset by default.
// NOTE(review): how `None` is interpreted is decided by the consumer — not visible in this file.
pub workers: Option<usize>,
/// Route prefix; empty by default. When non-empty it must start with '/'
/// and must not end with '/' (checked by `AppConfig::validate`).
pub prefix: String,
/// Compression switch; defaults to `true`.
pub compress: bool,
/// Body size limit in bytes; defaults to `1024 * 1024`.
pub max_body_bytes: usize,
/// Page size limit; defaults to `100_000`.
pub max_page_size: u64,
/// Request timeout in milliseconds; defaults to `30_000`.
// NOTE(review): a test in this file sets this to 0 — presumably "disabled"; confirm with the consumer.
pub request_timeout_ms: u64,
/// Shutdown timeout in seconds; defaults to `30`.
pub shutdown_timeout_secs: u64,
/// `[server.quack]` sub-section.
pub quack: QuackConfig,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
backend: Backend::default(),
listen: IpAddr::from([127, 0, 0, 1]),
port: 8080,
workers: None,
prefix: String::new(),
compress: true,
max_body_bytes: 1024 * 1024,
max_page_size: 100_000,
request_timeout_ms: 30_000,
shutdown_timeout_secs: 30,
quack: QuackConfig::default(),
}
}
}
/// `[server.quack]` sub-section. Fields missing from the input take the
/// values from `QuackConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
/// Master switch; `false` by default. `AppConfig::validate` only runs
/// `validate_enabled` when this is `true`.
pub enabled: bool,
/// URI that must start with `quack:`; defaults to `quack:localhost`.
pub uri: String,
/// Optional token; `validate_enabled` rejects tokens shorter than 4.
pub token: Option<String>,
/// When `false` (the default) the URI host must be exactly `localhost`.
pub allow_other_hostname: bool,
/// Read-only flag; defaults to `true`.
pub read_only: bool,
}
impl Default for QuackConfig {
    /// Disabled, read-only, pointing at `quack:localhost`, with no token and
    /// no permission to use another host.
    fn default() -> Self {
        let uri = String::from("quack:localhost");
        Self {
            enabled: false,
            uri,
            token: None,
            allow_other_hostname: false,
            read_only: true,
        }
    }
}
impl QuackConfig {
    /// Validates the settings that only matter when
    /// `server.quack.enabled = true`.
    ///
    /// Checks, in order:
    /// 1. `uri` is not blank;
    /// 2. `uri` starts with `quack:`;
    /// 3. unless `allow_other_hostname` is set, the host part of `uri` is
    ///    exactly `localhost`;
    /// 4. a configured `token` has at least 4 characters.
    ///
    /// # Errors
    /// Returns `AppError::Internal` describing the first check that failed.
    pub fn validate_enabled(&self) -> Result<(), AppError> {
        if self.uri.trim().is_empty() {
            return Err(AppError::Internal(
                "server.quack.uri must not be empty when server.quack.enabled = true".into(),
            ));
        }
        if !self.uri.starts_with("quack:") {
            return Err(AppError::Internal(format!(
                "server.quack.uri must start with 'quack:' (got '{}')",
                self.uri
            )));
        }
        // A missing host (`None`) is treated the same as a foreign host.
        if !self.allow_other_hostname && self.hostname() != Some("localhost") {
            return Err(AppError::Internal(format!(
                "server.quack.uri host must be 'localhost' unless \
                 server.quack.allow_other_hostname = true (got '{}')",
                self.uri
            )));
        }
        // Count characters rather than UTF-8 bytes so the check matches the
        // wording of the error message: "éé" is 4 bytes but only 2 characters.
        let token_too_short = self
            .token
            .as_deref()
            .is_some_and(|token| token.chars().count() < 4);
        if token_too_short {
            return Err(AppError::Internal(
                "server.quack.token must be at least 4 characters".into(),
            ));
        }
        Ok(())
    }

    /// Extracts the host part of `uri`.
    ///
    /// Strips the `quack:` scheme and an optional `//`, then takes everything
    /// up to the first `:`, `/`, `?` or `#`. Returns `None` when the scheme is
    /// missing or the remaining host is empty.
    fn hostname(&self) -> Option<&str> {
        let after_scheme = self.uri.strip_prefix("quack:")?;
        let authority = after_scheme.strip_prefix("//").unwrap_or(after_scheme);
        let host = authority
            .split([':', '/', '?', '#'])
            .next()
            .unwrap_or_default();
        (!host.is_empty()).then_some(host)
    }
}
/// Backend selector used by `ServerConfig::backend`. Variants are written in
/// lowercase in the configuration (`"datafusion"`, `"duckdb"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
/// The default backend.
#[default]
Datafusion,
/// Alternative backend.
Duckdb,
}
/// `[docs]` section. Missing fields take the values from
/// `DocsConfig::default()`; unknown keys are rejected during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
/// Defaults to `true`.
pub enabled: bool,
/// Mount path; defaults to `/mkdocs`. `AppConfig::validate` requires a
/// leading '/', no trailing '/', and no collision with `RESERVED_MOUNTS`.
pub path: String,
}
impl Default for DocsConfig {
    /// Enabled and mounted at `/mkdocs`.
    fn default() -> Self {
        let path = String::from("/mkdocs");
        Self {
            enabled: true,
            path,
        }
    }
}
/// `[swagger]` section. Missing fields take the values from
/// `SwaggerConfig::default()`; unknown keys are rejected during
/// deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
/// Defaults to `true`.
pub enabled: bool,
/// Mount path; defaults to `/docs`. `AppConfig::validate` requires a leading
/// '/', no trailing '/', no collision with `RESERVED_MOUNTS`, and a value
/// different from `docs.path`.
pub path: String,
/// Optional `[swagger.oauth2]` table; absent by default.
pub oauth2: Option<SwaggerOAuth2Config>,
}
impl Default for SwaggerConfig {
    /// Enabled, mounted at `/docs`, without an OAuth2 table.
    fn default() -> Self {
        let path = String::from("/docs");
        Self {
            enabled: true,
            path,
            oauth2: None,
        }
    }
}
/// `[swagger.oauth2]` table. `issuer` and `client_id` have no default and must
/// be present when the table is; unknown keys are rejected during
/// deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
/// Issuer URL; `AppConfig::validate` requires it to be non-blank and to
/// start with `http://` or `https://`.
pub issuer: String,
/// Client identifier; must not be blank.
pub client_id: String,
/// Scope list; empty when omitted.
#[serde(default)]
pub scopes: Vec<String>,
/// PKCE switch; `true` when omitted.
#[serde(default = "default_true")]
pub pkce: bool,
}
/// `[metrics]` section. Missing fields take the values from
/// `MetricsConfig::default()`; unknown keys are rejected during
/// deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
/// Defaults to `false`.
pub enabled: bool,
/// Mount path; defaults to `/metrics`. `AppConfig::validate` requires a
/// leading '/', no trailing '/', no collision with a reserved route other
/// than `/metrics`, and a value different from `docs.path` and `swagger.path`.
pub path: String,
}
impl Default for MetricsConfig {
    /// Disabled, with the path preset to `/metrics`.
    fn default() -> Self {
        let path = String::from("/metrics");
        Self {
            enabled: false,
            path,
        }
    }
}
/// `[auth]` section. Missing fields take the values from
/// `AuthConfig::default()`; unknown keys are rejected during deserialization.
///
/// The field-level rules mentioned below are enforced by
/// `AppConfig::validate`, and only when `enabled` is `true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
/// Master switch; defaults to `false`.
pub enabled: bool,
/// Issuer URL; must be non-blank and start with `http://` or `https://`.
pub issuer: String,
/// Audience value; empty by default.
pub audience: String,
/// Scope list; lowercased (ASCII) by `AppConfig::normalize`.
pub read_scopes: Vec<String>,
/// Scope list; lowercased (ASCII) by `AppConfig::normalize`.
pub reload_scopes: Vec<String>,
/// Defaults to `false`.
pub anonymous_read: bool,
/// Defaults to `true`.
pub start_degraded: bool,
/// Accepted algorithm names; defaults to `["RS256"]`. Must be non-empty and
/// drawn from RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512.
pub algorithms: Vec<String>,
/// Leeway in seconds; defaults to `60`.
pub leeway_secs: u64,
/// JWKS refresh interval in seconds; defaults to `3600`.
pub jwks_refresh_secs: u64,
/// Tenant claim location; when non-empty it must be a JSON pointer starting
/// with '/'.
pub tenant_claim: String,
/// Tenant allow-list; may only be non-empty when `tenant_claim` is set.
pub allowed_tenants: Vec<String>,
/// Defaults to `true`.
pub admin_token_fallback: bool,
}
impl Default for AuthConfig {
    /// Authentication off; `RS256` as the only algorithm, 60 s leeway, hourly
    /// JWKS refresh, degraded start and admin-token fallback both on, and all
    /// string/list settings empty.
    fn default() -> Self {
        let algorithms = vec![String::from("RS256")];
        Self {
            enabled: false,
            issuer: String::default(),
            audience: String::default(),
            read_scopes: Vec::default(),
            reload_scopes: Vec::default(),
            anonymous_read: false,
            start_degraded: true,
            algorithms,
            leeway_secs: 60,
            jwks_refresh_secs: 60 * 60,
            tenant_claim: String::default(),
            allowed_tenants: Vec::default(),
            admin_token_fallback: true,
        }
    }
}
impl Backend {
pub fn as_str(self) -> &'static str {
match self {
Backend::Datafusion => "datafusion",
Backend::Duckdb => "duckdb",
}
}
}
/// One `[[dataset]]` entry. `name` and `source` are mandatory; everything else
/// has a default.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
/// Dataset name. `AppConfig::validate` requires it to be unique, non-empty
/// and made of ASCII alphanumerics plus `_`, `-` and `.`.
pub name: String,
/// Where the data lives and what format it is in.
pub source: SourceConfig,
/// Optional `s3` table; absent by default.
#[serde(default)]
pub s3: Option<S3Config>,
/// Index settings; `IndexConfig::default()` when omitted.
#[serde(default)]
pub index: IndexConfig,
/// Column list; empty when omitted.
// NOTE(review): presumably a column projection — confirm with the consumer.
#[serde(default)]
pub columns: Vec<String>,
/// `true` when omitted.
#[serde(default = "default_true")]
pub dict_encode: bool,
/// `false` when omitted.
#[serde(default)]
pub lazy: bool,
}
/// Serde default provider for boolean fields that should be `true` when
/// omitted (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
true
}
/// `source` table of a dataset. Both fields are mandatory.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
/// Storage format of the source.
pub kind: SourceKind,
/// Location string. A value starting with `s3://` is treated as an S3 URL
/// (see `is_s3`); anything else is handled as a local path or glob.
pub location: String,
}
/// Storage format of a dataset source, written in lowercase in the
/// configuration (`"parquet"`, `"delta"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
/// The `Default` variant.
#[default]
Parquet,
/// Delta source; for local locations `AppConfig::validate` only checks that
/// the path exists.
Delta,
}
impl SourceKind {
pub fn as_str(self) -> &'static str {
match self {
SourceKind::Parquet => "parquet",
SourceKind::Delta => "delta",
}
}
}
/// Per-dataset `s3` table. Missing fields take the values from
/// `S3Config::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
/// Inline region; consulted by `DatasetConfig::resolved_region` after the
/// dataset-specific environment variable.
pub region: Option<String>,
/// Optional endpoint; unset by default.
pub endpoint: Option<String>,
/// Addressing style; defaults to `AddressingStyle::Virtual`.
pub addressing_style: AddressingStyle,
/// Defaults to `false`.
pub allow_http: bool,
/// Inline access key id; see `DatasetConfig::resolved_creds` for precedence.
pub access_key_id: Option<String>,
/// Inline secret access key; see `DatasetConfig::resolved_creds`.
pub secret_access_key: Option<String>,
/// Inline session token; see `DatasetConfig::resolved_creds`.
pub session_token: Option<String>,
}
impl Default for S3Config {
    /// Nothing set inline: virtual addressing, plain HTTP not allowed, and no
    /// region, endpoint or credentials.
    fn default() -> Self {
        let unset: Option<String> = None;
        Self {
            region: unset.clone(),
            endpoint: unset.clone(),
            addressing_style: AddressingStyle::Virtual,
            allow_http: false,
            access_key_id: unset.clone(),
            secret_access_key: unset.clone(),
            session_token: unset,
        }
    }
}
/// S3 addressing style, written in lowercase in the configuration
/// (`"virtual"`, `"path"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
/// The default style.
#[default]
Virtual,
/// Path style.
Path,
}
impl AddressingStyle {
pub fn as_str(self) -> &'static str {
match self {
AddressingStyle::Virtual => "virtual",
AddressingStyle::Path => "path",
}
}
}
/// Per-dataset `index` table. Missing fields take the values from
/// `IndexConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
/// Index mode; defaults to `IndexMode::Auto`.
pub mode: IndexMode,
/// Column list; `AppConfig::validate` requires it to be non-empty when
/// `mode` is `IndexMode::List`.
pub columns: Vec<String>,
/// Cardinality limit; defaults to `100_000`.
pub max_cardinality: usize,
}
impl Default for IndexConfig {
    /// Automatic mode, no explicit columns, cardinality limit of 100 000.
    fn default() -> Self {
        let columns: Vec<String> = Vec::default();
        Self {
            mode: IndexMode::Auto,
            columns,
            max_cardinality: 100 * 1_000,
        }
    }
}
/// Index mode, written in lowercase in the configuration (`"auto"`, `"none"`,
/// `"list"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
/// The default mode.
#[default]
Auto,
/// A variant of this enum named `None` — not `Option::None`.
None,
/// Requires a non-empty `IndexConfig::columns` (checked by
/// `AppConfig::validate`).
List,
}
/// Credentials produced by `DatasetConfig::resolved_creds`. Each field is
/// resolved independently and may be absent.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
/// Resolved access key id, if any.
pub access_key_id: Option<String>,
/// Resolved secret access key, if any.
pub secret_access_key: Option<String>,
/// Resolved session token, if any.
pub session_token: Option<String>,
}
impl ResolvedCreds {
    /// `true` when both the access key id and the secret access key are
    /// present; the session token is not considered.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
impl AppConfig {
pub fn load(path: &str) -> Result<Self, AppError> {
let raw = std::fs::read_to_string(path)
.map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
let mut cfg: AppConfig =
toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
cfg.normalize();
cfg.validate()?;
Ok(cfg)
}
fn normalize(&mut self) {
for s in self
.auth
.read_scopes
.iter_mut()
.chain(self.auth.reload_scopes.iter_mut())
{
*s = s.to_ascii_lowercase();
}
}
fn validate(&self) -> Result<(), AppError> {
let p = &self.server.prefix;
if !p.is_empty() {
if !p.starts_with('/') {
return Err(AppError::Internal(format!(
"server.prefix must start with '/' (got '{p}')"
)));
}
if p.ends_with('/') {
return Err(AppError::Internal(format!(
"server.prefix must not end with '/' (got '{p}')"
)));
}
}
if self.datasets.is_empty() {
return Err(AppError::Internal(
"datasets.toml has no [[dataset]] entries".into(),
));
}
if self.server.quack.enabled {
self.server.quack.validate_enabled()?;
}
{
let dp = &self.docs.path;
if !dp.starts_with('/') {
return Err(AppError::Internal(format!(
"docs.path must start with '/' (got '{dp}')"
)));
}
if dp.len() > 1 && dp.ends_with('/') {
return Err(AppError::Internal(format!(
"docs.path must not end with '/' (got '{dp}')"
)));
}
if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
return Err(AppError::Internal(format!(
"docs.path '{dp}' collides with a reserved route"
)));
}
}
{
let sp = &self.swagger.path;
if !sp.starts_with('/') {
return Err(AppError::Internal(format!(
"swagger.path must start with '/' (got '{sp}')"
)));
}
if sp.len() > 1 && sp.ends_with('/') {
return Err(AppError::Internal(format!(
"swagger.path must not end with '/' (got '{sp}')"
)));
}
if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
return Err(AppError::Internal(format!(
"swagger.path '{sp}' collides with a reserved route"
)));
}
if sp == &self.docs.path {
return Err(AppError::Internal(format!(
"swagger.path and docs.path must differ (both '{sp}')"
)));
}
if let Some(o) = &self.swagger.oauth2 {
if o.issuer.trim().is_empty() {
return Err(AppError::Internal(
"swagger.oauth2.issuer must not be empty".into(),
));
}
if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
return Err(AppError::Internal(format!(
"swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
o.issuer
)));
}
if o.client_id.trim().is_empty() {
return Err(AppError::Internal(
"swagger.oauth2.client_id must not be empty".into(),
));
}
}
}
{
let mp = &self.metrics.path;
if !mp.starts_with('/') {
return Err(AppError::Internal(format!(
"metrics.path must start with '/' (got '{mp}')"
)));
}
if mp.len() > 1 && mp.ends_with('/') {
return Err(AppError::Internal(format!(
"metrics.path must not end with '/' (got '{mp}')"
)));
}
if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
return Err(AppError::Internal(format!(
"metrics.path '{mp}' collides with a reserved route"
)));
}
if mp == &self.docs.path {
return Err(AppError::Internal(format!(
"metrics.path and docs.path must differ (both '{mp}')"
)));
}
if mp == &self.swagger.path {
return Err(AppError::Internal(format!(
"metrics.path and swagger.path must differ (both '{mp}')"
)));
}
}
if self.auth.enabled {
let a = &self.auth;
if a.issuer.trim().is_empty() {
return Err(AppError::Internal(
"auth.issuer must not be empty when auth.enabled = true".into(),
));
}
if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
return Err(AppError::Internal(format!(
"auth.issuer must be an absolute http(s) URL (got '{}')",
a.issuer
)));
}
for alg in &a.algorithms {
match alg.as_str() {
"RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
| "PS512" => {}
other => {
return Err(AppError::Internal(format!(
"auth.algorithms[{other}] is not allowed; pick one of \
RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
)));
}
}
}
if a.algorithms.is_empty() {
return Err(AppError::Internal(
"auth.algorithms must not be empty".into(),
));
}
if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
return Err(AppError::Internal(format!(
"auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
a.tenant_claim
)));
}
if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
return Err(AppError::Internal(
"auth.allowed_tenants is set but auth.tenant_claim is empty — \
can't enforce a tenant allow-list without a claim to extract from"
.into(),
));
}
}
let mut seen = HashSet::new();
for d in &self.datasets {
if !seen.insert(d.name.as_str()) {
return Err(AppError::Internal(format!(
"duplicate dataset name: {}",
d.name
)));
}
if d.name.is_empty() {
return Err(AppError::Internal("dataset name must not be empty".into()));
}
if !d
.name
.chars()
.all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
{
return Err(AppError::Internal(format!(
"dataset name '{}' must be alphanumeric (plus _ - .)",
d.name
)));
}
if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
return Err(AppError::Internal(format!(
"dataset '{}': index.mode = 'list' requires non-empty index.columns",
d.name
)));
}
if d.source.is_s3() {
d.source.s3_bucket()?;
if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
&& d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
&& std::env::var("AWS_REGION").is_err()
&& std::env::var("AWS_DEFAULT_REGION").is_err()
{
log::warn!(
"dataset '{}': S3 source without explicit region — \
relying on AWS_REGION env var",
d.name
);
}
} else {
match d.source.kind {
SourceKind::Parquet => {
d.resolve_local_parquet_files()?;
}
SourceKind::Delta => {
let p = Path::new(&d.source.location);
if !p.exists() {
return Err(AppError::Internal(format!(
"dataset '{}': delta location does not exist: {}",
d.name, d.source.location
)));
}
}
}
}
}
Ok(())
}
}
impl SourceConfig {
    /// `true` when `location` begins with the (case-sensitive) `s3://` scheme.
    pub fn is_s3(&self) -> bool {
        self.location.strip_prefix("s3://").is_some()
    }

    /// Splits an `s3://bucket/key` location into `(bucket, key)`.
    ///
    /// The key is everything after the first '/' following the bucket and is
    /// empty when the location names only a bucket.
    ///
    /// # Errors
    /// Returns `AppError::Internal` when the location does not start with
    /// `s3://` or when the bucket part is empty.
    pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
        let Some(remainder) = self.location.strip_prefix("s3://") else {
            return Err(AppError::Internal(format!(
                "not an s3:// URL: {}",
                self.location
            )));
        };
        let (bucket, key) = remainder.split_once('/').unwrap_or((remainder, ""));
        if bucket.is_empty() {
            Err(AppError::Internal(format!(
                "s3 URL missing bucket: {}",
                self.location
            )))
        } else {
            Ok((bucket, key))
        }
    }
}
impl DatasetConfig {
    /// Resolves a local parquet source to a sorted list of files.
    ///
    /// `source.location` is interpreted as:
    /// * a glob pattern when it contains `*`, `?` or `[` — every matching
    ///   regular file with a `.parquet` extension is returned;
    /// * a single file — which must have a `.parquet` extension;
    /// * a directory — every regular `.parquet` file directly inside it is
    ///   returned (no recursion).
    ///
    /// Glob matches and directory entries that cannot be read are skipped.
    ///
    /// # Errors
    /// Returns `AppError::Internal` when the source is an S3 URL, the glob
    /// pattern is malformed, the path does not exist, a single file has the
    /// wrong extension, the directory cannot be listed, or nothing matches.
    pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
        if self.source.is_s3() {
            return Err(AppError::Internal(format!(
                "dataset '{}': resolve_local_parquet_files called on s3 source",
                self.name
            )));
        }
        let loc = &self.source.location;

        let looks_like_glob = loc.contains('*') || loc.contains('?') || loc.contains('[');
        if looks_like_glob {
            let matches = glob::glob(loc).map_err(|e| {
                AppError::Internal(format!(
                    "dataset '{}': bad glob pattern '{loc}': {e}",
                    self.name
                ))
            })?;
            let mut files: Vec<PathBuf> = matches
                .filter_map(Result::ok)
                .filter(|p| Self::is_parquet_file(p))
                .collect();
            files.sort();
            if files.is_empty() {
                return Err(AppError::Internal(format!(
                    "dataset '{}': glob '{loc}' matched no .parquet files",
                    self.name
                )));
            }
            return Ok(files);
        }

        let path = Path::new(loc);
        if !path.exists() {
            return Err(AppError::Internal(format!(
                "dataset '{}': source path does not exist: {loc}",
                self.name
            )));
        }
        if path.is_file() {
            if !Self::has_parquet_extension(path) {
                return Err(AppError::Internal(format!(
                    "dataset '{}': source must be a .parquet file",
                    self.name
                )));
            }
            return Ok(vec![path.to_path_buf()]);
        }

        // Directory: keep only regular files, exactly like the glob branch.
        // Without the `is_file` test a sub-directory that happens to be named
        // `something.parquet` would be handed to the reader as a data file.
        let mut files: Vec<PathBuf> = std::fs::read_dir(path)
            .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
            .filter_map(|entry| entry.ok().map(|e| e.path()))
            .filter(|p| Self::is_parquet_file(p))
            .collect();
        files.sort();
        if files.is_empty() {
            return Err(AppError::Internal(format!(
                "dataset '{}': no *.parquet files found in {loc}",
                self.name
            )));
        }
        Ok(files)
    }

    /// `true` when the path's extension is exactly `parquet` (case-sensitive).
    fn has_parquet_extension(path: &Path) -> bool {
        path.extension().and_then(|e| e.to_str()) == Some("parquet")
    }

    /// `true` when the path is a regular file with a `.parquet` extension.
    fn is_parquet_file(path: &Path) -> bool {
        path.is_file() && Self::has_parquet_extension(path)
    }

    /// Reads an environment variable, treating unset, non-unicode and empty
    /// values alike as absent.
    fn non_empty_env(key: &str) -> Option<String> {
        std::env::var(key).ok().filter(|v| !v.is_empty())
    }

    /// Environment-variable prefix derived from the dataset name: ASCII
    /// alphanumerics are uppercased and every other character becomes `_`
    /// (`sales.eu-1` → `SALES_EU_1`).
    pub fn env_prefix(&self) -> String {
        self.name
            .chars()
            .map(|c| {
                if c.is_ascii_alphanumeric() {
                    c.to_ascii_uppercase()
                } else {
                    '_'
                }
            })
            .collect()
    }

    /// Resolves S3 credentials for this dataset.
    ///
    /// Each of the three values is looked up independently, first match wins:
    /// 1. `{env_prefix}_AWS_…` environment variable (non-empty),
    /// 2. the inline value from the dataset's `s3` table,
    /// 3. the plain `AWS_…` environment variable (non-empty).
    pub fn resolved_creds(&self) -> ResolvedCreds {
        let prefix = self.env_prefix();
        let s3 = self.s3.as_ref();
        let resolve = |key: &str, inline: Option<&String>| {
            Self::non_empty_env(&format!("{prefix}_{key}"))
                .or_else(|| inline.cloned())
                .or_else(|| Self::non_empty_env(key))
        };
        ResolvedCreds {
            access_key_id: resolve(
                "AWS_ACCESS_KEY_ID",
                s3.and_then(|s| s.access_key_id.as_ref()),
            ),
            secret_access_key: resolve(
                "AWS_SECRET_ACCESS_KEY",
                s3.and_then(|s| s.secret_access_key.as_ref()),
            ),
            session_token: resolve(
                "AWS_SESSION_TOKEN",
                s3.and_then(|s| s.session_token.as_ref()),
            ),
        }
    }

    /// Resolves the S3 region for this dataset.
    ///
    /// First match wins: `{env_prefix}_AWS_REGION`, the inline `s3.region`,
    /// `AWS_REGION`, `AWS_DEFAULT_REGION`, and finally the literal
    /// `us-east-1`. Empty environment values are ignored.
    pub fn resolved_region(&self) -> String {
        let dataset_var = format!("{}_AWS_REGION", self.env_prefix());
        Self::non_empty_env(&dataset_var)
            .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
            .or_else(|| Self::non_empty_env("AWS_REGION"))
            .or_else(|| Self::non_empty_env("AWS_DEFAULT_REGION"))
            .unwrap_or_else(|| "us-east-1".to_string())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Local parquet dataset with default index, column and encoding settings.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind: SourceKind::Parquet,
                location: location.into(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Configuration with default docs/swagger/metrics sections.
    fn config(server: ServerConfig, auth: AuthConfig, datasets: Vec<DatasetConfig>) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            auth,
            datasets,
        }
    }

    /// Fresh, empty scratch directory under the system temp dir, unique per
    /// tag and process.
    fn scratch_dir(tag: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0
            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        // `dict_encode` is omitted above and must default to true.
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: p.to_string(),
                ..Default::default()
            };
            let cfg = config(server, AuthConfig::default(), vec![]);
            let err = cfg.validate().expect_err("bad prefix should fail");
            // The dataset list is empty, so a bare `is_err()` would also pass
            // on the "no [[dataset]] entries" error. Pin the failure to the
            // prefix check itself.
            assert!(
                matches!(err, AppError::Internal(m) if m.contains("server.prefix")),
                "prefix {p:?} should be rejected by the prefix check"
            );
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = config(ServerConfig::default(), auth, vec![]);
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let dir = scratch_dir("dp-auth-issuer-test");
        let file = dir.join("a.parquet");
        std::fs::write(&file, b"x").unwrap();
        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let cfg = config(
            ServerConfig::default(),
            auth,
            vec![parquet_dataset("x", &file.to_string_lossy())],
        );
        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        // The dataset path does not exist; quack validation runs first.
        let cfg = config(
            server,
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let dir = scratch_dir("dp-dup-test");
        let f = dir.join("a.parquet");
        std::fs::write(&f, b"x").unwrap();
        let path = f.to_str().unwrap();
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let mk = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        let s1 = mk("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = mk("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(mk("s3:///nokey").s3_bucket().is_err());
        assert!(mk("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let mk = |name: &str| parquet_dataset(name, "x");
        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let dir = scratch_dir("dp-cfg-test");
        let f = dir.join("a.parquet");
        std::fs::write(&f, b"not really parquet").unwrap();
        let mk = |loc: &str| parquet_dataset("ds", loc);

        // A single file resolves to itself.
        let files = mk(f.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        // A directory resolves to the parquet files inside it.
        let files = mk(dir.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        assert!(mk("/no/such/place.parquet")
            .resolve_local_parquet_files()
            .is_err());
        let _ = std::fs::remove_dir_all(&dir);
    }
}