1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::{Deserialize, Serialize};
29
30use crate::errors::AppError;
31
/// Path of the TOML file recorded by the first successful `AppConfig::load`.
static SOURCE_CONFIG_PATH: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();

/// Returns the on-disk config path recorded by `AppConfig::load`, or `None`
/// when no config file has been loaded in this process.
pub fn source_config_path() -> Option<&'static std::path::Path> {
    let recorded: Option<&'static PathBuf> = SOURCE_CONFIG_PATH.get();
    recorded.map(PathBuf::as_path)
}
42
/// Routes that `docs.path`, `swagger.path`, `metrics.path` and `explorer.path`
/// may not be mounted on (checked in `AppConfig::validate`).
/// `metrics.path` alone is allowed to use the `"/metrics"` entry.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
49
/// Top-level configuration deserialized from the TOML file passed to
/// `AppConfig::load`.
///
/// Every section is optional and falls back to its `Default`; datasets come
/// from the `[[dataset]]` array-of-tables. `validate` rejects a config with
/// no datasets.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` section.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` section.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` section.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` section.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` section.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[sql]` section.
    #[serde(default)]
    pub sql: SqlConfig,
    /// `[datafusion]` section.
    #[serde(default)]
    pub datafusion: DataFusionConfig,
    /// `[auth]` section.
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` entries (the TOML key is singular).
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
75
/// `[server]` section. Missing keys take their values from `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Execution backend, `"datafusion"` (default) or `"duckdb"`.
    pub backend: Backend,
    /// Address to bind; default `127.0.0.1`.
    pub listen: IpAddr,
    /// Port to bind; default `8080`.
    pub port: u16,
    /// Worker count; `None` by default (presumably "let the runtime decide" — confirm).
    pub workers: Option<usize>,
    /// Route prefix: empty (default) or starting with `/` and not ending with `/`
    /// (enforced by `AppConfig::validate`).
    pub prefix: String,
    /// Compression toggle (presumably for HTTP responses); default `true`.
    pub compress: bool,
    /// Size limit in bytes, presumably for request bodies; default 1 MiB.
    pub max_body_bytes: usize,
    /// Upper bound on page size; default 100 000.
    pub max_page_size: u64,
    /// Datasets not marked `lazy` whose estimated local size exceeds this many MiB
    /// are forced lazy (see `DatasetConfig::force_lazy_bytes`); `0` (default) disables it.
    pub force_lazy_above_mb: u64,
    /// Request timeout in milliseconds; default 30 000.
    /// NOTE(review): whether `0` means "no timeout" is decided by the consumer — confirm.
    pub request_timeout_ms: u64,
    /// Shutdown grace period in seconds; default 30.
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-section.
    pub quack: QuackConfig,
    /// `[server.pgwire]` sub-section.
    pub pgwire: PgwireConfig,
}
135
136impl Default for ServerConfig {
137 fn default() -> Self {
138 Self {
139 backend: Backend::default(),
140 listen: IpAddr::from([127, 0, 0, 1]),
141 port: 8080,
142 workers: None,
143 prefix: String::new(),
144 compress: true,
145 max_body_bytes: 1024 * 1024,
146 max_page_size: 100_000,
147 force_lazy_above_mb: 0,
148 request_timeout_ms: 30_000,
149 shutdown_timeout_secs: 30,
150 quack: QuackConfig::default(),
151 pgwire: PgwireConfig::default(),
152 }
153 }
154}
155
/// `[server.quack]` section. Validated by `validate_enabled` only when
/// `enabled = true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Turns the Quack endpoint on; default `false`.
    pub enabled: bool,
    /// Endpoint URI. It must start with `quack:` and, unless
    /// `allow_other_hostname` is set, use host `localhost`. Default `quack:localhost`.
    pub uri: String,
    /// Optional token. When set, it must be at least 4 characters.
    /// NOTE(review): the derived `Debug` prints this value.
    pub token: Option<String>,
    /// Allows a host other than `localhost` in `uri`; default `false`.
    pub allow_other_hostname: bool,
    /// Default `true`; presumably restricts Quack clients to read-only access — confirm.
    pub read_only: bool,
}
180
181impl Default for QuackConfig {
182 fn default() -> Self {
183 Self {
184 enabled: false,
185 uri: "quack:localhost".into(),
186 token: None,
187 allow_other_hostname: false,
188 read_only: true,
189 }
190 }
191}
192
193impl QuackConfig {
194 pub fn validate_enabled(&self) -> Result<(), AppError> {
198 if self.uri.trim().is_empty() {
199 return Err(AppError::Internal(
200 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
201 ));
202 }
203 if !self.uri.starts_with("quack:") {
204 return Err(AppError::Internal(format!(
205 "server.quack.uri must start with 'quack:' (got '{}')",
206 self.uri
207 )));
208 }
209 if !self.allow_other_hostname {
210 let host = self.hostname().unwrap_or_default();
211 if host != "localhost" {
212 return Err(AppError::Internal(format!(
213 "server.quack.uri host must be 'localhost' unless \
214 server.quack.allow_other_hostname = true (got '{}')",
215 self.uri
216 )));
217 }
218 }
219 if let Some(token) = self.token.as_deref()
220 && token.len() < 4
221 {
222 return Err(AppError::Internal(
223 "server.quack.token must be at least 4 characters".into(),
224 ));
225 }
226 Ok(())
227 }
228
229 fn hostname(&self) -> Option<&str> {
230 let rest = self.uri.strip_prefix("quack:")?;
231 let rest = rest.strip_prefix("//").unwrap_or(rest);
232 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
233 (!host.is_empty()).then_some(host)
234 }
235}
236
/// `[server.pgwire]` section. Validated by `validate_enabled` only when
/// `enabled = true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct PgwireConfig {
    /// Turns the pgwire listener on; default `false`.
    pub enabled: bool,
    /// Bind address; default `127.0.0.1`. Any non-loopback address requires both
    /// `password` and TLS.
    pub listen: IpAddr,
    /// Bind port; default `5432`.
    pub port: u16,
    /// Login user name; default `"datapress"`.
    pub username: String,
    /// Login password; required when `listen` is not loopback.
    /// NOTE(review): the derived `Debug` prints this value.
    pub password: Option<String>,
    /// TLS certificate path; must be set together with `tls_key`.
    pub tls_cert: Option<PathBuf>,
    /// TLS private-key path; must be set together with `tls_cert`.
    pub tls_key: Option<PathBuf>,
}
264
265impl Default for PgwireConfig {
266 fn default() -> Self {
267 Self {
268 enabled: false,
269 listen: IpAddr::from([127, 0, 0, 1]),
270 port: 5432,
271 username: "datapress".into(),
272 password: None,
273 tls_cert: None,
274 tls_key: None,
275 }
276 }
277}
278
279impl PgwireConfig {
280 pub fn validate_enabled(&self) -> Result<(), AppError> {
291 let is_loopback = self.listen.is_loopback();
292 let tls_configured = match (self.tls_cert.as_ref(), self.tls_key.as_ref()) {
293 (Some(_), Some(_)) => true,
294 (None, None) => false,
295 _ => {
296 return Err(AppError::Internal(
297 "server.pgwire.tls_cert and server.pgwire.tls_key must be set together \
298 (both or neither)"
299 .into(),
300 ));
301 }
302 };
303
304 if !is_loopback && self.password.is_none() {
305 return Err(AppError::Internal(format!(
306 "server.pgwire.password is required when server.pgwire.listen is not a \
307 loopback address (got '{}')",
308 self.listen
309 )));
310 }
311
312 if !is_loopback && !tls_configured {
313 return Err(AppError::Internal(format!(
314 "server.pgwire requires TLS (server.pgwire.tls_cert + tls_key) when \
315 server.pgwire.listen is not a loopback address (got '{}'): cleartext \
316 password auth must not cross a plaintext connection off the host",
317 self.listen
318 )));
319 }
320
321 Ok(())
322 }
323}
324
/// Execution backend selected by `server.backend`, spelled in lowercase in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// `"datafusion"` — the default.
    #[default]
    Datafusion,
    /// `"duckdb"`.
    Duckdb,
}
332
/// `[docs]` section. Unknown keys are rejected.
///
/// `path` must start with `/`, must not end with `/` (unless it is `/`) and must
/// not collide with `RESERVED_MOUNTS` (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Default `true`.
    pub enabled: bool,
    /// Mount path; default `/mkdocs`.
    pub path: String,
}
348
349impl Default for DocsConfig {
350 fn default() -> Self {
351 Self {
352 enabled: true,
353 path: "/mkdocs".into(),
354 }
355 }
356}
357
/// `[swagger]` section. Unknown keys are rejected.
///
/// `path` follows the same rules as `docs.path` and must also differ from it
/// (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Default `true`.
    pub enabled: bool,
    /// Mount path; default `/docs`.
    pub path: String,
    /// Optional `[swagger.oauth2]` block; `None` by default.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
