1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::{Deserialize, Serialize};
29
30use crate::errors::AppError;
31
/// Path of the TOML file the running configuration was loaded from.
///
/// Written at most once, by `AppConfig::load` after a successful load; a
/// second `set` attempt there is ignored.
static SOURCE_CONFIG_PATH: std::sync::OnceLock<PathBuf> = std::sync::OnceLock::new();

/// Returns the on-disk config path recorded by `AppConfig::load`, if any.
///
/// `None` means no config file has been loaded in this process.
pub fn source_config_path() -> Option<&'static std::path::Path> {
    SOURCE_CONFIG_PATH.get().map(PathBuf::as_path)
}
42
/// Routes owned by the server itself.
///
/// The configurable mount paths (`docs.path`, `swagger.path`, `metrics.path`,
/// `explorer.path`) are rejected at load time when equal to one of these; the
/// only exception is `metrics.path`, which may be exactly `/metrics`.
const RESERVED_MOUNTS: &[&str] = &[
    "/",
    "/api",
    "/api/v1",
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
49
/// Top-level configuration, deserialized from the TOML file given to
/// `AppConfig::load`.
///
/// Every section is optional and falls back to its `Default`; datasets come
/// from the `[[dataset]]` array of tables.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` table.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[sql]` table.
    #[serde(default)]
    pub sql: SqlConfig,
    /// `[datafusion]` table.
    #[serde(default)]
    pub datafusion: DataFusionConfig,
    /// `[auth]` table.
    #[serde(default)]
    pub auth: AuthConfig,
    /// Entries of the `[[dataset]]` array (TOML key `dataset`); validation
    /// rejects a config with none.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
75
76#[derive(Debug, Deserialize)]
77#[serde(default)]
78pub struct ServerConfig {
79 pub backend: Backend,
81 pub listen: IpAddr,
84 pub port: u16,
86 pub workers: Option<usize>,
88 pub prefix: String,
94 pub compress: bool,
99 pub max_body_bytes: usize,
104 pub max_page_size: u64,
108 pub force_lazy_above_mb: u64,
118 pub request_timeout_ms: u64,
122 pub shutdown_timeout_secs: u64,
127 pub quack: QuackConfig,
130}
131
132impl Default for ServerConfig {
133 fn default() -> Self {
134 Self {
135 backend: Backend::default(),
136 listen: IpAddr::from([127, 0, 0, 1]),
137 port: 8080,
138 workers: None,
139 prefix: String::new(),
140 compress: true,
141 max_body_bytes: 1024 * 1024,
142 max_page_size: 100_000,
143 force_lazy_above_mb: 0,
144 request_timeout_ms: 30_000,
145 shutdown_timeout_secs: 30,
146 quack: QuackConfig::default(),
147 }
148 }
149}
150
151#[derive(Debug, Clone, Deserialize)]
157#[serde(default)]
158pub struct QuackConfig {
159 pub enabled: bool,
162 pub uri: String,
165 pub token: Option<String>,
168 pub allow_other_hostname: bool,
171 pub read_only: bool,
174}
175
176impl Default for QuackConfig {
177 fn default() -> Self {
178 Self {
179 enabled: false,
180 uri: "quack:localhost".into(),
181 token: None,
182 allow_other_hostname: false,
183 read_only: true,
184 }
185 }
186}
187
188impl QuackConfig {
189 pub fn validate_enabled(&self) -> Result<(), AppError> {
193 if self.uri.trim().is_empty() {
194 return Err(AppError::Internal(
195 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
196 ));
197 }
198 if !self.uri.starts_with("quack:") {
199 return Err(AppError::Internal(format!(
200 "server.quack.uri must start with 'quack:' (got '{}')",
201 self.uri
202 )));
203 }
204 if !self.allow_other_hostname {
205 let host = self.hostname().unwrap_or_default();
206 if host != "localhost" {
207 return Err(AppError::Internal(format!(
208 "server.quack.uri host must be 'localhost' unless \
209 server.quack.allow_other_hostname = true (got '{}')",
210 self.uri
211 )));
212 }
213 }
214 if let Some(token) = self.token.as_deref()
215 && token.len() < 4
216 {
217 return Err(AppError::Internal(
218 "server.quack.token must be at least 4 characters".into(),
219 ));
220 }
221 Ok(())
222 }
223
224 fn hostname(&self) -> Option<&str> {
225 let rest = self.uri.strip_prefix("quack:")?;
226 let rest = rest.strip_prefix("//").unwrap_or(rest);
227 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
228 (!host.is_empty()).then_some(host)
229 }
230}
231
/// Backend chosen by `server.backend`; spelled `"datafusion"` or `"duckdb"`
/// in TOML (lowercase variant names).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Used when `server.backend` is omitted.
    #[default]
    Datafusion,
    Duckdb,
}
239
240#[derive(Debug, Clone, Deserialize)]
250#[serde(default, deny_unknown_fields)]
251pub struct DocsConfig {
252 pub enabled: bool,
253 pub path: String,
254}
255
256impl Default for DocsConfig {
257 fn default() -> Self {
258 Self {
259 enabled: true,
260 path: "/mkdocs".into(),
261 }
262 }
263}
264
265#[derive(Debug, Clone, Deserialize)]
282#[serde(default, deny_unknown_fields)]
283pub struct SwaggerConfig {
284 pub enabled: bool,
285 pub path: String,
286 pub oauth2: Option<SwaggerOAuth2Config>,
287}
288
289impl Default for SwaggerConfig {
290 fn default() -> Self {
291 Self {
292 enabled: true,
293 path: "/docs".into(),
294 oauth2: None,
295 }
296 }
297}
298
/// `[swagger.oauth2]` table. `issuer` and `client_id` are required keys (no
/// serde default); unknown keys are rejected.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Must be a non-empty absolute `http://` or `https://` URL (validated at load).
    pub issuer: String,
    /// Must be non-empty (validated at load).
    pub client_id: String,
    /// Optional; empty when omitted.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Defaults to `true` when omitted.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
