1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that the docs, swagger and explorer mounts may never use.
///
/// `metrics.path` is checked against the same list but is allowed to be exactly
/// `/metrics` (see `AppConfig::validate`).
const RESERVED_MOUNTS: &[&str] = &[
    "/",
    "/api",
    "/api/v1",
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
/// Top-level application configuration, deserialized from the TOML file given to
/// `AppConfig::load`.
///
/// Every section falls back to its `Default` when omitted, including the dataset list;
/// an empty dataset list is rejected later by validation.
/// NOTE(review): this struct does not use `deny_unknown_fields`, so a misspelled
/// top-level table is silently ignored.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` table.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[sql]` table.
    #[serde(default)]
    pub sql: SqlConfig,
    /// `[datafusion]` table.
    #[serde(default)]
    pub datafusion: DataFusionConfig,
    /// `[auth]` table.
    #[serde(default)]
    pub auth: AuthConfig,
    /// One entry per `[[dataset]]` array-of-tables element (TOML key is singular).
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
64
/// `[server]` settings. Missing keys take the values from `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Query engine; TOML value `"datafusion"` (default) or `"duckdb"`.
    pub backend: Backend,
    /// Listen address; defaults to the IPv4 loopback `127.0.0.1`.
    pub listen: IpAddr,
    pub port: u16,
    pub workers: Option<usize>,
    /// URL prefix. Must be empty, or start with `/` and not end with `/`
    /// (enforced in `AppConfig::validate`).
    pub prefix: String,
    pub compress: bool,
    pub max_body_bytes: usize,
    pub max_page_size: u64,
    /// Size threshold in MiB above which a non-lazy local dataset is reported by
    /// `DatasetConfig::force_lazy_bytes`; `0` disables the check.
    pub force_lazy_above_mb: u64,
    pub request_timeout_ms: u64,
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-table; validated only when `quack.enabled` is true.
    pub quack: QuackConfig,
}
120
121impl Default for ServerConfig {
122 fn default() -> Self {
123 Self {
124 backend: Backend::default(),
125 listen: IpAddr::from([127, 0, 0, 1]),
126 port: 8080,
127 workers: None,
128 prefix: String::new(),
129 compress: true,
130 max_body_bytes: 1024 * 1024,
131 max_page_size: 100_000,
132 force_lazy_above_mb: 0,
133 request_timeout_ms: 30_000,
134 shutdown_timeout_secs: 30,
135 quack: QuackConfig::default(),
136 }
137 }
138}
139
/// `[server.quack]` settings. The constraints below are checked by
/// `QuackConfig::validate_enabled`, which runs only when `enabled` is true.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    pub enabled: bool,
    /// Must be non-blank and start with `quack:`; defaults to `quack:localhost`.
    pub uri: String,
    /// Optional token; when set it must be at least 4 characters long.
    pub token: Option<String>,
    /// When false (default), the host in `uri` must be `localhost`.
    pub allow_other_hostname: bool,
    /// Defaults to `true`.
    pub read_only: bool,
}
164
165impl Default for QuackConfig {
166 fn default() -> Self {
167 Self {
168 enabled: false,
169 uri: "quack:localhost".into(),
170 token: None,
171 allow_other_hostname: false,
172 read_only: true,
173 }
174 }
175}
176
177impl QuackConfig {
178 pub fn validate_enabled(&self) -> Result<(), AppError> {
182 if self.uri.trim().is_empty() {
183 return Err(AppError::Internal(
184 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
185 ));
186 }
187 if !self.uri.starts_with("quack:") {
188 return Err(AppError::Internal(format!(
189 "server.quack.uri must start with 'quack:' (got '{}')",
190 self.uri
191 )));
192 }
193 if !self.allow_other_hostname {
194 let host = self.hostname().unwrap_or_default();
195 if host != "localhost" {
196 return Err(AppError::Internal(format!(
197 "server.quack.uri host must be 'localhost' unless \
198 server.quack.allow_other_hostname = true (got '{}')",
199 self.uri
200 )));
201 }
202 }
203 if let Some(token) = self.token.as_deref()
204 && token.len() < 4
205 {
206 return Err(AppError::Internal(
207 "server.quack.token must be at least 4 characters".into(),
208 ));
209 }
210 Ok(())
211 }
212
213 fn hostname(&self) -> Option<&str> {
214 let rest = self.uri.strip_prefix("quack:")?;
215 let rest = rest.strip_prefix("//").unwrap_or(rest);
216 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
217 (!host.is_empty()).then_some(host)
218 }
219}
220
/// Query engine selected by `server.backend`; TOML values are the lowercase
/// variant names (`"datafusion"`, `"duckdb"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Default backend.
    #[default]
    Datafusion,
    Duckdb,
}
228
/// `[docs]` settings. Unknown keys are rejected.
///
/// `path` must start with `/`, must not end with `/` (unless it is `/` itself), and
/// must not be a reserved route (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    pub enabled: bool,
    pub path: String,
}
244
245impl Default for DocsConfig {
246 fn default() -> Self {
247 Self {
248 enabled: true,
249 path: "/mkdocs".into(),
250 }
251 }
252}
253
/// `[swagger]` settings. Unknown keys are rejected.
///
/// `path` follows the same shape/reserved-route rules as `docs.path` and must differ
/// from it (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    pub enabled: bool,
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-table; absent by default.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
277
278impl Default for SwaggerConfig {
279 fn default() -> Self {
280 Self {
281 enabled: true,
282 path: "/docs".into(),
283 oauth2: None,
284 }
285 }
286}
287
/// `[swagger.oauth2]` settings. `issuer` and `client_id` are required keys; unknown
/// keys are rejected.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Must be a non-empty absolute `http://` or `https://` URL (checked in
    /// `AppConfig::validate`).
    pub issuer: String,
    /// Must be non-empty (checked in `AppConfig::validate`).
    pub client_id: String,
    /// Defaults to an empty list.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