381
382impl Default for SwaggerConfig {
383 fn default() -> Self {
384 Self {
385 enabled: true,
386 path: "/docs".into(),
387 oauth2: None,
388 }
389 }
390}
391
/// `[swagger.oauth2]` block. `issuer` and `client_id` are required keys (no
/// struct-level default), and unknown keys are rejected.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Absolute `http(s)://` URL; must not be blank (checked in `AppConfig::validate`).
    pub issuer: String,
    /// Client id; must not be blank.
    pub client_id: String,
    /// Scopes; empty when omitted.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// PKCE toggle; `true` when omitted.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
423
/// `[metrics]` section. Unknown keys are rejected.
///
/// `path` follows the same rules as the other mounts and must differ from
/// `docs.path` and `swagger.path`. It is the only mount allowed to use the
/// reserved `/metrics` route.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Default `false`.
    pub enabled: bool,
    /// Mount path; default `/metrics`.
    pub path: String,
}
449
450impl Default for MetricsConfig {
451 fn default() -> Self {
452 Self {
453 enabled: false,
454 path: "/metrics".into(),
455 }
456 }
457}
458
/// `[explorer]` section. Unknown keys are rejected.
///
/// `path` follows the same rules as the other mounts and must differ from
/// `docs.path`, `swagger.path` and `metrics.path`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Default `true`.
    pub enabled: bool,
    /// Mount path; default `/explore`.
    pub path: String,
}
477
478impl Default for ExplorerConfig {
479 fn default() -> Self {
480 Self {
481 enabled: true,
482 path: "/explore".into(),
483 }
484 }
485}
486
/// `[sql]` section. Unknown keys are rejected.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SqlConfig {
    /// Default `false`.
    pub enabled: bool,
    /// Default 100 000; presumably caps the rows returned per SQL query — confirm.
    pub max_rows: u64,
}
514
515impl Default for SqlConfig {
516 fn default() -> Self {
517 Self {
518 enabled: false,
519 max_rows: 100_000,
520 }
521 }
522}
523
/// `[datafusion]` section. Unknown keys are rejected; every option defaults to off.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataFusionConfig {
    /// Default `false`; presumably maps to DataFusion's parquet `pushdown_filters` — confirm.
    pub pushdown_filters: bool,
    /// Default `false`; presumably maps to DataFusion's parquet `reorder_filters` — confirm.
    pub reorder_filters: bool,
    /// Enables the list-files cache; default `false`.
    pub list_files_cache: bool,
    /// List-files cache size in MB; default 64.
    pub list_files_cache_mb: usize,
    /// List-files cache entry TTL in seconds; default 60.
    pub list_files_cache_ttl_secs: u64,
}
555
556impl Default for DataFusionConfig {
557 fn default() -> Self {
558 Self {
559 pushdown_filters: false,
560 reorder_filters: false,
561 list_files_cache: false,
562 list_files_cache_mb: 64,
563 list_files_cache_ttl_secs: 60,
564 }
565 }
566}
567
/// `[auth]` section. Unknown keys are rejected. Validated in `AppConfig::validate`
/// only when `enabled = true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Default `false`.
    pub enabled: bool,
    /// Token issuer; must be a non-blank absolute `http(s)://` URL when enabled.
    pub issuer: String,
    /// Expected audience; empty by default.
    pub audience: String,
    /// Scopes granting read access; ASCII-lowercased by `AppConfig::normalize`.
    pub read_scopes: Vec<String>,
    /// Scopes granting reload access; ASCII-lowercased by `AppConfig::normalize`.
    pub reload_scopes: Vec<String>,
    /// Default `false`; presumably allows unauthenticated reads — confirm.
    pub anonymous_read: bool,
    /// Default `true`; presumably lets the server start before the issuer/JWKS
    /// is reachable — confirm.
    pub start_degraded: bool,
    /// Accepted JWT algorithms; default `["RS256"]`. Must be non-empty and drawn
    /// from RS256/384/512, ES256/384, PS256/384/512.
    pub algorithms: Vec<String>,
    /// Clock-skew leeway in seconds; default 60.
    pub leeway_secs: u64,
    /// JWKS refresh interval in seconds; default 3600.
    pub jwks_refresh_secs: u64,
    /// JSON pointer (must start with `/`) to the tenant claim; empty disables it.
    pub tenant_claim: String,
    /// Tenant allow-list; requires `tenant_claim` when non-empty.
    pub allowed_tenants: Vec<String>,
    /// Default `true`; presumably keeps admin-token auth working alongside JWT — confirm.
    pub admin_token_fallback: bool,
}
635
636impl Default for AuthConfig {
637 fn default() -> Self {
638 Self {
639 enabled: false,
640 issuer: String::new(),
641 audience: String::new(),
642 read_scopes: Vec::new(),
643 reload_scopes: Vec::new(),
644 anonymous_read: false,
645 start_degraded: true,
646 algorithms: vec!["RS256".into()],
647 leeway_secs: 60,
648 jwks_refresh_secs: 3600,
649 tenant_claim: String::new(),
650 allowed_tenants: Vec::new(),
651 admin_token_fallback: true,
652 }
653 }
654}
655
656impl Backend {
657 pub fn as_str(self) -> &'static str {
658 match self {
659 Backend::Datafusion => "datafusion",
660 Backend::Duckdb => "duckdb",
661 }
662 }
663}
664
/// One `[[dataset]]` entry.
///
/// Field order matters for `Serialize`/`Debug` output; `to_toml_block` renders
/// via its own struct rather than this one.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatasetConfig {
    /// Unique name: non-empty, ASCII alphanumeric plus `_ - .` (validated).
    /// Also the basis of `env_prefix`.
    pub name: String,
    /// Where the data lives and in what format.
    pub source: SourceConfig,
    /// Optional S3 settings; used when `source.location` is an `s3://` URL.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// Column list; empty by default (presumably meaning "all columns" — confirm).
    #[serde(default)]
    pub columns: Vec<String>,
    /// Default `true`; presumably dictionary-encodes columns on load — confirm.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// Default `false`. When `true`, `force_lazy_bytes` never forces laziness
    /// (the dataset already is lazy).
    #[serde(default)]
    pub lazy: bool,
    /// Case-insensitive include/exclude column list; the name suggests it limits
    /// which columns may be used in predicates — confirm.
    #[serde(default)]
    pub predicate_filter: ColumnFilter,
    /// Case-insensitive include/exclude column list; the name suggests it limits
    /// which columns may be projected — confirm.
    #[serde(default)]
    pub projection_filter: ColumnFilter,
}
712
/// Serde helper for `#[serde(default = "default_true")]`: booleans that default
/// to `true` when the key is omitted.
fn default_true() -> bool {
    true
}
716
/// Case-insensitive column allow-list (`include`) or deny-list (`exclude`).
///
/// Setting both is rejected by `validate`; if both slip through, `allows`
/// consults only `include`. Unknown keys are rejected.
#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct ColumnFilter {
    /// Columns to allow; when non-empty, every other column is denied.
    #[serde(default)]
    pub include: Vec<String>,
    /// Columns to deny; only consulted when `include` is empty.
    #[serde(default)]
    pub exclude: Vec<String>,
}
733
734impl ColumnFilter {
735 pub fn is_active(&self) -> bool {
737 !self.include.is_empty() || !self.exclude.is_empty()
738 }
739
740 pub fn allows(&self, col: &str) -> bool {
743 let lc = col.to_lowercase();
744 if !self.include.is_empty() {
745 return self.include.iter().any(|c| c.to_lowercase() == lc);
746 }
747 if !self.exclude.is_empty() {
748 return !self.exclude.iter().any(|c| c.to_lowercase() == lc);
749 }
750 true
751 }
752
753 pub fn validate(&self, dataset: &str, ctx: &str) -> Result<(), AppError> {
756 if !self.include.is_empty() && !self.exclude.is_empty() {
757 return Err(AppError::InvalidValue(format!(
758 "dataset '{dataset}': {ctx} may set 'include' or 'exclude', not both"
759 )));
760 }
761 Ok(())
762 }
763
764 pub fn listed(&self) -> &[String] {
768 if !self.include.is_empty() {
769 &self.include
770 } else {
771 &self.exclude
772 }
773 }
774}
775
/// Location and format of a dataset's data.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SourceConfig {
    /// Storage format (`"parquet"` or `"delta"`).
    pub kind: SourceKind,
    /// Either an `s3://bucket/key` URL or a local path. For parquet, a local
    /// path may be a file, a directory or a glob pattern.
    pub location: String,
}
782
/// Storage format of a dataset source, spelled in lowercase in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Plain parquet file(s) — the default.
    #[default]
    Parquet,
    /// A Delta table rooted at `location`.
    Delta,
}
790
791impl SourceKind {
792 pub fn as_str(self) -> &'static str {
793 match self {
794 SourceKind::Parquet => "parquet",
795 SourceKind::Delta => "delta",
796 }
797 }
798}
799
/// Per-dataset S3 settings; every key is optional (`S3Config::default()`).
///
/// NOTE(review): the derived `Debug` (and `Serialize`, used by
/// `DatasetConfig::to_toml_block`) include `secret_access_key` and
/// `session_token` verbatim.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct S3Config {
    /// Region; see `DatasetConfig::resolved_region` for the fallback chain.
    pub region: Option<String>,
    /// Custom endpoint; see `S3Config::effective_endpoint`.
    pub endpoint: Option<String>,
    /// Virtual-hosted (default) or path-style addressing.
    pub addressing_style: AddressingStyle,
    /// Default `false`; presumably permits plain-HTTP endpoints — confirm.
    pub allow_http: bool,
    /// Inline access key id; per-dataset env var overrides it
    /// (see `DatasetConfig::resolved_creds`).
    pub access_key_id: Option<String>,
    /// Inline secret key; same precedence as `access_key_id`.
    pub secret_access_key: Option<String>,
    /// Inline session token; same precedence as `access_key_id`.
    pub session_token: Option<String>,
    /// Partition-discovery mode; default `auto`.
    pub partitioning: Partitioning,
    /// Whether `effective_endpoint` prepends the bucket to the endpoint host.
    pub endpoint_bucket_in_host: BucketInHost,
}
826
827impl Default for S3Config {
828 fn default() -> Self {
829 Self {
830 region: None,
831 endpoint: None,
832 addressing_style: AddressingStyle::Virtual,
833 allow_http: false,
834 access_key_id: None,
835 secret_access_key: None,
836 session_token: None,
837 partitioning: Partitioning::Auto,
838 endpoint_bucket_in_host: BucketInHost::Auto,
839 }
840 }
841}
842
843impl S3Config {
844 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
853 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
854
855 let fold = match self.endpoint_bucket_in_host {
856 BucketInHost::False => false,
857 BucketInHost::True => true,
858 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
859 };
860 if !fold {
861 return Some(ep.to_string());
862 }
863
864 let (scheme, host_and_path) = match ep.split_once("://") {
865 Some((s, rest)) => (Some(s), rest),
866 None => (None, ep),
867 };
868 let (host, path) = match host_and_path.split_once('/') {
870 Some((h, p)) => (h, Some(p)),
871 None => (host_and_path, None),
872 };
873 if host == bucket || host.starts_with(&format!("{bucket}.")) {
875 return Some(ep.to_string());
876 }
877 let new_host = format!("{bucket}.{host}");
878 let rebuilt = match (scheme, path) {
879 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
880 (Some(s), None) => format!("{s}://{new_host}"),
881 (None, Some(p)) => format!("{new_host}/{p}"),
882 (None, None) => new_host,
883 };
884 Some(rebuilt)
885 }
886}
887
/// S3 bucket addressing style, spelled in lowercase in TOML.
/// Under `BucketInHost::Auto`, `Virtual` enables bucket folding in
/// `S3Config::effective_endpoint`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// `"virtual"` — the default.
    #[default]
    Virtual,
    /// `"path"`.
    Path,
}
895
896impl AddressingStyle {
897 pub fn as_str(self) -> &'static str {
898 match self {
899 AddressingStyle::Virtual => "virtual",
900 AddressingStyle::Path => "path",
901 }
902 }
903}
904
/// Partition-discovery mode for S3 sources, spelled in lowercase in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// `"auto"` — the default; presumably detects layout automatically — confirm.
    #[default]
    Auto,
    /// `"hive"`; presumably `key=value` directory partitions — confirm.
    Hive,
    /// `"none"`; presumably disables partition discovery — confirm.
    None,
}
922
923impl Partitioning {
924 pub fn as_str(self) -> &'static str {
925 match self {
926 Partitioning::Auto => "auto",
927 Partitioning::Hive => "hive",
928 Partitioning::None => "none",
929 }
930 }
931}
932
/// Whether `S3Config::effective_endpoint` prepends the bucket to the endpoint
/// host. Spelled `"auto"`, `"true"` or `"false"` in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold only when `addressing_style` is `virtual` — the default.
    #[default]
    Auto,
    /// Always fold the bucket into the host.
    True,
    /// Never fold; use the endpoint as written.
    False,
}
949
950impl BucketInHost {
951 pub fn as_str(self) -> &'static str {
952 match self {
953 BucketInHost::Auto => "auto",
954 BucketInHost::True => "true",
955 BucketInHost::False => "false",
956 }
957 }
958}
959
/// `[dataset.index]` settings; every key is optional (`IndexConfig::default()`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Index selection mode; default `auto`.
    pub mode: IndexMode,
    /// Explicit columns; must be non-empty when `mode = "list"` (validated).
    pub columns: Vec<String>,
    /// Default 100 000; presumably the cardinality cap for indexed columns — confirm.
    pub max_cardinality: usize,
}
967
968impl Default for IndexConfig {
969 fn default() -> Self {
970 Self {
971 mode: IndexMode::Auto,
972 columns: Vec::new(),
973 max_cardinality: 100_000,
974 }
975 }
976}
977
/// Index selection mode, spelled in lowercase in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// `"auto"` — the default; presumably picks columns automatically — confirm.
    #[default]
    Auto,
    /// `"none"`; presumably disables indexing — confirm.
    None,
    /// `"list"`: index exactly `IndexConfig::columns` (which must be non-empty).
    List,
}
986
/// S3 credentials after applying the per-dataset env var → inline config →
/// global env var precedence (see `DatasetConfig::resolved_creds`).
///
/// `Debug` is implemented by hand so that formatting this value (e.g. in a
/// log line) never prints the secret key or session token.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id; not a secret, so `Debug` shows it.
    pub access_key_id: Option<String>,
    /// Secret access key; redacted in `Debug`.
    pub secret_access_key: Option<String>,
    /// Session token; redacted in `Debug`.
    pub session_token: Option<String>,
}