330
331#[derive(Debug, Clone, Deserialize)]
351#[serde(default, deny_unknown_fields)]
352pub struct MetricsConfig {
353 pub enabled: bool,
354 pub path: String,
355}
356
357impl Default for MetricsConfig {
358 fn default() -> Self {
359 Self {
360 enabled: false,
361 path: "/metrics".into(),
362 }
363 }
364}
365
366#[derive(Debug, Clone, Deserialize)]
379#[serde(default, deny_unknown_fields)]
380pub struct ExplorerConfig {
381 pub enabled: bool,
382 pub path: String,
383}
384
385impl Default for ExplorerConfig {
386 fn default() -> Self {
387 Self {
388 enabled: true,
389 path: "/explore".into(),
390 }
391 }
392}
393
394#[derive(Debug, Clone, Deserialize)]
412#[serde(default, deny_unknown_fields)]
413pub struct SqlConfig {
414 pub enabled: bool,
416 pub max_rows: u64,
420}
421
422impl Default for SqlConfig {
423 fn default() -> Self {
424 Self {
425 enabled: false,
426 max_rows: 100_000,
427 }
428 }
429}
430
431#[derive(Debug, Clone, Deserialize)]
438#[serde(default, deny_unknown_fields)]
439pub struct DataFusionConfig {
440 pub pushdown_filters: bool,
446 pub reorder_filters: bool,
450 pub list_files_cache: bool,
454 pub list_files_cache_mb: usize,
457 pub list_files_cache_ttl_secs: u64,
461}
462
463impl Default for DataFusionConfig {
464 fn default() -> Self {
465 Self {
466 pushdown_filters: false,
467 reorder_filters: false,
468 list_files_cache: false,
469 list_files_cache_mb: 64,
470 list_files_cache_ttl_secs: 60,
471 }
472 }
473}
474
475#[derive(Debug, Clone, Deserialize)]
492#[serde(default, deny_unknown_fields)]
493pub struct AuthConfig {
494 pub enabled: bool,
496 pub issuer: String,
499 pub audience: String,
502 pub read_scopes: Vec<String>,
506 pub reload_scopes: Vec<String>,
509 pub anonymous_read: bool,
512 pub start_degraded: bool,
517 pub algorithms: Vec<String>,
521 pub leeway_secs: u64,
523 pub jwks_refresh_secs: u64,
527 pub tenant_claim: String,
532 pub allowed_tenants: Vec<String>,
536 pub admin_token_fallback: bool,
541}
542
543impl Default for AuthConfig {
544 fn default() -> Self {
545 Self {
546 enabled: false,
547 issuer: String::new(),
548 audience: String::new(),
549 read_scopes: Vec::new(),
550 reload_scopes: Vec::new(),
551 anonymous_read: false,
552 start_degraded: true,
553 algorithms: vec!["RS256".into()],
554 leeway_secs: 60,
555 jwks_refresh_secs: 3600,
556 tenant_claim: String::new(),
557 allowed_tenants: Vec::new(),
558 admin_token_fallback: true,
559 }
560 }
561}
562
563impl Backend {
564 pub fn as_str(self) -> &'static str {
565 match self {
566 Backend::Datafusion => "datafusion",
567 Backend::Duckdb => "duckdb",
568 }
569 }
570}
571
/// One `[[dataset]]` entry: a named table backed by local files or S3 objects.
///
/// Field order is also the TOML key order `to_toml_block` emits.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatasetConfig {
    /// Unique name; ASCII alphanumerics plus `_`, `-`, `.` (validated).
    pub name: String,
    /// Data location and table format.
    pub source: SourceConfig,
    /// Optional per-dataset S3 settings (region, endpoint, inline credentials, ...).
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// NOTE(review): presumably restricts which columns are loaded/exposed — confirm.
    #[serde(default)]
    pub columns: Vec<String>,
    /// `true` when omitted. NOTE(review): presumably dictionary-encodes string
    /// columns — confirm.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// When `true`, `force_lazy_bytes` never reports this dataset.
    #[serde(default)]
    pub lazy: bool,
    /// Include/exclude column list; `validate_for_register` allows only one side.
    #[serde(default)]
    pub predicate_filter: ColumnFilter,
    /// Include/exclude column list; `validate_for_register` allows only one side.
    #[serde(default)]
    pub projection_filter: ColumnFilter,
}
619
/// Serde default helper: a field tagged `#[serde(default = "default_true")]`
/// becomes `true` when its key is absent from the TOML.
fn default_true() -> bool {
    true
}
623
624#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
633#[serde(default, deny_unknown_fields)]
634pub struct ColumnFilter {
635 #[serde(default)]
636 pub include: Vec<String>,
637 #[serde(default)]
638 pub exclude: Vec<String>,
639}
640
641impl ColumnFilter {
642 pub fn is_active(&self) -> bool {
644 !self.include.is_empty() || !self.exclude.is_empty()
645 }
646
647 pub fn allows(&self, col: &str) -> bool {
650 let lc = col.to_lowercase();
651 if !self.include.is_empty() {
652 return self.include.iter().any(|c| c.to_lowercase() == lc);
653 }
654 if !self.exclude.is_empty() {
655 return !self.exclude.iter().any(|c| c.to_lowercase() == lc);
656 }
657 true
658 }
659
660 pub fn validate(&self, dataset: &str, ctx: &str) -> Result<(), AppError> {
663 if !self.include.is_empty() && !self.exclude.is_empty() {
664 return Err(AppError::InvalidValue(format!(
665 "dataset '{dataset}': {ctx} may set 'include' or 'exclude', not both"
666 )));
667 }
668 Ok(())
669 }
670
671 pub fn listed(&self) -> &[String] {
675 if !self.include.is_empty() {
676 &self.include
677 } else {
678 &self.exclude
679 }
680 }
681}
682
683#[derive(Debug, Clone, Deserialize, Serialize)]
684pub struct SourceConfig {
685 pub kind: SourceKind,
686 pub location: String,
688}
689
690#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
691#[serde(rename_all = "lowercase")]
692pub enum SourceKind {
693 #[default]
694 Parquet,
695 Delta,
696}
697
698impl SourceKind {
699 pub fn as_str(self) -> &'static str {
700 match self {
701 SourceKind::Parquet => "parquet",
702 SourceKind::Delta => "delta",
703 }
704 }
705}
706
707#[derive(Debug, Clone, Deserialize, Serialize)]
710#[serde(default)]
711pub struct S3Config {
712 pub region: Option<String>,
713 pub endpoint: Option<String>,
715 pub addressing_style: AddressingStyle,
718 pub allow_http: bool,
720 pub access_key_id: Option<String>,
723 pub secret_access_key: Option<String>,
724 pub session_token: Option<String>,
725 pub partitioning: Partitioning,
728 pub endpoint_bucket_in_host: BucketInHost,
732}
733
734impl Default for S3Config {
735 fn default() -> Self {
736 Self {
737 region: None,
738 endpoint: None,
739 addressing_style: AddressingStyle::Virtual,
740 allow_http: false,
741 access_key_id: None,
742 secret_access_key: None,
743 session_token: None,
744 partitioning: Partitioning::Auto,
745 endpoint_bucket_in_host: BucketInHost::Auto,
746 }
747 }
748}
749
750impl S3Config {
751 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
760 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
761
762 let fold = match self.endpoint_bucket_in_host {
763 BucketInHost::False => false,
764 BucketInHost::True => true,
765 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
766 };
767 if !fold {
768 return Some(ep.to_string());
769 }
770
771 let (scheme, host_and_path) = match ep.split_once("://") {
772 Some((s, rest)) => (Some(s), rest),
773 None => (None, ep),
774 };
775 let (host, path) = match host_and_path.split_once('/') {
777 Some((h, p)) => (h, Some(p)),
778 None => (host_and_path, None),
779 };
780 if host == bucket || host.starts_with(&format!("{bucket}.")) {
782 return Some(ep.to_string());
783 }
784 let new_host = format!("{bucket}.{host}");
785 let rebuilt = match (scheme, path) {
786 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
787 (Some(s), None) => format!("{s}://{new_host}"),
788 (None, Some(p)) => format!("{new_host}/{p}"),
789 (None, None) => new_host,
790 };
791 Some(rebuilt)
792 }
793}
794
795#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
796#[serde(rename_all = "lowercase")]
797pub enum AddressingStyle {
798 #[default]
799 Virtual,
800 Path,
801}
802
803impl AddressingStyle {
804 pub fn as_str(self) -> &'static str {
805 match self {
806 AddressingStyle::Virtual => "virtual",
807 AddressingStyle::Path => "path",
808 }
809 }
810}
811
812#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
816#[serde(rename_all = "lowercase")]
817pub enum Partitioning {
818 #[default]
821 Auto,
822 Hive,
825 None,
828}
829
830impl Partitioning {
831 pub fn as_str(self) -> &'static str {
832 match self {
833 Partitioning::Auto => "auto",
834 Partitioning::Hive => "hive",
835 Partitioning::None => "none",
836 }
837 }
838}
839
840#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
845#[serde(rename_all = "lowercase")]
846pub enum BucketInHost {
847 #[default]
850 Auto,
851 True,
853 False,
855}
856
857impl BucketInHost {
858 pub fn as_str(self) -> &'static str {
859 match self {
860 BucketInHost::Auto => "auto",
861 BucketInHost::True => "true",
862 BucketInHost::False => "false",
863 }
864 }
865}
866
867#[derive(Debug, Clone, Deserialize, Serialize)]
868#[serde(default)]
869pub struct IndexConfig {
870 pub mode: IndexMode,
871 pub columns: Vec<String>,
872 pub max_cardinality: usize,
873}
874
875impl Default for IndexConfig {
876 fn default() -> Self {
877 Self {
878 mode: IndexMode::Auto,
879 columns: Vec::new(),
880 max_cardinality: 100_000,
881 }
882 }
883}
884
885#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
886#[serde(rename_all = "lowercase")]
887pub enum IndexMode {
888 #[default]
889 Auto,
890 None,
891 List,
892}
893
/// AWS credentials after `DatasetConfig::resolved_creds` has applied its
/// precedence rules; any field may be missing.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` only when both the access key id and the secret are present.
    ///
    /// The session token is not considered.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