319
/// `[metrics]` settings. Unknown keys are rejected.
///
/// `path` follows the same shape rules as the other mounts. It may equal the reserved
/// `/metrics` route, but no other reserved route, and must differ from the docs and
/// swagger paths (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    pub enabled: bool,
    pub path: String,
}
345
346impl Default for MetricsConfig {
347 fn default() -> Self {
348 Self {
349 enabled: false,
350 path: "/metrics".into(),
351 }
352 }
353}
354
/// `[explorer]` settings. Unknown keys are rejected.
///
/// `path` follows the shared mount rules and must differ from the docs, swagger and
/// metrics paths (checked in `AppConfig::validate`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    pub enabled: bool,
    pub path: String,
}
373
374impl Default for ExplorerConfig {
375 fn default() -> Self {
376 Self {
377 enabled: true,
378 path: "/explore".into(),
379 }
380 }
381}
382
/// `[sql]` settings. Unknown keys are rejected.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SqlConfig {
    /// Disabled by default.
    pub enabled: bool,
    /// Defaults to 100_000. NOTE(review): enforcement lives outside this module.
    pub max_rows: u64,
}
410
411impl Default for SqlConfig {
412 fn default() -> Self {
413 Self {
414 enabled: false,
415 max_rows: 100_000,
416 }
417 }
418}
419
/// `[datafusion]` tuning knobs. Unknown keys are rejected.
/// NOTE(review): how each flag is applied to the DataFusion session lives outside
/// this module — confirm there before changing defaults.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataFusionConfig {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub list_files_cache: bool,
    pub list_files_cache_mb: usize,
    pub list_files_cache_ttl_secs: u64,
}
451
452impl Default for DataFusionConfig {
453 fn default() -> Self {
454 Self {
455 pushdown_filters: false,
456 reorder_filters: false,
457 list_files_cache: false,
458 list_files_cache_mb: 64,
459 list_files_cache_ttl_secs: 60,
460 }
461 }
462}
463
/// `[auth]` settings. Unknown keys are rejected. The field constraints noted below are
/// only checked (in `AppConfig::validate`) when `enabled` is true.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    pub enabled: bool,
    /// Must be a non-empty absolute `http://` or `https://` URL.
    pub issuer: String,
    pub audience: String,
    /// ASCII-lowercased by `AppConfig::normalize` on load.
    pub read_scopes: Vec<String>,
    /// ASCII-lowercased by `AppConfig::normalize` on load.
    pub reload_scopes: Vec<String>,
    pub anonymous_read: bool,
    pub start_degraded: bool,
    /// Non-empty subset of RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512;
    /// defaults to `["RS256"]`.
    pub algorithms: Vec<String>,
    pub leeway_secs: u64,
    pub jwks_refresh_secs: u64,
    /// Empty, or a JSON pointer starting with `/`.
    pub tenant_claim: String,
    /// Requires a non-empty `tenant_claim` when set.
    pub allowed_tenants: Vec<String>,
    pub admin_token_fallback: bool,
}
531
532impl Default for AuthConfig {
533 fn default() -> Self {
534 Self {
535 enabled: false,
536 issuer: String::new(),
537 audience: String::new(),
538 read_scopes: Vec::new(),
539 reload_scopes: Vec::new(),
540 anonymous_read: false,
541 start_degraded: true,
542 algorithms: vec!["RS256".into()],
543 leeway_secs: 60,
544 jwks_refresh_secs: 3600,
545 tenant_claim: String::new(),
546 allowed_tenants: Vec::new(),
547 admin_token_fallback: true,
548 }
549 }
550}
551
552impl Backend {
553 pub fn as_str(self) -> &'static str {
554 match self {
555 Backend::Datafusion => "datafusion",
556 Backend::Duckdb => "duckdb",
557 }
558 }
559}
560
/// One `[[dataset]]` entry. `name` and `source` are required keys.
/// NOTE(review): no `deny_unknown_fields` here, so misspelled keys are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Unique, non-empty, ASCII alphanumeric plus `_`, `-`, `.` (checked in
    /// `AppConfig::validate`); also the basis of `env_prefix`.
    pub name: String,
    pub source: SourceConfig,
    /// Optional per-dataset S3 settings.
    #[serde(default)]
    pub s3: Option<S3Config>,
    #[serde(default)]
    pub index: IndexConfig,
    /// Defaults to an empty list.
    #[serde(default)]
    pub columns: Vec<String>,
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// Defaults to `false`; when true, `force_lazy_bytes` always returns `None`.
    #[serde(default)]
    pub lazy: bool,
}
594
/// Serde `default = "default_true"` helper for boolean keys that should be `true` when
/// omitted (e.g. `dict_encode`, `pkce`).
fn default_true() -> bool {
    let enabled = true;
    enabled
}
598
/// Where a dataset's data lives. Both keys are required in TOML.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    pub kind: SourceKind,
    /// An `s3://bucket/key` URL, or a local file, directory, or glob pattern.
    pub location: String,
}
605
/// Table format of a dataset source; TOML values `"parquet"` or `"delta"`.
/// The `Default` (Parquet) applies only on the Rust side — `source.kind` is required
/// in TOML.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    #[default]
    Parquet,
    Delta,
}
613
614impl SourceKind {
615 pub fn as_str(self) -> &'static str {
616 match self {
617 SourceKind::Parquet => "parquet",
618 SourceKind::Delta => "delta",
619 }
620 }
621}
622
/// Per-dataset `[dataset.s3]` settings; missing keys use `S3Config::default()`.
/// NOTE(review): no `deny_unknown_fields`, so misspelled keys are silently ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Used by `DatasetConfig::resolved_region` after the `{PREFIX}_AWS_REGION` env var.
    pub region: Option<String>,
    /// Custom endpoint; see `S3Config::effective_endpoint`.
    pub endpoint: Option<String>,
    /// `"virtual"` (default) or `"path"`.
    pub addressing_style: AddressingStyle,
    pub allow_http: bool,
    /// Inline credentials; the dataset-prefixed env vars take precedence
    /// (see `DatasetConfig::resolved_creds`).
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
    pub partitioning: Partitioning,
    /// Whether `effective_endpoint` prepends the bucket to the endpoint host.
    pub endpoint_bucket_in_host: BucketInHost,
}
649
650impl Default for S3Config {
651 fn default() -> Self {
652 Self {
653 region: None,
654 endpoint: None,
655 addressing_style: AddressingStyle::Virtual,
656 allow_http: false,
657 access_key_id: None,
658 secret_access_key: None,
659 session_token: None,
660 partitioning: Partitioning::Auto,
661 endpoint_bucket_in_host: BucketInHost::Auto,
662 }
663 }
664}
665
666impl S3Config {
667 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
676 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
677
678 let fold = match self.endpoint_bucket_in_host {
679 BucketInHost::False => false,
680 BucketInHost::True => true,
681 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
682 };
683 if !fold {
684 return Some(ep.to_string());
685 }
686
687 let (scheme, host_and_path) = match ep.split_once("://") {
688 Some((s, rest)) => (Some(s), rest),
689 None => (None, ep),
690 };
691 let (host, path) = match host_and_path.split_once('/') {
693 Some((h, p)) => (h, Some(p)),
694 None => (host_and_path, None),
695 };
696 if host == bucket || host.starts_with(&format!("{bucket}.")) {
698 return Some(ep.to_string());
699 }
700 let new_host = format!("{bucket}.{host}");
701 let rebuilt = match (scheme, path) {
702 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
703 (Some(s), None) => format!("{s}://{new_host}"),
704 (None, Some(p)) => format!("{new_host}/{p}"),
705 (None, None) => new_host,
706 };
707 Some(rebuilt)
708 }
709}
710
/// S3 addressing style; TOML values `"virtual"` (default) or `"path"`.
/// With `endpoint_bucket_in_host = "auto"`, only `Virtual` folds the bucket into the
/// endpoint host (see `S3Config::effective_endpoint`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    #[default]
    Virtual,
    Path,
}
718
719impl AddressingStyle {
720 pub fn as_str(self) -> &'static str {
721 match self {
722 AddressingStyle::Virtual => "virtual",
723 AddressingStyle::Path => "path",
724 }
725 }
726}
727
/// Partitioning scheme for S3 datasets; TOML values `"auto"` (default), `"hive"`,
/// `"none"`.
/// NOTE(review): how each variant is interpreted is decided by the consumer of this
/// config, not in this module.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    #[default]
    Auto,
    Hive,
    None,
}
745
746impl Partitioning {
747 pub fn as_str(self) -> &'static str {
748 match self {
749 Partitioning::Auto => "auto",
750 Partitioning::Hive => "hive",
751 Partitioning::None => "none",
752 }
753 }
754}
755
/// Controls whether `S3Config::effective_endpoint` prepends the bucket to the endpoint
/// host. TOML values are the strings `"auto"` (default), `"true"`, `"false"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold only when `addressing_style` is `Virtual`.
    #[default]
    Auto,
    /// Always fold.
    True,
    /// Never fold.
    False,
}
772
773impl BucketInHost {
774 pub fn as_str(self) -> &'static str {
775 match self {
776 BucketInHost::Auto => "auto",
777 BucketInHost::True => "true",
778 BucketInHost::False => "false",
779 }
780 }
781}
782
/// Per-dataset `[dataset.index]` settings; missing keys use `IndexConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// `"auto"` (default), `"none"`, or `"list"`.
    pub mode: IndexMode,
    /// Must be non-empty when `mode = "list"` (checked in `AppConfig::validate`).
    pub columns: Vec<String>,
    /// Defaults to 100_000. NOTE(review): enforcement lives outside this module.
    pub max_cardinality: usize,
}
790
791impl Default for IndexConfig {
792 fn default() -> Self {
793 Self {
794 mode: IndexMode::Auto,
795 columns: Vec::new(),
796 max_cardinality: 100_000,
797 }
798 }
799}
800
/// Index selection mode; TOML values `"auto"` (default), `"none"`, `"list"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    #[default]
    Auto,
    None,
    /// Index exactly `index.columns`, which must then be non-empty.
    List,
}
809
/// S3 credentials after env/inline resolution (see `DatasetConfig::resolved_creds`).
/// Every field is independently optional.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// True when both the access key id and the secret are present; the session token
    /// is not required.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