impl std::fmt::Debug for ResolvedCreds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show only whether each secret is set, never its value.
        fn redact(v: &Option<String>) -> Option<&'static str> {
            v.as_ref().map(|_| "<redacted>")
        }
        f.debug_struct("ResolvedCreds")
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &redact(&self.secret_access_key))
            .field("session_token", &redact(&self.session_token))
            .finish()
    }
}
995
996impl ResolvedCreds {
997 pub fn has_keypair(&self) -> bool {
998 self.access_key_id.is_some() && self.secret_access_key.is_some()
999 }
1000}
1001
1002impl AppConfig {
1007 pub fn load(path: &str) -> Result<Self, AppError> {
1009 let raw = std::fs::read_to_string(path)
1010 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
1011 let mut cfg: AppConfig =
1012 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
1013 cfg.normalize();
1014 cfg.validate()?;
1015 let _ = SOURCE_CONFIG_PATH.set(PathBuf::from(path));
1019 Ok(cfg)
1020 }
1021
1022 fn normalize(&mut self) {
1030 for s in self
1031 .auth
1032 .read_scopes
1033 .iter_mut()
1034 .chain(self.auth.reload_scopes.iter_mut())
1035 {
1036 *s = s.to_ascii_lowercase();
1037 }
1038 }
1039
1040 fn validate(&self) -> Result<(), AppError> {
1041 let p = &self.server.prefix;
1043 if !p.is_empty() {
1044 if !p.starts_with('/') {
1045 return Err(AppError::Internal(format!(
1046 "server.prefix must start with '/' (got '{p}')"
1047 )));
1048 }
1049 if p.ends_with('/') {
1050 return Err(AppError::Internal(format!(
1051 "server.prefix must not end with '/' (got '{p}')"
1052 )));
1053 }
1054 }
1055
1056 if self.datasets.is_empty() {
1057 return Err(AppError::Internal(
1058 "datasets.toml has no [[dataset]] entries".into(),
1059 ));
1060 }
1061
1062 if self.server.quack.enabled {
1063 self.server.quack.validate_enabled()?;
1064 }
1065
1066 if self.server.pgwire.enabled {
1067 self.server.pgwire.validate_enabled()?;
1068 }
1069
1070 {
1073 let dp = &self.docs.path;
1074 if !dp.starts_with('/') {
1075 return Err(AppError::Internal(format!(
1076 "docs.path must start with '/' (got '{dp}')"
1077 )));
1078 }
1079 if dp.len() > 1 && dp.ends_with('/') {
1080 return Err(AppError::Internal(format!(
1081 "docs.path must not end with '/' (got '{dp}')"
1082 )));
1083 }
1084 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
1085 return Err(AppError::Internal(format!(
1086 "docs.path '{dp}' collides with a reserved route"
1087 )));
1088 }
1089 }
1090
1091 {
1093 let sp = &self.swagger.path;
1094 if !sp.starts_with('/') {
1095 return Err(AppError::Internal(format!(
1096 "swagger.path must start with '/' (got '{sp}')"
1097 )));
1098 }
1099 if sp.len() > 1 && sp.ends_with('/') {
1100 return Err(AppError::Internal(format!(
1101 "swagger.path must not end with '/' (got '{sp}')"
1102 )));
1103 }
1104 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
1105 return Err(AppError::Internal(format!(
1106 "swagger.path '{sp}' collides with a reserved route"
1107 )));
1108 }
1109 if sp == &self.docs.path {
1110 return Err(AppError::Internal(format!(
1111 "swagger.path and docs.path must differ (both '{sp}')"
1112 )));
1113 }
1114 if let Some(o) = &self.swagger.oauth2 {
1115 if o.issuer.trim().is_empty() {
1116 return Err(AppError::Internal(
1117 "swagger.oauth2.issuer must not be empty".into(),
1118 ));
1119 }
1120 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
1121 return Err(AppError::Internal(format!(
1122 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
1123 o.issuer
1124 )));
1125 }
1126 if o.client_id.trim().is_empty() {
1127 return Err(AppError::Internal(
1128 "swagger.oauth2.client_id must not be empty".into(),
1129 ));
1130 }
1131 }
1132 }
1133
1134 {
1140 let mp = &self.metrics.path;
1141 if !mp.starts_with('/') {
1142 return Err(AppError::Internal(format!(
1143 "metrics.path must start with '/' (got '{mp}')"
1144 )));
1145 }
1146 if mp.len() > 1 && mp.ends_with('/') {
1147 return Err(AppError::Internal(format!(
1148 "metrics.path must not end with '/' (got '{mp}')"
1149 )));
1150 }
1151 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
1152 return Err(AppError::Internal(format!(
1153 "metrics.path '{mp}' collides with a reserved route"
1154 )));
1155 }
1156 if mp == &self.docs.path {
1157 return Err(AppError::Internal(format!(
1158 "metrics.path and docs.path must differ (both '{mp}')"
1159 )));
1160 }
1161 if mp == &self.swagger.path {
1162 return Err(AppError::Internal(format!(
1163 "metrics.path and swagger.path must differ (both '{mp}')"
1164 )));
1165 }
1166 }
1167
1168 {
1171 let ep = &self.explorer.path;
1172 if !ep.starts_with('/') {
1173 return Err(AppError::Internal(format!(
1174 "explorer.path must start with '/' (got '{ep}')"
1175 )));
1176 }
1177 if ep.len() > 1 && ep.ends_with('/') {
1178 return Err(AppError::Internal(format!(
1179 "explorer.path must not end with '/' (got '{ep}')"
1180 )));
1181 }
1182 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
1183 return Err(AppError::Internal(format!(
1184 "explorer.path '{ep}' collides with a reserved route"
1185 )));
1186 }
1187 if ep == &self.docs.path {
1188 return Err(AppError::Internal(format!(
1189 "explorer.path and docs.path must differ (both '{ep}')"
1190 )));
1191 }
1192 if ep == &self.swagger.path {
1193 return Err(AppError::Internal(format!(
1194 "explorer.path and swagger.path must differ (both '{ep}')"
1195 )));
1196 }
1197 if ep == &self.metrics.path {
1198 return Err(AppError::Internal(format!(
1199 "explorer.path and metrics.path must differ (both '{ep}')"
1200 )));
1201 }
1202 }
1203
1204 if self.auth.enabled {
1209 let a = &self.auth;
1210 if a.issuer.trim().is_empty() {
1211 return Err(AppError::Internal(
1212 "auth.issuer must not be empty when auth.enabled = true".into(),
1213 ));
1214 }
1215 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1216 return Err(AppError::Internal(format!(
1217 "auth.issuer must be an absolute http(s) URL (got '{}')",
1218 a.issuer
1219 )));
1220 }
1221 for alg in &a.algorithms {
1222 match alg.as_str() {
1223 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1224 | "PS512" => {}
1225 other => {
1226 return Err(AppError::Internal(format!(
1227 "auth.algorithms[{other}] is not allowed; pick one of \
1228 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1229 )));
1230 }
1231 }
1232 }
1233 if a.algorithms.is_empty() {
1234 return Err(AppError::Internal(
1235 "auth.algorithms must not be empty".into(),
1236 ));
1237 }
1238 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1239 return Err(AppError::Internal(format!(
1240 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1241 a.tenant_claim
1242 )));
1243 }
1244 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1245 return Err(AppError::Internal(
1246 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1247 can't enforce a tenant allow-list without a claim to extract from"
1248 .into(),
1249 ));
1250 }
1251 }
1252
1253 let mut seen = HashSet::new();
1254 for d in &self.datasets {
1255 if !seen.insert(d.name.as_str()) {
1256 return Err(AppError::Internal(format!(
1257 "duplicate dataset name: {}",
1258 d.name
1259 )));
1260 }
1261 if d.name.is_empty() {
1262 return Err(AppError::Internal("dataset name must not be empty".into()));
1263 }
1264 if !d
1266 .name
1267 .chars()
1268 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1269 {
1270 return Err(AppError::Internal(format!(
1271 "dataset name '{}' must be alphanumeric (plus _ - .)",
1272 d.name
1273 )));
1274 }
1275
1276 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1277 return Err(AppError::Internal(format!(
1278 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1279 d.name
1280 )));
1281 }
1282
1283 if d.source.is_s3() {
1285 d.source.s3_bucket()?;
1286 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1287 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1288 && std::env::var("AWS_REGION").is_err()
1289 && std::env::var("AWS_DEFAULT_REGION").is_err()
1290 {
1291 log::warn!(
1292 "dataset '{}': S3 source without explicit region — \
1293 relying on AWS_REGION env var",
1294 d.name
1295 );
1296 }
1297 } else {
1298 match d.source.kind {
1302 SourceKind::Parquet => {
1303 d.resolve_local_parquet_files()?;
1304 }
1305 SourceKind::Delta => {
1306 let p = Path::new(&d.source.location);
1307 if !p.exists() {
1308 return Err(AppError::Internal(format!(
1309 "dataset '{}': delta location does not exist: {}",
1310 d.name, d.source.location
1311 )));
1312 }
1313 }
1314 }
1315 }
1316 }
1317 Ok(())
1318 }
1319}
1320
1321impl SourceConfig {
1322 pub fn is_s3(&self) -> bool {
1323 self.location.starts_with("s3://")
1324 }
1325
1326 pub fn has_glob(&self) -> bool {
1329 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1330 }
1331
1332 pub fn s3_recursive_parquet_glob(&self) -> String {
1338 if !self.is_s3() || self.has_glob() {
1339 return self.location.clone();
1340 }
1341 let trimmed = self.location.trim_end_matches('/');
1342 format!("{trimmed}/**/*.parquet")
1343 }
1344
1345 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1347 let rest = self
1348 .location
1349 .strip_prefix("s3://")
1350 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1351 let (bucket, key) = match rest.split_once('/') {
1352 Some((b, k)) => (b, k),
1353 None => (rest, ""),
1354 };
1355 if bucket.is_empty() {
1356 return Err(AppError::Internal(format!(
1357 "s3 URL missing bucket: {}",
1358 self.location
1359 )));
1360 }
1361 Ok((bucket, key))
1362 }
1363}
1364
impl DatasetConfig {
    /// Validates a dataset definition before registration. The `_for_register`
    /// name suggests datasets added at runtime rather than from the config
    /// file — confirm against callers.
    ///
    /// Checks:
    /// - the name is non-empty and ASCII alphanumeric plus `_ - .`;
    /// - `index.mode = "list"` comes with columns;
    /// - each column filter sets at most one of `include` / `exclude`;
    /// - S3 locations name a bucket.
    ///
    /// Unlike `AppConfig::validate`, it does not touch the local filesystem.
    ///
    /// # Errors
    /// `AppError::InvalidValue` for name, index and filter problems. The S3
    /// check propagates `SourceConfig::s3_bucket`'s `AppError::Internal`.
    pub fn validate_for_register(&self) -> Result<(), AppError> {
        if self.name.is_empty() {
            return Err(AppError::InvalidValue(
                "dataset name must not be empty".into(),
            ));
        }
        if !self
            .name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
        {
            return Err(AppError::InvalidValue(format!(
                "dataset name '{}' must be alphanumeric (plus _ - .)",
                self.name
            )));
        }
        if self.index.mode == IndexMode::List && self.index.columns.is_empty() {
            return Err(AppError::InvalidValue(format!(
                "dataset '{}': index.mode = 'list' requires non-empty index.columns",
                self.name
            )));
        }
        self.predicate_filter.validate(&self.name, "predicate_filter")?;
        self.projection_filter
            .validate(&self.name, "projection_filter")?;
        if self.source.is_s3() {
            self.source.s3_bucket()?;
        }
        Ok(())
    }