908
909impl AppConfig {
914 pub fn load(path: &str) -> Result<Self, AppError> {
916 let raw = std::fs::read_to_string(path)
917 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
918 let mut cfg: AppConfig =
919 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
920 cfg.normalize();
921 cfg.validate()?;
922 let _ = SOURCE_CONFIG_PATH.set(PathBuf::from(path));
926 Ok(cfg)
927 }
928
929 fn normalize(&mut self) {
937 for s in self
938 .auth
939 .read_scopes
940 .iter_mut()
941 .chain(self.auth.reload_scopes.iter_mut())
942 {
943 *s = s.to_ascii_lowercase();
944 }
945 }
946
947 fn validate(&self) -> Result<(), AppError> {
948 let p = &self.server.prefix;
950 if !p.is_empty() {
951 if !p.starts_with('/') {
952 return Err(AppError::Internal(format!(
953 "server.prefix must start with '/' (got '{p}')"
954 )));
955 }
956 if p.ends_with('/') {
957 return Err(AppError::Internal(format!(
958 "server.prefix must not end with '/' (got '{p}')"
959 )));
960 }
961 }
962
963 if self.datasets.is_empty() {
964 return Err(AppError::Internal(
965 "datasets.toml has no [[dataset]] entries".into(),
966 ));
967 }
968
969 if self.server.quack.enabled {
970 self.server.quack.validate_enabled()?;
971 }
972
973 {
976 let dp = &self.docs.path;
977 if !dp.starts_with('/') {
978 return Err(AppError::Internal(format!(
979 "docs.path must start with '/' (got '{dp}')"
980 )));
981 }
982 if dp.len() > 1 && dp.ends_with('/') {
983 return Err(AppError::Internal(format!(
984 "docs.path must not end with '/' (got '{dp}')"
985 )));
986 }
987 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
988 return Err(AppError::Internal(format!(
989 "docs.path '{dp}' collides with a reserved route"
990 )));
991 }
992 }
993
994 {
996 let sp = &self.swagger.path;
997 if !sp.starts_with('/') {
998 return Err(AppError::Internal(format!(
999 "swagger.path must start with '/' (got '{sp}')"
1000 )));
1001 }
1002 if sp.len() > 1 && sp.ends_with('/') {
1003 return Err(AppError::Internal(format!(
1004 "swagger.path must not end with '/' (got '{sp}')"
1005 )));
1006 }
1007 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
1008 return Err(AppError::Internal(format!(
1009 "swagger.path '{sp}' collides with a reserved route"
1010 )));
1011 }
1012 if sp == &self.docs.path {
1013 return Err(AppError::Internal(format!(
1014 "swagger.path and docs.path must differ (both '{sp}')"
1015 )));
1016 }
1017 if let Some(o) = &self.swagger.oauth2 {
1018 if o.issuer.trim().is_empty() {
1019 return Err(AppError::Internal(
1020 "swagger.oauth2.issuer must not be empty".into(),
1021 ));
1022 }
1023 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
1024 return Err(AppError::Internal(format!(
1025 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
1026 o.issuer
1027 )));
1028 }
1029 if o.client_id.trim().is_empty() {
1030 return Err(AppError::Internal(
1031 "swagger.oauth2.client_id must not be empty".into(),
1032 ));
1033 }
1034 }
1035 }
1036
1037 {
1043 let mp = &self.metrics.path;
1044 if !mp.starts_with('/') {
1045 return Err(AppError::Internal(format!(
1046 "metrics.path must start with '/' (got '{mp}')"
1047 )));
1048 }
1049 if mp.len() > 1 && mp.ends_with('/') {
1050 return Err(AppError::Internal(format!(
1051 "metrics.path must not end with '/' (got '{mp}')"
1052 )));
1053 }
1054 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
1055 return Err(AppError::Internal(format!(
1056 "metrics.path '{mp}' collides with a reserved route"
1057 )));
1058 }
1059 if mp == &self.docs.path {
1060 return Err(AppError::Internal(format!(
1061 "metrics.path and docs.path must differ (both '{mp}')"
1062 )));
1063 }
1064 if mp == &self.swagger.path {
1065 return Err(AppError::Internal(format!(
1066 "metrics.path and swagger.path must differ (both '{mp}')"
1067 )));
1068 }
1069 }
1070
1071 {
1074 let ep = &self.explorer.path;
1075 if !ep.starts_with('/') {
1076 return Err(AppError::Internal(format!(
1077 "explorer.path must start with '/' (got '{ep}')"
1078 )));
1079 }
1080 if ep.len() > 1 && ep.ends_with('/') {
1081 return Err(AppError::Internal(format!(
1082 "explorer.path must not end with '/' (got '{ep}')"
1083 )));
1084 }
1085 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
1086 return Err(AppError::Internal(format!(
1087 "explorer.path '{ep}' collides with a reserved route"
1088 )));
1089 }
1090 if ep == &self.docs.path {
1091 return Err(AppError::Internal(format!(
1092 "explorer.path and docs.path must differ (both '{ep}')"
1093 )));
1094 }
1095 if ep == &self.swagger.path {
1096 return Err(AppError::Internal(format!(
1097 "explorer.path and swagger.path must differ (both '{ep}')"
1098 )));
1099 }
1100 if ep == &self.metrics.path {
1101 return Err(AppError::Internal(format!(
1102 "explorer.path and metrics.path must differ (both '{ep}')"
1103 )));
1104 }
1105 }
1106
1107 if self.auth.enabled {
1112 let a = &self.auth;
1113 if a.issuer.trim().is_empty() {
1114 return Err(AppError::Internal(
1115 "auth.issuer must not be empty when auth.enabled = true".into(),
1116 ));
1117 }
1118 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1119 return Err(AppError::Internal(format!(
1120 "auth.issuer must be an absolute http(s) URL (got '{}')",
1121 a.issuer
1122 )));
1123 }
1124 for alg in &a.algorithms {
1125 match alg.as_str() {
1126 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1127 | "PS512" => {}
1128 other => {
1129 return Err(AppError::Internal(format!(
1130 "auth.algorithms[{other}] is not allowed; pick one of \
1131 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1132 )));
1133 }
1134 }
1135 }
1136 if a.algorithms.is_empty() {
1137 return Err(AppError::Internal(
1138 "auth.algorithms must not be empty".into(),
1139 ));
1140 }
1141 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1142 return Err(AppError::Internal(format!(
1143 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1144 a.tenant_claim
1145 )));
1146 }
1147 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1148 return Err(AppError::Internal(
1149 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1150 can't enforce a tenant allow-list without a claim to extract from"
1151 .into(),
1152 ));
1153 }
1154 }
1155
1156 let mut seen = HashSet::new();
1157 for d in &self.datasets {
1158 if !seen.insert(d.name.as_str()) {
1159 return Err(AppError::Internal(format!(
1160 "duplicate dataset name: {}",
1161 d.name
1162 )));
1163 }
1164 if d.name.is_empty() {
1165 return Err(AppError::Internal("dataset name must not be empty".into()));
1166 }
1167 if !d
1169 .name
1170 .chars()
1171 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1172 {
1173 return Err(AppError::Internal(format!(
1174 "dataset name '{}' must be alphanumeric (plus _ - .)",
1175 d.name
1176 )));
1177 }
1178
1179 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1180 return Err(AppError::Internal(format!(
1181 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1182 d.name
1183 )));
1184 }
1185
1186 if d.source.is_s3() {
1188 d.source.s3_bucket()?;
1189 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1190 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1191 && std::env::var("AWS_REGION").is_err()
1192 && std::env::var("AWS_DEFAULT_REGION").is_err()
1193 {
1194 log::warn!(
1195 "dataset '{}': S3 source without explicit region — \
1196 relying on AWS_REGION env var",
1197 d.name
1198 );
1199 }
1200 } else {
1201 match d.source.kind {
1205 SourceKind::Parquet => {
1206 d.resolve_local_parquet_files()?;
1207 }
1208 SourceKind::Delta => {
1209 let p = Path::new(&d.source.location);
1210 if !p.exists() {
1211 return Err(AppError::Internal(format!(
1212 "dataset '{}': delta location does not exist: {}",
1213 d.name, d.source.location
1214 )));
1215 }
1216 }
1217 }
1218 }
1219 }
1220 Ok(())
1221 }
1222}
1223
1224impl SourceConfig {
1225 pub fn is_s3(&self) -> bool {
1226 self.location.starts_with("s3://")
1227 }
1228
1229 pub fn has_glob(&self) -> bool {
1232 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1233 }
1234
1235 pub fn s3_recursive_parquet_glob(&self) -> String {
1241 if !self.is_s3() || self.has_glob() {
1242 return self.location.clone();
1243 }
1244 let trimmed = self.location.trim_end_matches('/');
1245 format!("{trimmed}/**/*.parquet")
1246 }
1247
1248 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1250 let rest = self
1251 .location
1252 .strip_prefix("s3://")
1253 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1254 let (bucket, key) = match rest.split_once('/') {
1255 Some((b, k)) => (b, k),
1256 None => (rest, ""),
1257 };
1258 if bucket.is_empty() {
1259 return Err(AppError::Internal(format!(
1260 "s3 URL missing bucket: {}",
1261 self.location
1262 )));
1263 }
1264 Ok((bucket, key))
1265 }
1266}
1267
1268impl DatasetConfig {
1269 pub fn validate_for_register(&self) -> Result<(), AppError> {
1275 if self.name.is_empty() {
1276 return Err(AppError::InvalidValue(
1277 "dataset name must not be empty".into(),
1278 ));
1279 }
1280 if !self
1281 .name
1282 .chars()
1283 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1284 {
1285 return Err(AppError::InvalidValue(format!(
1286 "dataset name '{}' must be alphanumeric (plus _ - .)",
1287 self.name
1288 )));
1289 }
1290 if self.index.mode == IndexMode::List && self.index.columns.is_empty() {
1291 return Err(AppError::InvalidValue(format!(
1292 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1293 self.name
1294 )));
1295 }
1296 self.predicate_filter.validate(&self.name, "predicate_filter")?;
1297 self.projection_filter
1298 .validate(&self.name, "projection_filter")?;
1299 if self.source.is_s3() {
1300 self.source.s3_bucket()?;
1301 }
1302 Ok(())
1303 }
1304
1305 pub fn to_toml_block(&self) -> Result<String, AppError> {
1311 #[derive(Serialize)]
1312 struct Block {
1313 name: String,
1314 #[serde(skip_serializing_if = "Vec::is_empty")]
1315 columns: Vec<String>,
1316 dict_encode: bool,
1317 lazy: bool,
1318 source: SourceConfig,
1319 #[serde(skip_serializing_if = "Option::is_none")]
1320 s3: Option<S3Config>,
1321 #[serde(skip_serializing_if = "Option::is_none")]
1322 index: Option<IndexConfig>,
1323 #[serde(skip_serializing_if = "Option::is_none")]
1324 predicate_filter: Option<ColumnFilter>,
1325 #[serde(skip_serializing_if = "Option::is_none")]
1326 projection_filter: Option<ColumnFilter>,
1327 }
1328 #[derive(Serialize)]
1329 struct Doc {
1330 dataset: [Block; 1],
1331 }
1332 let doc = Doc {
1333 dataset: [Block {
1334 name: self.name.clone(),
1335 columns: self.columns.clone(),
1336 dict_encode: self.dict_encode,
1337 lazy: self.lazy,
1338 source: self.source.clone(),
1339 s3: self.s3.clone(),
1340 index: if self.index.is_default() {
1341 None
1342 } else {
1343 Some(self.index.clone())
1344 },
1345 predicate_filter: self
1346 .predicate_filter
1347 .is_active()
1348 .then(|| self.predicate_filter.clone()),
1349 projection_filter: self
1350 .projection_filter
1351 .is_active()
1352 .then(|| self.projection_filter.clone()),
1353 }],
1354 };
1355 toml::to_string_pretty(&doc)
1356 .map_err(|e| AppError::Internal(format!("failed to render dataset TOML: {e}")))
1357 }
1358
1359 pub fn persist_to_source_config(&self) -> Result<PathBuf, AppError> {
1369 use std::io::Write;
1370 let path = source_config_path().ok_or_else(|| {
1371 AppError::InvalidValue("server has no on-disk config file to append to".into())
1372 })?;
1373 let block = self.to_toml_block()?;
1374 let mut file = std::fs::OpenOptions::new()
1375 .append(true)
1376 .open(path)
1377 .map_err(|e| {
1378 AppError::Internal(format!("failed to open config {}: {e}", path.display()))
1379 })?;
1380 write!(file, "\n{block}").map_err(|e| {
1382 AppError::Internal(format!("failed to write config {}: {e}", path.display()))
1383 })?;
1384 Ok(path.to_path_buf())
1385 }
1386}
1387
1388impl IndexConfig {
1389 fn is_default(&self) -> bool {
1392 self.mode == IndexMode::Auto && self.columns.is_empty() && self.max_cardinality == 100_000
1393 }
1394}
1395
1396impl DatasetConfig {
1397 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1407 if self.source.is_s3() {
1408 return Err(AppError::Internal(format!(
1409 "dataset '{}': resolve_local_parquet_files called on s3 source",
1410 self.name
1411 )));
1412 }
1413 let loc = &self.source.location;
1414
1415 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1417 let mut files: Vec<PathBuf> = glob::glob(loc)
1418 .map_err(|e| {
1419 AppError::Internal(format!(
1420 "dataset '{}': bad glob pattern '{loc}': {e}",
1421 self.name
1422 ))
1423 })?
1424 .filter_map(|r| r.ok())
1425 .filter(|p| {
1426 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1427 })
1428 .collect();
1429 files.sort();
1430 if files.is_empty() {
1431 return Err(AppError::EmptyDataset(format!(
1432 "dataset '{}': glob '{loc}' matched no .parquet files",
1433 self.name
1434 )));
1435 }
1436 return Ok(files);
1437 }
1438
1439 let path = Path::new(loc);
1440 if !path.exists() {
1441 return Err(AppError::Internal(format!(
1442 "dataset '{}': source path does not exist: {loc}",
1443 self.name
1444 )));
1445 }
1446
1447 if path.is_file() {
1448 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1449 return Err(AppError::Internal(format!(
1450 "dataset '{}': source must be a .parquet file",
1451 self.name
1452 )));
1453 }
1454 return Ok(vec![path.to_path_buf()]);
1455 }
1456
1457 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1458 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1459 .filter_map(|entry| entry.ok().map(|e| e.path()))
1460 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1461 .collect();
1462 files.sort();
1463 if files.is_empty() {
1464 return Err(AppError::EmptyDataset(format!(
1465 "dataset '{}': no *.parquet files found in {loc}",
1466 self.name
1467 )));
1468 }
1469 Ok(files)
1470 }
1471
1472 pub fn estimate_local_bytes(&self) -> Option<u64> {
1482 if self.source.is_s3() {
1483 return None;
1484 }
1485 match self.source.kind {
1486 SourceKind::Parquet => {
1487 let files = self.resolve_local_parquet_files().ok()?;
1488 Some(
1489 files
1490 .iter()
1491 .filter_map(|p| std::fs::metadata(p).ok())
1492 .map(|m| m.len())
1493 .sum(),
1494 )
1495 }
1496 SourceKind::Delta => {
1497 let root = self.source.location.trim_end_matches('/');
1498 let pattern = format!("{root}/**/*.parquet");
1499 let paths = glob::glob(&pattern).ok()?;
1500 Some(
1501 paths
1502 .filter_map(Result::ok)
1503 .filter_map(|p| std::fs::metadata(&p).ok())
1504 .filter(|m| m.is_file())
1505 .map(|m| m.len())
1506 .sum(),
1507 )
1508 }
1509 }
1510 }
1511
1512 pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1519 if self.lazy || server.force_lazy_above_mb == 0 {
1520 return None;
1521 }
1522 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1523 match self.estimate_local_bytes() {
1524 Some(bytes) if bytes > threshold => Some(bytes),
1525 _ => None,
1526 }
1527 }
1528
1529 pub fn env_prefix(&self) -> String {
1533 self.name
1534 .chars()
1535 .map(|c| {
1536 if c.is_ascii_alphanumeric() {
1537 c.to_ascii_uppercase()
1538 } else {
1539 '_'
1540 }
1541 })
1542 .collect()
1543 }
1544
1545 pub fn resolved_creds(&self) -> ResolvedCreds {
1550 let prefix = self.env_prefix();
1551 let from_env = |suffix: &str| {
1552 std::env::var(format!("{prefix}_{suffix}"))
1553 .ok()
1554 .filter(|s| !s.is_empty())
1555 };
1556 let inline = self.s3.as_ref();
1557 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1558
1559 ResolvedCreds {
1560 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1561 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1562 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1563 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1564 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1565 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1566 session_token: from_env("AWS_SESSION_TOKEN")
1567 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1568 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1569 }
1570 }
1571
1572 pub fn resolved_region(&self) -> String {
1575 let prefix = self.env_prefix();
1576 std::env::var(format!("{prefix}_AWS_REGION"))
1577 .ok()
1578 .filter(|s| !s.is_empty())
1579 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1580 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1581 .or_else(|| {
1582 std::env::var("AWS_DEFAULT_REGION")
1583 .ok()
1584 .filter(|s| !s.is_empty())
1585 })
1586 .unwrap_or_else(|| "us-east-1".to_string())
1587 }
1588}
1589
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir that is removed on drop,
    /// so a failing assertion does not leave test files behind.
    struct TempDir(PathBuf);

    impl TempDir {
        /// Creates a fresh, empty directory named `<tag>-<pid>-<seq>`.
        ///
        /// The per-process sequence number keeps tests that run concurrently
        /// in the same process (and therefore share a pid) from colliding.
        fn new(tag: &str) -> Self {
            use std::sync::atomic::{AtomicUsize, Ordering};

            static SEQ: AtomicUsize = AtomicUsize::new(0);
            let seq = SEQ.fetch_add(1, Ordering::Relaxed);
            let dir = std::env::temp_dir().join(format!("{tag}-{}-{seq}", std::process::id()));
            // An earlier run that happened to reuse this pid may have left it behind.
            let _ = std::fs::remove_dir_all(&dir);
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup: never panic here, we may already be unwinding.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// `AppConfig` with every section at its default and no datasets.
    fn empty_config() -> AppConfig {
        AppConfig {
            server: ServerConfig::default(),
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            sql: SqlConfig::default(),
            datafusion: DataFusionConfig::default(),
            auth: AuthConfig::default(),
            datasets: vec![],
        }
    }

    /// `SourceConfig` of kind `Parquet` pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.to_string(),
        }
    }

    /// Dataset with the given name and source. Every other option uses the
    /// values these tests rely on: no S3 block, default index, no column
    /// overrides, `dict_encode = true`, `lazy = false`, default filters.
    fn dataset(name: &str, kind: SourceKind, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind,
                location: location.to_string(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
            predicate_filter: Default::default(),
            projection_filter: Default::default(),
        }
    }

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.listen, IpAddr::from([127, 0, 0, 1]));
        assert_eq!(s.port, 8080);
        assert!(s.workers.is_none());
        assert_eq!(s.prefix, "");
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.force_lazy_above_mb, 0);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert_eq!(s.shutdown_timeout_secs, 30);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            force_lazy_above_mb = 256
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false

            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.force_lazy_above_mb, 256);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        // `dict_encode` was not set in the TOML, so this checks its serde default.
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn force_lazy_bytes_logic() {
        let dir = TempDir::new("dp-force-lazy");
        let two_mib: usize = 2 * 1024 * 1024;
        let two_mib_bytes = Some(two_mib as u64);
        let file = dir.path().join("data.parquet");
        std::fs::write(&file, vec![0u8; two_mib]).unwrap();
        let file_loc = file.to_str().unwrap();
        let dir_loc = dir.path().to_str().unwrap();

        let mk = |kind: SourceKind, location: &str, lazy: bool| DatasetConfig {
            lazy,
            ..dataset("t", kind, location)
        };
        let server = |mb: u64| ServerConfig {
            force_lazy_above_mb: mb,
            ..ServerConfig::default()
        };

        // Size estimates for a single file, a directory of parquet files and a
        // Delta table directory all see the one 2 MiB file.
        let file_ds = mk(SourceKind::Parquet, file_loc, false);
        assert_eq!(file_ds.estimate_local_bytes(), two_mib_bytes);
        let dir_ds = mk(SourceKind::Parquet, dir_loc, false);
        assert_eq!(dir_ds.estimate_local_bytes(), two_mib_bytes);
        let delta_ds = mk(SourceKind::Delta, dir_loc, false);
        assert_eq!(delta_ds.estimate_local_bytes(), two_mib_bytes);

        // Threshold 0 does not force this dataset; 1 MiB does (and reports the
        // size); 4 MiB is above the file size, so nothing is forced.
        assert_eq!(file_ds.force_lazy_bytes(&server(0)), None);
        assert_eq!(file_ds.force_lazy_bytes(&server(1)), two_mib_bytes);
        assert_eq!(file_ds.force_lazy_bytes(&server(4)), None);

        // A dataset that is already lazy is never reported.
        let lazy_ds = mk(SourceKind::Parquet, file_loc, true);
        assert_eq!(lazy_ds.force_lazy_bytes(&server(1)), None);

        // Remote locations have no local size estimate and are never forced.
        let s3_ds = mk(SourceKind::Parquet, "s3://bucket/data.parquet", false);
        assert_eq!(s3_ds.estimate_local_bytes(), None);
        assert_eq!(s3_ds.force_lazy_bytes(&server(1)), None);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        // Give the config a real dataset so that the "no [[dataset]]" error
        // cannot be what makes the bad-prefix cases fail.
        let dir = TempDir::new("dp-prefix-test");
        let file = dir.path().join("a.parquet");
        std::fs::write(&file, b"x").unwrap();
        let with_prefix = |prefix: &str| AppConfig {
            server: ServerConfig {
                prefix: prefix.to_string(),
                ..Default::default()
            },
            datasets: vec![dataset("x", SourceKind::Parquet, &file.to_string_lossy())],
            ..empty_config()
        };

        // Control: the same config with a well-formed prefix is accepted.
        assert!(with_prefix("/datapress").validate().is_ok());
        for p in ["no-leading-slash", "/trailing/"] {
            assert!(with_prefix(p).validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = AppConfig {
            auth: AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            ..empty_config()
        };
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let err = empty_config().validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let dir = TempDir::new("dp-auth-issuer-test");
        let file = dir.path().join("a.parquet");
        std::fs::write(&file, b"x").unwrap();

        let cfg = AppConfig {
            auth: AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            datasets: vec![dataset("x", SourceKind::Parquet, &file.to_string_lossy())],
            ..empty_config()
        };

        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = AppConfig {
            server: ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            datasets: vec![dataset("x", SourceKind::Parquet, "/tmp/missing.parquet")],
            ..empty_config()
        };
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let dir = TempDir::new("dp-dup-test");
        let f = dir.path().join("a.parquet");
        std::fs::write(&f, b"x").unwrap();
        let path = f.to_str().unwrap();

        // TOML literal ('...') strings take the path verbatim, so Windows
        // backslashes are not misread as escape sequences.
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
    }

    #[test]
    fn s3_bucket_parsing() {
        assert_eq!(
            parquet_source("s3://bucket/path/key").s3_bucket().unwrap(),
            ("bucket", "path/key")
        );
        assert_eq!(
            parquet_source("s3://only-bucket").s3_bucket().unwrap(),
            ("only-bucket", "")
        );
        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        // Plain prefixes, with or without a trailing slash, expand to a recursive glob.
        assert_eq!(
            parquet_source("s3://bucket/logs/").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        assert_eq!(
            parquet_source("s3://bucket/logs").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        // Explicit globs and local paths are returned unchanged.
        assert_eq!(
            parquet_source("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
            "s3://bucket/logs/*.parquet"
        );
        assert_eq!(
            parquet_source("/local/logs").s3_recursive_parquet_glob(),
            "/local/logs"
        );
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // Virtual addressing puts the bucket in the host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // An endpoint that already carries the bucket is not prefixed twice.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // Path addressing leaves the host alone.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        // The explicit override wins over the addressing style in both directions.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );
        // No endpoint configured means nothing to fold.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let mk = |name: &str| dataset(name, SourceKind::Parquet, "x");
        assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
        assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
        assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let dir = TempDir::new("dp-cfg-test");
        let f = dir.path().join("a.parquet");
        // `fs::write` closes the file at once, so no handle stays open while the
        // directory is listed and later removed (an open handle blocks removal
        // on Windows).
        std::fs::write(&f, b"not really parquet").unwrap();

        let mk = |loc: &str| dataset("ds", SourceKind::Parquet, loc);

        let files = mk(f.to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        let files = mk(dir.path().to_str().unwrap())
            .resolve_local_parquet_files()
            .unwrap();
        assert_eq!(files, vec![f.clone()]);

        assert!(
            mk("/no/such/place.parquet")
                .resolve_local_parquet_files()
                .is_err()
        );
    }
}