824
825impl AppConfig {
830 pub fn load(path: &str) -> Result<Self, AppError> {
832 let raw = std::fs::read_to_string(path)
833 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
834 let mut cfg: AppConfig =
835 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
836 cfg.normalize();
837 cfg.validate()?;
838 Ok(cfg)
839 }
840
841 fn normalize(&mut self) {
849 for s in self
850 .auth
851 .read_scopes
852 .iter_mut()
853 .chain(self.auth.reload_scopes.iter_mut())
854 {
855 *s = s.to_ascii_lowercase();
856 }
857 }
858
859 fn validate(&self) -> Result<(), AppError> {
860 let p = &self.server.prefix;
862 if !p.is_empty() {
863 if !p.starts_with('/') {
864 return Err(AppError::Internal(format!(
865 "server.prefix must start with '/' (got '{p}')"
866 )));
867 }
868 if p.ends_with('/') {
869 return Err(AppError::Internal(format!(
870 "server.prefix must not end with '/' (got '{p}')"
871 )));
872 }
873 }
874
875 if self.datasets.is_empty() {
876 return Err(AppError::Internal(
877 "datasets.toml has no [[dataset]] entries".into(),
878 ));
879 }
880
881 if self.server.quack.enabled {
882 self.server.quack.validate_enabled()?;
883 }
884
885 {
888 let dp = &self.docs.path;
889 if !dp.starts_with('/') {
890 return Err(AppError::Internal(format!(
891 "docs.path must start with '/' (got '{dp}')"
892 )));
893 }
894 if dp.len() > 1 && dp.ends_with('/') {
895 return Err(AppError::Internal(format!(
896 "docs.path must not end with '/' (got '{dp}')"
897 )));
898 }
899 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
900 return Err(AppError::Internal(format!(
901 "docs.path '{dp}' collides with a reserved route"
902 )));
903 }
904 }
905
906 {
908 let sp = &self.swagger.path;
909 if !sp.starts_with('/') {
910 return Err(AppError::Internal(format!(
911 "swagger.path must start with '/' (got '{sp}')"
912 )));
913 }
914 if sp.len() > 1 && sp.ends_with('/') {
915 return Err(AppError::Internal(format!(
916 "swagger.path must not end with '/' (got '{sp}')"
917 )));
918 }
919 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
920 return Err(AppError::Internal(format!(
921 "swagger.path '{sp}' collides with a reserved route"
922 )));
923 }
924 if sp == &self.docs.path {
925 return Err(AppError::Internal(format!(
926 "swagger.path and docs.path must differ (both '{sp}')"
927 )));
928 }
929 if let Some(o) = &self.swagger.oauth2 {
930 if o.issuer.trim().is_empty() {
931 return Err(AppError::Internal(
932 "swagger.oauth2.issuer must not be empty".into(),
933 ));
934 }
935 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
936 return Err(AppError::Internal(format!(
937 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
938 o.issuer
939 )));
940 }
941 if o.client_id.trim().is_empty() {
942 return Err(AppError::Internal(
943 "swagger.oauth2.client_id must not be empty".into(),
944 ));
945 }
946 }
947 }
948
949 {
955 let mp = &self.metrics.path;
956 if !mp.starts_with('/') {
957 return Err(AppError::Internal(format!(
958 "metrics.path must start with '/' (got '{mp}')"
959 )));
960 }
961 if mp.len() > 1 && mp.ends_with('/') {
962 return Err(AppError::Internal(format!(
963 "metrics.path must not end with '/' (got '{mp}')"
964 )));
965 }
966 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
967 return Err(AppError::Internal(format!(
968 "metrics.path '{mp}' collides with a reserved route"
969 )));
970 }
971 if mp == &self.docs.path {
972 return Err(AppError::Internal(format!(
973 "metrics.path and docs.path must differ (both '{mp}')"
974 )));
975 }
976 if mp == &self.swagger.path {
977 return Err(AppError::Internal(format!(
978 "metrics.path and swagger.path must differ (both '{mp}')"
979 )));
980 }
981 }
982
983 {
986 let ep = &self.explorer.path;
987 if !ep.starts_with('/') {
988 return Err(AppError::Internal(format!(
989 "explorer.path must start with '/' (got '{ep}')"
990 )));
991 }
992 if ep.len() > 1 && ep.ends_with('/') {
993 return Err(AppError::Internal(format!(
994 "explorer.path must not end with '/' (got '{ep}')"
995 )));
996 }
997 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
998 return Err(AppError::Internal(format!(
999 "explorer.path '{ep}' collides with a reserved route"
1000 )));
1001 }
1002 if ep == &self.docs.path {
1003 return Err(AppError::Internal(format!(
1004 "explorer.path and docs.path must differ (both '{ep}')"
1005 )));
1006 }
1007 if ep == &self.swagger.path {
1008 return Err(AppError::Internal(format!(
1009 "explorer.path and swagger.path must differ (both '{ep}')"
1010 )));
1011 }
1012 if ep == &self.metrics.path {
1013 return Err(AppError::Internal(format!(
1014 "explorer.path and metrics.path must differ (both '{ep}')"
1015 )));
1016 }
1017 }
1018
1019 if self.auth.enabled {
1024 let a = &self.auth;
1025 if a.issuer.trim().is_empty() {
1026 return Err(AppError::Internal(
1027 "auth.issuer must not be empty when auth.enabled = true".into(),
1028 ));
1029 }
1030 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
1031 return Err(AppError::Internal(format!(
1032 "auth.issuer must be an absolute http(s) URL (got '{}')",
1033 a.issuer
1034 )));
1035 }
1036 for alg in &a.algorithms {
1037 match alg.as_str() {
1038 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
1039 | "PS512" => {}
1040 other => {
1041 return Err(AppError::Internal(format!(
1042 "auth.algorithms[{other}] is not allowed; pick one of \
1043 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
1044 )));
1045 }
1046 }
1047 }
1048 if a.algorithms.is_empty() {
1049 return Err(AppError::Internal(
1050 "auth.algorithms must not be empty".into(),
1051 ));
1052 }
1053 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
1054 return Err(AppError::Internal(format!(
1055 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
1056 a.tenant_claim
1057 )));
1058 }
1059 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1060 return Err(AppError::Internal(
1061 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1062 can't enforce a tenant allow-list without a claim to extract from"
1063 .into(),
1064 ));
1065 }
1066 }
1067
1068 let mut seen = HashSet::new();
1069 for d in &self.datasets {
1070 if !seen.insert(d.name.as_str()) {
1071 return Err(AppError::Internal(format!(
1072 "duplicate dataset name: {}",
1073 d.name
1074 )));
1075 }
1076 if d.name.is_empty() {
1077 return Err(AppError::Internal("dataset name must not be empty".into()));
1078 }
1079 if !d
1081 .name
1082 .chars()
1083 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1084 {
1085 return Err(AppError::Internal(format!(
1086 "dataset name '{}' must be alphanumeric (plus _ - .)",
1087 d.name
1088 )));
1089 }
1090
1091 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1092 return Err(AppError::Internal(format!(
1093 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1094 d.name
1095 )));
1096 }
1097
1098 if d.source.is_s3() {
1100 d.source.s3_bucket()?;
1101 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1102 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1103 && std::env::var("AWS_REGION").is_err()
1104 && std::env::var("AWS_DEFAULT_REGION").is_err()
1105 {
1106 log::warn!(
1107 "dataset '{}': S3 source without explicit region — \
1108 relying on AWS_REGION env var",
1109 d.name
1110 );
1111 }
1112 } else {
1113 match d.source.kind {
1117 SourceKind::Parquet => {
1118 d.resolve_local_parquet_files()?;
1119 }
1120 SourceKind::Delta => {
1121 let p = Path::new(&d.source.location);
1122 if !p.exists() {
1123 return Err(AppError::Internal(format!(
1124 "dataset '{}': delta location does not exist: {}",
1125 d.name, d.source.location
1126 )));
1127 }
1128 }
1129 }
1130 }
1131 }
1132 Ok(())
1133 }
1134}
1135
1136impl SourceConfig {
1137 pub fn is_s3(&self) -> bool {
1138 self.location.starts_with("s3://")
1139 }
1140
1141 pub fn has_glob(&self) -> bool {
1144 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1145 }
1146
1147 pub fn s3_recursive_parquet_glob(&self) -> String {
1153 if !self.is_s3() || self.has_glob() {
1154 return self.location.clone();
1155 }
1156 let trimmed = self.location.trim_end_matches('/');
1157 format!("{trimmed}/**/*.parquet")
1158 }
1159
1160 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1162 let rest = self
1163 .location
1164 .strip_prefix("s3://")
1165 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1166 let (bucket, key) = match rest.split_once('/') {
1167 Some((b, k)) => (b, k),
1168 None => (rest, ""),
1169 };
1170 if bucket.is_empty() {
1171 return Err(AppError::Internal(format!(
1172 "s3 URL missing bucket: {}",
1173 self.location
1174 )));
1175 }
1176 Ok((bucket, key))
1177 }
1178}
1179
1180impl DatasetConfig {
1181 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1191 if self.source.is_s3() {
1192 return Err(AppError::Internal(format!(
1193 "dataset '{}': resolve_local_parquet_files called on s3 source",
1194 self.name
1195 )));
1196 }
1197 let loc = &self.source.location;
1198
1199 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1201 let mut files: Vec<PathBuf> = glob::glob(loc)
1202 .map_err(|e| {
1203 AppError::Internal(format!(
1204 "dataset '{}': bad glob pattern '{loc}': {e}",
1205 self.name
1206 ))
1207 })?
1208 .filter_map(|r| r.ok())
1209 .filter(|p| {
1210 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1211 })
1212 .collect();
1213 files.sort();
1214 if files.is_empty() {
1215 return Err(AppError::Internal(format!(
1216 "dataset '{}': glob '{loc}' matched no .parquet files",
1217 self.name
1218 )));
1219 }
1220 return Ok(files);
1221 }
1222
1223 let path = Path::new(loc);
1224 if !path.exists() {
1225 return Err(AppError::Internal(format!(
1226 "dataset '{}': source path does not exist: {loc}",
1227 self.name
1228 )));
1229 }
1230
1231 if path.is_file() {
1232 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1233 return Err(AppError::Internal(format!(
1234 "dataset '{}': source must be a .parquet file",
1235 self.name
1236 )));
1237 }
1238 return Ok(vec![path.to_path_buf()]);
1239 }
1240
1241 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1242 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1243 .filter_map(|entry| entry.ok().map(|e| e.path()))
1244 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1245 .collect();
1246 files.sort();
1247 if files.is_empty() {
1248 return Err(AppError::Internal(format!(
1249 "dataset '{}': no *.parquet files found in {loc}",
1250 self.name
1251 )));
1252 }
1253 Ok(files)
1254 }
1255
1256 pub fn estimate_local_bytes(&self) -> Option<u64> {
1266 if self.source.is_s3() {
1267 return None;
1268 }
1269 match self.source.kind {
1270 SourceKind::Parquet => {
1271 let files = self.resolve_local_parquet_files().ok()?;
1272 Some(
1273 files
1274 .iter()
1275 .filter_map(|p| std::fs::metadata(p).ok())
1276 .map(|m| m.len())
1277 .sum(),
1278 )
1279 }
1280 SourceKind::Delta => {
1281 let root = self.source.location.trim_end_matches('/');
1282 let pattern = format!("{root}/**/*.parquet");
1283 let paths = glob::glob(&pattern).ok()?;
1284 Some(
1285 paths
1286 .filter_map(Result::ok)
1287 .filter_map(|p| std::fs::metadata(&p).ok())
1288 .filter(|m| m.is_file())
1289 .map(|m| m.len())
1290 .sum(),
1291 )
1292 }
1293 }
1294 }
1295
1296 pub fn force_lazy_bytes(&self, server: &ServerConfig) -> Option<u64> {
1303 if self.lazy || server.force_lazy_above_mb == 0 {
1304 return None;
1305 }
1306 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1307 match self.estimate_local_bytes() {
1308 Some(bytes) if bytes > threshold => Some(bytes),
1309 _ => None,
1310 }
1311 }
1312
1313 pub fn env_prefix(&self) -> String {
1317 self.name
1318 .chars()
1319 .map(|c| {
1320 if c.is_ascii_alphanumeric() {
1321 c.to_ascii_uppercase()
1322 } else {
1323 '_'
1324 }
1325 })
1326 .collect()
1327 }
1328
1329 pub fn resolved_creds(&self) -> ResolvedCreds {
1334 let prefix = self.env_prefix();
1335 let from_env = |suffix: &str| {
1336 std::env::var(format!("{prefix}_{suffix}"))
1337 .ok()
1338 .filter(|s| !s.is_empty())
1339 };
1340 let inline = self.s3.as_ref();
1341 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1342
1343 ResolvedCreds {
1344 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1345 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1346 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1347 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1348 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1349 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1350 session_token: from_env("AWS_SESSION_TOKEN")
1351 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1352 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1353 }
1354 }
1355
1356 pub fn resolved_region(&self) -> String {
1359 let prefix = self.env_prefix();
1360 std::env::var(format!("{prefix}_AWS_REGION"))
1361 .ok()
1362 .filter(|s| !s.is_empty())
1363 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1364 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1365 .or_else(|| {
1366 std::env::var("AWS_DEFAULT_REGION")
1367 .ok()
1368 .filter(|s| !s.is_empty())
1369 })
1370 .unwrap_or_else(|| "us-east-1".to_string())
1371 }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376 use super::*;
1377
1378 #[test]
1379 fn server_defaults() {
1380 let s = ServerConfig::default();
1381 assert_eq!(s.backend, Backend::Datafusion);
1382 assert_eq!(s.port, 8080);
1383 assert!(s.compress);
1384 assert_eq!(s.max_body_bytes, 1024 * 1024);
1385 assert_eq!(s.max_page_size, 100_000);
1386 assert_eq!(s.force_lazy_above_mb, 0);
1387 assert_eq!(s.request_timeout_ms, 30_000);
1388 assert!(!s.quack.enabled);
1389 assert_eq!(s.quack.uri, "quack:localhost");
1390 assert!(s.quack.token.is_none());
1391 assert!(!s.quack.allow_other_hostname);
1392 assert!(s.quack.read_only);
1393 assert_eq!(s.prefix, "");
1394 assert!(s.listen.is_loopback());
1395 }
1396
1397 #[test]
1398 fn server_overrides_from_toml() {
1399 let toml = r#"
1400 [server]
1401 backend = "duckdb"
1402 port = 9000
1403 prefix = "/datapress"
1404 compress = false
1405 max_body_bytes = 4096
1406 max_page_size = 50000
1407 force_lazy_above_mb = 256
1408 request_timeout_ms = 0
1409
1410 [server.quack]
1411 enabled = true
1412 uri = "quack:localhost:9495"
1413 token = "test-token"
1414 read_only = false
1415 [[dataset]]
1416 name = "x"
1417 source.kind = "parquet"
1418 source.location = "/tmp/missing.parquet"
1419 "#;
1420 let cfg: AppConfig = toml::from_str(toml).unwrap();
1421 assert_eq!(cfg.server.backend, Backend::Duckdb);
1422 assert_eq!(cfg.server.port, 9000);
1423 assert_eq!(cfg.server.prefix, "/datapress");
1424 assert!(!cfg.server.compress);
1425 assert_eq!(cfg.server.max_body_bytes, 4096);
1426 assert_eq!(cfg.server.max_page_size, 50_000);
1427 assert_eq!(cfg.server.force_lazy_above_mb, 256);
1428 assert_eq!(cfg.server.request_timeout_ms, 0);
1429 assert!(cfg.server.quack.enabled);
1430 assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
1431 assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
1432 assert!(!cfg.server.quack.read_only);
1433 assert_eq!(cfg.datasets.len(), 1);
1434 assert_eq!(cfg.datasets[0].name, "x");
1435 assert!(cfg.datasets[0].dict_encode); }
1437
1438 #[test]
1439 fn force_lazy_bytes_logic() {
1440 let dir = std::env::temp_dir().join(format!(
1443 "dp-force-lazy-{}-{}",
1444 std::process::id(),
1445 std::time::SystemTime::now()
1446 .duration_since(std::time::UNIX_EPOCH)
1447 .unwrap()
1448 .as_nanos()
1449 ));
1450 std::fs::create_dir_all(&dir).unwrap();
1451 let two_mib = 2 * 1024 * 1024;
1452 let file = dir.join("data.parquet");
1453 std::fs::write(&file, vec![0u8; two_mib]).unwrap();
1454
1455 let mk = |kind: SourceKind, location: &str, lazy: bool| DatasetConfig {
1456 name: "t".into(),
1457 source: SourceConfig {
1458 kind,
1459 location: location.to_string(),
1460 },
1461 s3: None,
1462 index: IndexConfig::default(),
1463 columns: vec![],
1464 dict_encode: true,
1465 lazy,
1466 };
1467 let server = |mb: u64| ServerConfig {
1468 force_lazy_above_mb: mb,
1469 ..ServerConfig::default()
1470 };
1471
1472 let file_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), false);
1474 assert_eq!(file_ds.estimate_local_bytes(), Some(two_mib as u64));
1475 let dir_ds = mk(SourceKind::Parquet, dir.to_str().unwrap(), false);
1476 assert_eq!(dir_ds.estimate_local_bytes(), Some(two_mib as u64));
1477 let delta_ds = mk(SourceKind::Delta, dir.to_str().unwrap(), false);
1478 assert_eq!(delta_ds.estimate_local_bytes(), Some(two_mib as u64));
1479
1480 assert_eq!(file_ds.force_lazy_bytes(&server(0)), None);
1482 assert_eq!(file_ds.force_lazy_bytes(&server(1)), Some(two_mib as u64));
1484 assert_eq!(file_ds.force_lazy_bytes(&server(4)), None);
1486
1487 let lazy_ds = mk(SourceKind::Parquet, file.to_str().unwrap(), true);
1489 assert_eq!(lazy_ds.force_lazy_bytes(&server(1)), None);
1490
1491 let s3_ds = mk(SourceKind::Parquet, "s3://bucket/data.parquet", false);
1493 assert_eq!(s3_ds.estimate_local_bytes(), None);
1494 assert_eq!(s3_ds.force_lazy_bytes(&server(1)), None);
1495
1496 std::fs::remove_dir_all(&dir).ok();
1497 }
1498
1499 #[test]
1500 fn validate_rejects_bad_prefix() {
1501 let bad = ["no-leading-slash", "/trailing/"];
1502 for p in bad {
1503 let cfg = AppConfig {
1504 server: ServerConfig {
1505 prefix: p.to_string(),
1506 ..Default::default()
1507 },
1508 docs: DocsConfig::default(),
1509 swagger: SwaggerConfig::default(),
1510 metrics: MetricsConfig::default(),
1511 explorer: ExplorerConfig::default(),
1512 sql: SqlConfig::default(),
1513 datafusion: DataFusionConfig::default(),
1514 auth: AuthConfig::default(),
1515 datasets: vec![],
1516 };
1517 assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
1518 }
1519 }
1520
1521 #[test]
1522 fn normalize_lowercases_configured_scopes() {
1523 let mut cfg = AppConfig {
1524 server: ServerConfig::default(),
1525 docs: DocsConfig::default(),
1526 swagger: SwaggerConfig::default(),
1527 metrics: MetricsConfig::default(),
1528 explorer: ExplorerConfig::default(),
1529 sql: SqlConfig::default(),
1530 datafusion: DataFusionConfig::default(),
1531 auth: AuthConfig {
1532 read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
1533 reload_scopes: vec!["Datasets:Reload".into()],
1534 ..Default::default()
1535 },
1536 datasets: vec![],
1537 };
1538 cfg.normalize();
1539 assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
1540 assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
1541 }
1542
1543 #[test]
1544 fn validate_rejects_no_datasets() {
1545 let cfg = AppConfig {
1546 server: ServerConfig::default(),
1547 docs: DocsConfig::default(),
1548 swagger: SwaggerConfig::default(),
1549 metrics: MetricsConfig::default(),
1550 explorer: ExplorerConfig::default(),
1551 sql: SqlConfig::default(),
1552 datafusion: DataFusionConfig::default(),
1553 auth: AuthConfig::default(),
1554 datasets: vec![],
1555 };
1556 let err = cfg.validate().unwrap_err();
1557 assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
1558 }
1559
1560 #[cfg(feature = "auth")]
1561 #[test]
1562 fn validate_accepts_auth_issuer_with_trailing_slash() {
1563 use std::io::Write;
1564
1565 let dir = std::env::temp_dir().join(format!("dp-auth-issuer-test-{}", std::process::id()));
1566 let _ = std::fs::remove_dir_all(&dir);
1567 std::fs::create_dir_all(&dir).unwrap();
1568 let file = dir.join("a.parquet");
1569 std::fs::File::create(&file)
1570 .unwrap()
1571 .write_all(b"x")
1572 .unwrap();
1573
1574 let cfg = AppConfig {
1575 server: ServerConfig::default(),
1576 docs: DocsConfig::default(),
1577 swagger: SwaggerConfig::default(),
1578 metrics: MetricsConfig::default(),
1579 explorer: ExplorerConfig::default(),
1580 sql: SqlConfig::default(),
1581 datafusion: DataFusionConfig::default(),
1582 auth: AuthConfig {
1583 enabled: true,
1584 issuer: "https://tenant.example.com/".into(),
1585 ..Default::default()
1586 },
1587 datasets: vec![DatasetConfig {
1588 name: "x".into(),
1589 source: SourceConfig {
1590 kind: SourceKind::Parquet,
1591 location: file.to_string_lossy().into_owned(),
1592 },
1593 s3: None,
1594 index: IndexConfig::default(),
1595 columns: vec![],
1596 dict_encode: true,
1597 lazy: false,
1598 }],
1599 };
1600
1601 assert!(cfg.validate().is_ok());
1602 let _ = std::fs::remove_dir_all(&dir);
1603 }
1604
1605 #[test]
1606 fn validate_rejects_quack_non_local_host_without_override() {
1607 let cfg = AppConfig {
1608 server: ServerConfig {
1609 quack: QuackConfig {
1610 enabled: true,
1611 uri: "quack:127.0.0.1".into(),
1612 token: Some("test-token".into()),
1613 ..Default::default()
1614 },
1615 ..Default::default()
1616 },
1617 docs: DocsConfig::default(),
1618 swagger: SwaggerConfig::default(),
1619 metrics: MetricsConfig::default(),
1620 explorer: ExplorerConfig::default(),
1621 sql: SqlConfig::default(),
1622 datafusion: DataFusionConfig::default(),
1623 auth: AuthConfig::default(),
1624 datasets: vec![DatasetConfig {
1625 name: "x".into(),
1626 source: SourceConfig {
1627 kind: SourceKind::Parquet,
1628 location: "/tmp/missing.parquet".into(),
1629 },
1630 s3: None,
1631 index: IndexConfig::default(),
1632 columns: vec![],
1633 dict_encode: true,
1634 lazy: false,
1635 }],
1636 };
1637 let err = cfg.validate().unwrap_err();
1638 assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
1639 }
1640
1641 #[test]
1642 fn validate_rejects_bad_dataset_name() {
1643 let cfg: AppConfig = toml::from_str(
1644 r#"
1645 [[dataset]]
1646 name = "bad name!"
1647 source.kind = "parquet"
1648 source.location = "/tmp/whatever"
1649 "#,
1650 )
1651 .unwrap();
1652 let err = cfg.validate().unwrap_err();
1653 assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
1654 }
1655
1656 #[test]
1657 fn validate_rejects_duplicate_names() {
1658 use std::io::Write;
1659 let dir = std::env::temp_dir().join(format!("dp-dup-test-{}", std::process::id()));
1660 let _ = std::fs::remove_dir_all(&dir);
1661 std::fs::create_dir_all(&dir).unwrap();
1662 let f = dir.join("a.parquet");
1663 std::fs::File::create(&f).unwrap().write_all(b"x").unwrap();
1664 let path = f.to_str().unwrap();
1665
1666 let cfg: AppConfig = toml::from_str(&format!(
1667 r#"
1668 [[dataset]]
1669 name = "a"
1670 source.kind = "parquet"
1671 source.location = "{path}"
1672 [[dataset]]
1673 name = "a"
1674 source.kind = "parquet"
1675 source.location = "{path}"
1676 "#
1677 ))
1678 .unwrap();
1679 let err = cfg.validate().expect_err("expected error");
1680 assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
1681
1682 let _ = std::fs::remove_dir_all(&dir);
1683 }
1684
1685 #[test]
1686 fn s3_bucket_parsing() {
1687 let mk = |loc: &str| SourceConfig {
1688 kind: SourceKind::Parquet,
1689 location: loc.into(),
1690 };
1691 let s1 = mk("s3://bucket/path/key");
1692 assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
1693 let s2 = mk("s3://only-bucket");
1694 assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
1695 assert!(mk("s3:///nokey").s3_bucket().is_err());
1696 assert!(mk("/local/path").s3_bucket().is_err());
1697 }
1698
1699 #[test]
1700 fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
1701 let mk = |loc: &str| SourceConfig {
1702 kind: SourceKind::Parquet,
1703 location: loc.into(),
1704 };
1705 assert_eq!(
1707 mk("s3://bucket/logs/").s3_recursive_parquet_glob(),
1708 "s3://bucket/logs/**/*.parquet"
1709 );
1710 assert_eq!(
1711 mk("s3://bucket/logs").s3_recursive_parquet_glob(),
1712 "s3://bucket/logs/**/*.parquet"
1713 );
1714 assert_eq!(
1716 mk("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
1717 "s3://bucket/logs/*.parquet"
1718 );
1719 assert_eq!(
1721 mk("/local/logs").s3_recursive_parquet_glob(),
1722 "/local/logs"
1723 );
1724 }
1725
1726 #[test]
1727 fn effective_endpoint_folds_bucket_per_mode() {
1728 let virt = S3Config {
1729 endpoint: Some("https://s3.example.com".into()),
1730 addressing_style: AddressingStyle::Virtual,
1731 ..Default::default()
1732 };
1733 assert_eq!(
1735 virt.effective_endpoint("mybucket").as_deref(),
1736 Some("https://mybucket.s3.example.com")
1737 );
1738 let prefixed = S3Config {
1740 endpoint: Some("https://mybucket.s3.example.com".into()),
1741 ..virt.clone()
1742 };
1743 assert_eq!(
1744 prefixed.effective_endpoint("mybucket").as_deref(),
1745 Some("https://mybucket.s3.example.com")
1746 );
1747 let path = S3Config {
1749 addressing_style: AddressingStyle::Path,
1750 ..virt.clone()
1751 };
1752 assert_eq!(
1753 path.effective_endpoint("mybucket").as_deref(),
1754 Some("https://s3.example.com")
1755 );
1756 let forced_off = S3Config {
1758 endpoint_bucket_in_host: BucketInHost::False,
1759 ..virt.clone()
1760 };
1761 assert_eq!(
1762 forced_off.effective_endpoint("mybucket").as_deref(),
1763 Some("https://s3.example.com")
1764 );
1765 let forced_on = S3Config {
1766 endpoint_bucket_in_host: BucketInHost::True,
1767 ..path.clone()
1768 };
1769 assert_eq!(
1770 forced_on.effective_endpoint("mybucket").as_deref(),
1771 Some("https://mybucket.s3.example.com")
1772 );
1773 assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
1775 }
1776
1777 #[test]
1778 fn env_prefix_sanitises_name() {
1779 let mk = |name: &str| DatasetConfig {
1780 name: name.into(),
1781 source: SourceConfig {
1782 kind: SourceKind::Parquet,
1783 location: "x".into(),
1784 },
1785 s3: None,
1786 index: IndexConfig::default(),
1787 columns: vec![],
1788 dict_encode: true,
1789 lazy: false,
1790 };
1791 assert_eq!(mk("accidents").env_prefix(), "ACCIDENTS");
1792 assert_eq!(mk("sales.eu-1").env_prefix(), "SALES_EU_1");
1793 assert_eq!(mk("a_b.c-d").env_prefix(), "A_B_C_D");
1794 }
1795
1796 #[test]
1797 fn resolve_local_parquet_single_file_and_dir() {
1798 use std::io::Write;
1799 let dir = std::env::temp_dir().join(format!("dp-cfg-test-{}", std::process::id()));
1800 let _ = std::fs::remove_dir_all(&dir);
1801 std::fs::create_dir_all(&dir).unwrap();
1802 let f = dir.join("a.parquet");
1803 let mut fh = std::fs::File::create(&f).unwrap();
1804 fh.write_all(b"not really parquet").unwrap();
1805
1806 let mk = |loc: &str| DatasetConfig {
1807 name: "ds".into(),
1808 source: SourceConfig {
1809 kind: SourceKind::Parquet,
1810 location: loc.into(),
1811 },
1812 s3: None,
1813 index: IndexConfig::default(),
1814 columns: vec![],
1815 dict_encode: true,
1816 lazy: false,
1817 };
1818
1819 let files = mk(f.to_str().unwrap())
1821 .resolve_local_parquet_files()
1822 .unwrap();
1823 assert_eq!(files, vec![f.clone()]);
1824
1825 let files = mk(dir.to_str().unwrap())
1827 .resolve_local_parquet_files()
1828 .unwrap();
1829 assert_eq!(files, vec![f.clone()]);
1830
1831 assert!(
1833 mk("/no/such/place.parquet")
1834 .resolve_local_parquet_files()
1835 .is_err()
1836 );
1837
1838 let _ = std::fs::remove_dir_all(&dir);
1839 }
1840}