    /// Renders this dataset as a standalone `[[dataset]]` TOML snippet.
    ///
    /// The following are omitted: empty `columns`, absent `s3`, a default
    /// `index`, and inactive column filters.
    /// NOTE(review): `s3` is serialized whole, so any inline credentials appear
    /// in the rendered text.
    ///
    /// # Errors
    /// `AppError::Internal` if TOML serialization fails.
    pub fn to_toml_block(&self) -> Result<String, AppError> {
        // Scalar fields are declared before table-valued ones: the `toml`
        // serializer may reject a plain value emitted after a table.
        #[derive(Serialize)]
        struct Block {
            name: String,
            #[serde(skip_serializing_if = "Vec::is_empty")]
            columns: Vec<String>,
            dict_encode: bool,
            lazy: bool,
            source: SourceConfig,
            #[serde(skip_serializing_if = "Option::is_none")]
            s3: Option<S3Config>,
            #[serde(skip_serializing_if = "Option::is_none")]
            index: Option<IndexConfig>,
            #[serde(skip_serializing_if = "Option::is_none")]
            predicate_filter: Option<ColumnFilter>,
            #[serde(skip_serializing_if = "Option::is_none")]
            projection_filter: Option<ColumnFilter>,
        }
        // A one-element array under `dataset` renders as a `[[dataset]]` table.
        #[derive(Serialize)]
        struct Doc {
            dataset: [Block; 1],
        }
        let doc = Doc {
            dataset: [Block {
                name: self.name.clone(),
                columns: self.columns.clone(),
                dict_encode: self.dict_encode,
                lazy: self.lazy,
                source: self.source.clone(),
                s3: self.s3.clone(),
                index: if self.index.is_default() {
                    None
                } else {
                    Some(self.index.clone())
                },
                predicate_filter: self
                    .predicate_filter
                    .is_active()
                    .then(|| self.predicate_filter.clone()),
                projection_filter: self
                    .projection_filter
                    .is_active()
                    .then(|| self.projection_filter.clone()),
            }],
        };
        toml::to_string_pretty(&doc)
            .map_err(|e| AppError::Internal(format!("failed to render dataset TOML: {e}")))
    }

    /// Appends `to_toml_block()` (preceded by a newline) to the config file
    /// recorded by `AppConfig::load`, and returns that file's path.
    ///
    /// The file is opened append-only and is not created if missing.
    /// NOTE(review): there is no locking and no check for an existing dataset of
    /// the same name; concurrent callers could interleave — confirm callers
    /// serialize this.
    ///
    /// # Errors
    /// `AppError::InvalidValue` when no config path was recorded;
    /// `AppError::Internal` on render, open or write failure.
    pub fn persist_to_source_config(&self) -> Result<PathBuf, AppError> {
        use std::io::Write;
        let path = source_config_path().ok_or_else(|| {
            AppError::InvalidValue("server has no on-disk config file to append to".into())
        })?;
        let block = self.to_toml_block()?;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .map_err(|e| {
                AppError::Internal(format!("failed to open config {}: {e}", path.display()))
            })?;
        // Leading newline separates the block from the previous file contents.
        write!(file, "\n{block}").map_err(|e| {
            AppError::Internal(format!("failed to write config {}: {e}", path.display()))
        })?;
        Ok(path.to_path_buf())
    }
}
1484
1485impl IndexConfig {
1486 fn is_default(&self) -> bool {
1489 self.mode == IndexMode::Auto && self.columns.is_empty() && self.max_cardinality == 100_000
1490 }
1491}
1492
1493impl DatasetConfig {
1494 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1504 if self.source.is_s3() {
1505 return Err(AppError::Internal(format!(
1506 "dataset '{}': resolve_local_parquet_files called on s3 source",
1507 self.name
1508 )));
1509 }
1510 let loc = &self.source.location;
1511
1512 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1514 let mut files: Vec<PathBuf> = glob::glob(loc)
1515 .map_err(|e| {
1516 AppError::Internal(format!(
1517 "dataset '{}': bad glob pattern '{loc}': {e}",
1518 self.name
1519 ))
1520 })?
1521 .filter_map(|r| r.ok())
1522 .filter(|p| {
1523 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1524 })
1525 .collect();
1526 files.sort();
1527 if files.is_empty() {
1528 return Err(AppError::EmptyDataset(format!(
1529 "dataset '{}': glob '{loc}' matched no .parquet files",
1530 self.name
1531 )));
1532 }
1533 return Ok(files);
1534 }
1535
1536 let path = Path::new(loc);
1537 if !path.exists() {
1538 return Err(AppError::Internal(format!(
1539 "dataset '{}': source path does not exist: {loc}",
1540 self.name
1541 )));
1542 }
1543
1544 if path.is_file() {
1545 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1546 return Err(AppError::Internal(format!(
1547 "dataset '{}': source must be a .parquet file",
1548 self.name
1549 )));
1550 }
1551 return Ok(vec![path.to_path_buf()]);
1552 }
1553
1554 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1555 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1556 .filter_map(|entry| entry.ok().map(|e| e.path()))
1557 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1558 .collect();
1559 files.sort();
1560 if files.is_empty() {
1561 return Err(AppError::EmptyDataset(format!(
1562 "dataset '{}': no *.parquet files found in {loc}",
1563 self.name
1564 )));
1565 }
1566 Ok(files)
1567 }
1568
1569 pub fn estimate_local_bytes(&self) -> Option<u64> {
1579 if self.source.is_s3() {
1580 return None;
1581 }
1582 match self.source.kind {
1583 SourceKind::Parquet => {
1584 let files = self.resolve_local_parquet_files().ok()?;
1585 Some(
1586 files
1587 .iter()
1588 .filter_map(|p| std::fs::metadata(p).ok())
1589 .map(|m| m.len())
1590 .sum(),
1591 )
1592 }
1593 SourceKind::Delta => {
1594 let root = self.source.location.trim_end_matches('/');
1595 let pattern = format!("{root}/**/*.parquet");
1596 let paths = glob::glob(&pattern).ok()?;
1597 Some(
1598 paths
1599 .filter_map(Result::ok)
1600 .filter_map(|p| std::fs::metadata(&p).ok())
1601 .filter(|m| m.is_file())
1602 .map(|m| m.len())
1603 .sum(),
1604 )
1605 }
1606 }
1607 }
1608
1609 pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1616 if self.lazy || server.force_lazy_above_mb == 0 {
1617 return None;
1618 }
1619 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1620 match self.estimate_local_bytes() {
1621 Some(bytes) if bytes > threshold => Some(bytes),
1622 _ => None,
1623 }
1624 }
1625
1626 pub fn env_prefix(&self) -> String {
1630 self.name
1631 .chars()
1632 .map(|c| {
1633 if c.is_ascii_alphanumeric() {
1634 c.to_ascii_uppercase()
1635 } else {
1636 '_'
1637 }
1638 })
1639 .collect()
1640 }
1641
1642 pub fn resolved_creds(&self) -> ResolvedCreds {
1647 let prefix = self.env_prefix();
1648 let from_env = |suffix: &str| {
1649 std::env::var(format!("{prefix}_{suffix}"))
1650 .ok()
1651 .filter(|s| !s.is_empty())
1652 };
1653 let inline = self.s3.as_ref();
1654 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1655
1656 ResolvedCreds {
1657 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1658 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1659 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1660 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1661 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1662 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1663 session_token: from_env("AWS_SESSION_TOKEN")
1664 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1665 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1666 }
1667 }
1668
1669 pub fn resolved_region(&self) -> String {
1672 let prefix = self.env_prefix();
1673 std::env::var(format!("{prefix}_AWS_REGION"))
1674 .ok()
1675 .filter(|s| !s.is_empty())
1676 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1677 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1678 .or_else(|| {
1679 std::env::var("AWS_DEFAULT_REGION")
1680 .ok()
1681 .filter(|s| !s.is_empty())
1682 })
1683 .unwrap_or_else(|| "us-east-1".to_string())
1684 }
1685}
1686
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an [`AppConfig`] whose sections all take their defaults except
    /// `server`, `auth` and `datasets`, which the individual tests vary.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            sql: SqlConfig::default(),
            datafusion: DataFusionConfig::default(),
            auth,
            datasets,
        }
    }

    /// Builds a [`DatasetConfig`] with no inline S3 block, default index and
    /// filters, no column overrides and dictionary encoding enabled.
    fn dataset(name: &str, kind: SourceKind, location: &str, lazy: bool) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind,
                location: location.to_string(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy,
            predicate_filter: Default::default(),
            projection_filter: Default::default(),
        }
    }

    // Pins the values produced by `ServerConfig::default()`.
    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.force_lazy_above_mb, 0);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    // Every `[server]` / `[server.quack]` key set in TOML overrides its default.
    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            force_lazy_above_mb = 256
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false

            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.force_lazy_above_mb, 256);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn force_lazy_bytes_logic() {
        // Process id + timestamp keeps the directory unique across parallel
        // and repeated runs.
        let dir = std::env::temp_dir().join(format!(
            "dp-force-lazy-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let two_mib = 2 * 1024 * 1024;
        let file = dir.join("data.parquet");
        std::fs::write(&file, vec![0u8; two_mib]).unwrap();

        let server = |mb: u64| ServerConfig {
            force_lazy_above_mb: mb,
            ..ServerConfig::default()
        };

        // Size estimation: single file, directory of files, Delta table root.
        let file_ds = dataset("t", SourceKind::Parquet, file.to_str().unwrap(), false);
        assert_eq!(file_ds.estimate_local_bytes(), Some(two_mib as u64));
        let dir_ds = dataset("t", SourceKind::Parquet, dir.to_str().unwrap(), false);
        assert_eq!(dir_ds.estimate_local_bytes(), Some(two_mib as u64));
        let delta_ds = dataset("t", SourceKind::Delta, dir.to_str().unwrap(), false);
        assert_eq!(delta_ds.estimate_local_bytes(), Some(two_mib as u64));

        // 0 disables forcing entirely.
        assert_eq!(file_ds.force_lazy_bytes(&server(0)), None);
        // 2 MiB of data above a 1 MiB threshold is forced lazy.
        assert_eq!(file_ds.force_lazy_bytes(&server(1)), Some(two_mib as u64));
        // The comparison is strict: a size equal to the threshold is not forced.
        assert_eq!(file_ds.force_lazy_bytes(&server(2)), None);
        // Below the threshold nothing happens.
        assert_eq!(file_ds.force_lazy_bytes(&server(4)), None);

        // A dataset that is already lazy is never reported.
        let lazy_ds = dataset("t", SourceKind::Parquet, file.to_str().unwrap(), true);
        assert_eq!(lazy_ds.force_lazy_bytes(&server(1)), None);

        // Remote locations have no local estimate, so they are never forced.
        let s3_ds = dataset("t", SourceKind::Parquet, "s3://bucket/data.parquet", false);
        assert_eq!(s3_ds.estimate_local_bytes(), None);
        assert_eq!(s3_ds.force_lazy_bytes(&server(1)), None);

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        // Use a real dataset file so a validation failure can only come from
        // the prefix: with no datasets `validate` errors regardless.
        let dir = std::env::temp_dir().join(format!("dp-prefix-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::write(&file, b"x").unwrap();

        let with_prefix = |prefix: &str| {
            app_config(
                ServerConfig {
                    prefix: prefix.to_string(),
                    ..Default::default()
                },
                AuthConfig::default(),
                vec![dataset("x", SourceKind::Parquet, file.to_str().unwrap(), false)],
            )
        };

        // Baseline: the identical config with the default (empty) prefix passes.
        assert!(with_prefix("").validate().is_ok());
        for p in ["no-leading-slash", "/trailing/"] {
            assert!(with_prefix(p).validate().is_err(), "prefix {p:?} should fail");
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            vec![],
        );
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        use std::io::Write;

        let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::File::create(&file)
            .unwrap()
            .write_all(b"x")
            .unwrap();

        let cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            vec![dataset("x", SourceKind::Parquet, &file.to_string_lossy(), false)],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = app_config(
            ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            AuthConfig::default(),
            vec![dataset("x", SourceKind::Parquet, "/tmp/missing.parquet", false)],
        );
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        use std::io::Write;
        let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let f = dir.join("a.parquet");
        std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
        let path = f.to_str().unwrap();

        // TOML literal strings ('...') take the path verbatim, so backslashes in
        // Windows temp paths are not parsed as escape sequences.
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let mk = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        let s1 = mk("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = mk("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(mk("s3:///nokey").s3_bucket().is_err());
        assert!(mk("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        let mk = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        // Plain prefixes (with or without trailing slash) expand recursively.
        assert_eq!(
            mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        assert_eq!(
            mk("s3://bucket/logs").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        // An explicit glob is left untouched.
        assert_eq!(
            mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
            "s3://bucket/logs/*.parquet"
        );
        // Non-S3 locations are left untouched.
        assert_eq!(
            mk("/local/logs").s3_recursive_parquet_glob(),
            "/local/logs"
        );
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // Virtual addressing prepends the bucket to the host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // An endpoint that already carries the bucket is not prefixed twice.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Path addressing keeps the endpoint as-is.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        // The explicit override wins over the addressing style in both directions.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // No endpoint configured: nothing to fold.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let mk = |name: &str| dataset(name, SourceKind::Parquet, "x", false);
        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        use std::io::Write;
        let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let f = dir.join("a.parquet");
        let mut fh = std::fs::File::create(&f).unwrap();
        fh.write_all(b"not really parquet").unwrap();

        let mk = |loc: &str| dataset("ds", SourceKind::Parquet, loc, false);

        // A single file resolves to itself.
        let files = mk(f.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        // A directory resolves to the parquet files it contains.
        let files = mk(dir.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        // A missing path is an error, not an empty list.
        assert!(
            mk("/no/such/place.parquet")
                .resolve_local_parquet_files()
                .is_err()
        );

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn pgwire_loopback_without_password_is_allowed() {
        let cfg = PgwireConfig {
            enabled: true,
            ..Default::default()
        };
        assert!(cfg.validate_enabled().is_ok());
    }

    #[test]
    fn pgwire_non_loopback_without_password_is_rejected() {
        let cfg = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: None,
            ..Default::default()
        };
        let err = cfg.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("password is required")));
    }

    #[test]
    fn pgwire_non_loopback_with_password_but_no_tls_is_rejected() {
        let cfg = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: Some("pw".into()),
            ..Default::default()
        };
        let err = cfg.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("requires TLS")));
    }

    #[test]
    fn pgwire_tls_cert_without_key_is_rejected() {
        let cfg = PgwireConfig {
            enabled: true,
            tls_cert: Some(PathBuf::from("/tmp/server.crt")),
            tls_key: None,
            ..Default::default()
        };
        let err = cfg.validate_enabled().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("must be set together")));
    }

    #[test]
    fn pgwire_non_loopback_with_password_and_tls_is_allowed() {
        let cfg = PgwireConfig {
            enabled: true,
            listen: IpAddr::from([0, 0, 0, 0]),
            password: Some("pw".into()),
            tls_cert: Some(PathBuf::from("/tmp/server.crt")),
            tls_key: Some(PathBuf::from("/tmp/server.key")),
            ..Default::default()
        };
        assert!(cfg.validate_enabled().is_ok());
    }
}
2222