1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that the configurable mounts (`docs.path`, `swagger.path`,
/// `metrics.path`, `explorer.path`) are compared against during
/// `AppConfig::validate`. A mount equal to one of these entries is rejected,
/// with the single exception that `metrics.path` may be `/metrics`.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
/// Top-level application configuration, deserialized from TOML by
/// `AppConfig::load`.
///
/// Every section is optional in the file: a missing section falls back to
/// that section type's `Default` implementation.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` section.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` section.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` section.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` section.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` section.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[sql]` section.
    #[serde(default)]
    pub sql: SqlConfig,
    /// `[auth]` section.
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` entries (the TOML key is the singular `dataset`).
    /// Deserialization accepts an empty list; `validate` rejects it.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
62
/// `[server]` section. Keys missing from the file take the values produced
/// by `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Selected backend (default: `datafusion`).
    pub backend: Backend,
    /// Value of the `listen` key (default: `127.0.0.1`).
    pub listen: IpAddr,
    /// Value of the `port` key (default: `8080`).
    pub port: u16,
    /// Optional worker count (default: `None`).
    /// NOTE(review): presumably `None` lets the runtime choose — confirm at the use site.
    pub workers: Option<usize>,
    /// Route prefix (default: empty). When non-empty, validation requires a
    /// leading `/` and forbids a trailing `/`.
    pub prefix: String,
    /// Default: `true`.
    /// NOTE(review): presumably toggles response compression — confirm.
    pub compress: bool,
    /// Default: `1024 * 1024` (1 MiB).
    pub max_body_bytes: usize,
    /// Default: `100_000`.
    pub max_page_size: u64,
    /// Default: `30_000`.
    /// NOTE(review): a test deserializes `0` here; whether `0` means "disabled" is not visible in this file.
    pub request_timeout_ms: u64,
    /// Default: `30`.
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-section.
    pub quack: QuackConfig,
}
108
109impl Default for ServerConfig {
110 fn default() -> Self {
111 Self {
112 backend: Backend::default(),
113 listen: IpAddr::from([127, 0, 0, 1]),
114 port: 8080,
115 workers: None,
116 prefix: String::new(),
117 compress: true,
118 max_body_bytes: 1024 * 1024,
119 max_page_size: 100_000,
120 request_timeout_ms: 30_000,
121 shutdown_timeout_secs: 30,
122 quack: QuackConfig::default(),
123 }
124 }
125}
126
/// `[server.quack]` sub-section. Keys missing from the file take the values
/// produced by `QuackConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Default: `false`. `validate_enabled` is only run when this is `true`.
    pub enabled: bool,
    /// Default: `"quack:localhost"`. Must be non-blank and start with
    /// `quack:` when enabled.
    pub uri: String,
    /// Optional token (default: `None`). When present and enabled it must be
    /// at least 4 characters long.
    pub token: Option<String>,
    /// Default: `false`. Unless set, the host part of `uri` must be exactly
    /// `localhost`.
    pub allow_other_hostname: bool,
    /// Default: `true`.
    pub read_only: bool,
}
151
152impl Default for QuackConfig {
153 fn default() -> Self {
154 Self {
155 enabled: false,
156 uri: "quack:localhost".into(),
157 token: None,
158 allow_other_hostname: false,
159 read_only: true,
160 }
161 }
162}
163
164impl QuackConfig {
165 pub fn validate_enabled(&self) -> Result<(), AppError> {
169 if self.uri.trim().is_empty() {
170 return Err(AppError::Internal(
171 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
172 ));
173 }
174 if !self.uri.starts_with("quack:") {
175 return Err(AppError::Internal(format!(
176 "server.quack.uri must start with 'quack:' (got '{}')",
177 self.uri
178 )));
179 }
180 if !self.allow_other_hostname {
181 let host = self.hostname().unwrap_or_default();
182 if host != "localhost" {
183 return Err(AppError::Internal(format!(
184 "server.quack.uri host must be 'localhost' unless \
185 server.quack.allow_other_hostname = true (got '{}')",
186 self.uri
187 )));
188 }
189 }
190 if let Some(token) = self.token.as_deref()
191 && token.len() < 4
192 {
193 return Err(AppError::Internal(
194 "server.quack.token must be at least 4 characters".into(),
195 ));
196 }
197 Ok(())
198 }
199
200 fn hostname(&self) -> Option<&str> {
201 let rest = self.uri.strip_prefix("quack:")?;
202 let rest = rest.strip_prefix("//").unwrap_or(rest);
203 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
204 (!host.is_empty()).then_some(host)
205 }
206}
207
/// Backend selection for `server.backend`. Variants are spelled in
/// lowercase in the config file (`"datafusion"`, `"duckdb"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Used when `server.backend` is not specified.
    #[default]
    Datafusion,
    /// Selected with `backend = "duckdb"`.
    Duckdb,
}
215
/// `[docs]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `DocsConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Mount path (default: `"/mkdocs"`). Validation requires a leading `/`,
    /// no trailing `/`, and no collision with a reserved route.
    pub path: String,
}
231
232impl Default for DocsConfig {
233 fn default() -> Self {
234 Self {
235 enabled: true,
236 path: "/mkdocs".into(),
237 }
238 }
239}
240
/// `[swagger]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `SwaggerConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Mount path (default: `"/docs"`). Validation requires a leading `/`,
    /// no trailing `/`, no collision with a reserved route, and a value
    /// different from `docs.path`.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-section (default: `None`).
    pub oauth2: Option<SwaggerOAuth2Config>,
}
264
265impl Default for SwaggerConfig {
266 fn default() -> Self {
267 Self {
268 enabled: true,
269 path: "/docs".into(),
270 oauth2: None,
271 }
272 }
273}
274
/// `[swagger.oauth2]` sub-section. Unknown keys are rejected at
/// deserialization time. `issuer` and `client_id` are required keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Must be non-blank and start with `http://` or `https://`
    /// (enforced by `AppConfig::validate`).
    pub issuer: String,
    /// Must be non-blank (enforced by `AppConfig::validate`).
    pub client_id: String,
    /// Default: empty list.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Default: `true`.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
306
/// `[metrics]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `MetricsConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Default: `false`.
    pub enabled: bool,
    /// Mount path (default: `"/metrics"`). Validation requires a leading
    /// `/`, no trailing `/`, no collision with a reserved route other than
    /// `/metrics` itself, and a value different from `docs.path` and
    /// `swagger.path`.
    pub path: String,
}
332
333impl Default for MetricsConfig {
334 fn default() -> Self {
335 Self {
336 enabled: false,
337 path: "/metrics".into(),
338 }
339 }
340}
341
/// `[explorer]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `ExplorerConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Mount path (default: `"/explore"`). Validation requires a leading
    /// `/`, no trailing `/`, no collision with a reserved route, and a value
    /// different from `docs.path`, `swagger.path`, and `metrics.path`.
    pub path: String,
}
360
361impl Default for ExplorerConfig {
362 fn default() -> Self {
363 Self {
364 enabled: true,
365 path: "/explore".into(),
366 }
367 }
368}
369
/// `[sql]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `SqlConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SqlConfig {
    /// Default: `false`.
    pub enabled: bool,
    /// Default: `100_000`.
    /// NOTE(review): presumably a row cap on SQL results — confirm at the use site.
    pub max_rows: u64,
}
397
398impl Default for SqlConfig {
399 fn default() -> Self {
400 Self {
401 enabled: false,
402 max_rows: 100_000,
403 }
404 }
405}
406
/// `[auth]` section. Unknown keys are rejected at deserialization time;
/// missing keys take the values from `AuthConfig::default()`.
///
/// The validation rules mentioned on the fields below are only applied by
/// `AppConfig::validate` when `enabled` is `true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Default: `false`.
    pub enabled: bool,
    /// Default: empty. Must be non-blank and start with `http://` or
    /// `https://`.
    pub issuer: String,
    /// Default: empty.
    pub audience: String,
    /// Default: empty list. Lowercased (ASCII) by `AppConfig::normalize`.
    pub read_scopes: Vec<String>,
    /// Default: empty list. Lowercased (ASCII) by `AppConfig::normalize`.
    pub reload_scopes: Vec<String>,
    /// Default: `false`.
    pub anonymous_read: bool,
    /// Default: `true`.
    pub start_degraded: bool,
    /// Default: `["RS256"]`. Must be non-empty, and every entry must be one
    /// of RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512.
    pub algorithms: Vec<String>,
    /// Default: `60`.
    pub leeway_secs: u64,
    /// Default: `3600`.
    pub jwks_refresh_secs: u64,
    /// Default: empty. When non-empty it must be a JSON pointer, i.e. start
    /// with `/`.
    pub tenant_claim: String,
    /// Default: empty list. A non-empty list requires `tenant_claim` to be
    /// set.
    pub allowed_tenants: Vec<String>,
    /// Default: `true`.
    pub admin_token_fallback: bool,
}
474
475impl Default for AuthConfig {
476 fn default() -> Self {
477 Self {
478 enabled: false,
479 issuer: String::new(),
480 audience: String::new(),
481 read_scopes: Vec::new(),
482 reload_scopes: Vec::new(),
483 anonymous_read: false,
484 start_degraded: true,
485 algorithms: vec!["RS256".into()],
486 leeway_secs: 60,
487 jwks_refresh_secs: 3600,
488 tenant_claim: String::new(),
489 allowed_tenants: Vec::new(),
490 admin_token_fallback: true,
491 }
492 }
493}
494
495impl Backend {
496 pub fn as_str(self) -> &'static str {
497 match self {
498 Backend::Datafusion => "datafusion",
499 Backend::Duckdb => "duckdb",
500 }
501 }
502}
503
/// One `[[dataset]]` entry.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name. Validation requires it to be non-empty, unique, and
    /// made of ASCII alphanumerics plus `_`, `-`, `.`.
    pub name: String,
    /// Where the data comes from (`source.kind`, `source.location`).
    pub source: SourceConfig,
    /// Optional `s3` sub-table (default: `None`).
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// `index` sub-table (default: `IndexConfig::default()`).
    #[serde(default)]
    pub index: IndexConfig,
    /// Default: empty list.
    /// NOTE(review): presumably a column projection; what an empty list means is not visible here — confirm.
    #[serde(default)]
    pub columns: Vec<String>,
    /// Default: `true`.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// Default: `false`.
    #[serde(default)]
    pub lazy: bool,
}
537
/// Serde default provider for boolean fields that should be `true` when the
/// key is absent (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
541
/// `source` table of a dataset. Both keys are required.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Storage format of the source.
    pub kind: SourceKind,
    /// Location string. A value starting with `s3://` is treated as an S3
    /// URL; anything else is treated as a local path, which for parquet
    /// sources may be a file, a directory, or a glob pattern.
    pub location: String,
}
548
/// Storage format for `source.kind`. Variants are spelled in lowercase in
/// the config file (`"parquet"`, `"delta"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// The `Default` variant.
    #[default]
    Parquet,
    /// Selected with `kind = "delta"`.
    Delta,
}
556
557impl SourceKind {
558 pub fn as_str(self) -> &'static str {
559 match self {
560 SourceKind::Parquet => "parquet",
561 SourceKind::Delta => "delta",
562 }
563 }
564}
565
/// `s3` sub-table of a dataset. Keys missing from the file take the values
/// from `S3Config::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Inline region (default: `None`). Consulted by
    /// `DatasetConfig::resolved_region`.
    pub region: Option<String>,
    /// Custom endpoint (default: `None`). An empty string is treated as
    /// unset by `effective_endpoint`.
    pub endpoint: Option<String>,
    /// Default: `virtual`. Decides bucket-in-host folding when
    /// `endpoint_bucket_in_host` is `auto`.
    pub addressing_style: AddressingStyle,
    /// Default: `false`.
    pub allow_http: bool,
    /// Inline access key id (default: `None`). Consulted by
    /// `DatasetConfig::resolved_creds`.
    pub access_key_id: Option<String>,
    /// Inline secret access key (default: `None`). Consulted by
    /// `DatasetConfig::resolved_creds`.
    pub secret_access_key: Option<String>,
    /// Inline session token (default: `None`). Consulted by
    /// `DatasetConfig::resolved_creds`.
    pub session_token: Option<String>,
    /// Default: `auto`.
    pub partitioning: Partitioning,
    /// Default: `auto`. Controls whether `effective_endpoint` prepends the
    /// bucket to the endpoint host.
    pub endpoint_bucket_in_host: BucketInHost,
}
592
593impl Default for S3Config {
594 fn default() -> Self {
595 Self {
596 region: None,
597 endpoint: None,
598 addressing_style: AddressingStyle::Virtual,
599 allow_http: false,
600 access_key_id: None,
601 secret_access_key: None,
602 session_token: None,
603 partitioning: Partitioning::Auto,
604 endpoint_bucket_in_host: BucketInHost::Auto,
605 }
606 }
607}
608
609impl S3Config {
610 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
619 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
620
621 let fold = match self.endpoint_bucket_in_host {
622 BucketInHost::False => false,
623 BucketInHost::True => true,
624 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
625 };
626 if !fold {
627 return Some(ep.to_string());
628 }
629
630 let (scheme, host_and_path) = match ep.split_once("://") {
631 Some((s, rest)) => (Some(s), rest),
632 None => (None, ep),
633 };
634 let (host, path) = match host_and_path.split_once('/') {
636 Some((h, p)) => (h, Some(p)),
637 None => (host_and_path, None),
638 };
639 if host == bucket || host.starts_with(&format!("{bucket}.")) {
641 return Some(ep.to_string());
642 }
643 let new_host = format!("{bucket}.{host}");
644 let rebuilt = match (scheme, path) {
645 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
646 (Some(s), None) => format!("{s}://{new_host}"),
647 (None, Some(p)) => format!("{new_host}/{p}"),
648 (None, None) => new_host,
649 };
650 Some(rebuilt)
651 }
652}
653
/// Value of `s3.addressing_style`. Variants are spelled in lowercase in the
/// config file (`"virtual"`, `"path"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// The `Default` variant. With `endpoint_bucket_in_host = auto`, this
    /// makes `S3Config::effective_endpoint` fold the bucket into the host.
    #[default]
    Virtual,
    /// With `endpoint_bucket_in_host = auto`, the endpoint is returned
    /// unchanged.
    Path,
}
661
662impl AddressingStyle {
663 pub fn as_str(self) -> &'static str {
664 match self {
665 AddressingStyle::Virtual => "virtual",
666 AddressingStyle::Path => "path",
667 }
668 }
669}
670
/// Value of `s3.partitioning`. Variants are spelled in lowercase in the
/// config file (`"auto"`, `"hive"`, `"none"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// The `Default` variant.
    #[default]
    Auto,
    /// Selected with `partitioning = "hive"`.
    Hive,
    /// Selected with `partitioning = "none"`.
    None,
}
688
689impl Partitioning {
690 pub fn as_str(self) -> &'static str {
691 match self {
692 Partitioning::Auto => "auto",
693 Partitioning::Hive => "hive",
694 Partitioning::None => "none",
695 }
696 }
697}
698
/// Value of `s3.endpoint_bucket_in_host`. Variants are spelled in lowercase
/// in the config file (`"auto"`, `"true"`, `"false"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// The `Default` variant: fold the bucket into the endpoint host only
    /// when `addressing_style` is `virtual`.
    #[default]
    Auto,
    /// Always fold the bucket into the endpoint host.
    True,
    /// Never fold; the endpoint is used as configured.
    False,
}
715
716impl BucketInHost {
717 pub fn as_str(self) -> &'static str {
718 match self {
719 BucketInHost::Auto => "auto",
720 BucketInHost::True => "true",
721 BucketInHost::False => "false",
722 }
723 }
724}
725
/// `index` sub-table of a dataset. Keys missing from the file take the
/// values from `IndexConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Default: `auto`.
    pub mode: IndexMode,
    /// Default: empty list. Must be non-empty when `mode = "list"`
    /// (enforced by `AppConfig::validate`).
    pub columns: Vec<String>,
    /// Default: `100_000`.
    pub max_cardinality: usize,
}
733
734impl Default for IndexConfig {
735 fn default() -> Self {
736 Self {
737 mode: IndexMode::Auto,
738 columns: Vec::new(),
739 max_cardinality: 100_000,
740 }
741 }
742}
743
/// Value of `index.mode`. Variants are spelled in lowercase in the config
/// file (`"auto"`, `"none"`, `"list"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// The `Default` variant.
    #[default]
    Auto,
    /// Selected with `mode = "none"`.
    None,
    /// Selected with `mode = "list"`; validation then requires a non-empty
    /// `index.columns`.
    List,
}
752
/// Credential values after resolution; each part is independently optional.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Resolved access key id, if any source provided one.
    pub access_key_id: Option<String>,
    /// Resolved secret access key, if any source provided one.
    pub secret_access_key: Option<String>,
    /// Resolved session token, if any source provided one.
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` only when both the access key id and the secret access key are
    /// present. The session token is not considered.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
767
768impl AppConfig {
773 pub fn load(path: &str) -> Result<Self, AppError> {
775 let raw = std::fs::read_to_string(path)
776 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
777 let mut cfg: AppConfig =
778 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
779 cfg.normalize();
780 cfg.validate()?;
781 Ok(cfg)
782 }
783
784 fn normalize(&mut self) {
792 for s in self
793 .auth
794 .read_scopes
795 .iter_mut()
796 .chain(self.auth.reload_scopes.iter_mut())
797 {
798 *s = s.to_ascii_lowercase();
799 }
800 }
801
802 fn validate(&self) -> Result<(), AppError> {
803 let p = &self.server.prefix;
805 if !p.is_empty() {
806 if !p.starts_with('/') {
807 return Err(AppError::Internal(format!(
808 "server.prefix must start with '/' (got '{p}')"
809 )));
810 }
811 if p.ends_with('/') {
812 return Err(AppError::Internal(format!(
813 "server.prefix must not end with '/' (got '{p}')"
814 )));
815 }
816 }
817
818 if self.datasets.is_empty() {
819 return Err(AppError::Internal(
820 "datasets.toml has no [[dataset]] entries".into(),
821 ));
822 }
823
824 if self.server.quack.enabled {
825 self.server.quack.validate_enabled()?;
826 }
827
828 {
831 let dp = &self.docs.path;
832 if !dp.starts_with('/') {
833 return Err(AppError::Internal(format!(
834 "docs.path must start with '/' (got '{dp}')"
835 )));
836 }
837 if dp.len() > 1 && dp.ends_with('/') {
838 return Err(AppError::Internal(format!(
839 "docs.path must not end with '/' (got '{dp}')"
840 )));
841 }
842 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
843 return Err(AppError::Internal(format!(
844 "docs.path '{dp}' collides with a reserved route"
845 )));
846 }
847 }
848
849 {
851 let sp = &self.swagger.path;
852 if !sp.starts_with('/') {
853 return Err(AppError::Internal(format!(
854 "swagger.path must start with '/' (got '{sp}')"
855 )));
856 }
857 if sp.len() > 1 && sp.ends_with('/') {
858 return Err(AppError::Internal(format!(
859 "swagger.path must not end with '/' (got '{sp}')"
860 )));
861 }
862 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
863 return Err(AppError::Internal(format!(
864 "swagger.path '{sp}' collides with a reserved route"
865 )));
866 }
867 if sp == &self.docs.path {
868 return Err(AppError::Internal(format!(
869 "swagger.path and docs.path must differ (both '{sp}')"
870 )));
871 }
872 if let Some(o) = &self.swagger.oauth2 {
873 if o.issuer.trim().is_empty() {
874 return Err(AppError::Internal(
875 "swagger.oauth2.issuer must not be empty".into(),
876 ));
877 }
878 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
879 return Err(AppError::Internal(format!(
880 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
881 o.issuer
882 )));
883 }
884 if o.client_id.trim().is_empty() {
885 return Err(AppError::Internal(
886 "swagger.oauth2.client_id must not be empty".into(),
887 ));
888 }
889 }
890 }
891
892 {
898 let mp = &self.metrics.path;
899 if !mp.starts_with('/') {
900 return Err(AppError::Internal(format!(
901 "metrics.path must start with '/' (got '{mp}')"
902 )));
903 }
904 if mp.len() > 1 && mp.ends_with('/') {
905 return Err(AppError::Internal(format!(
906 "metrics.path must not end with '/' (got '{mp}')"
907 )));
908 }
909 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
910 return Err(AppError::Internal(format!(
911 "metrics.path '{mp}' collides with a reserved route"
912 )));
913 }
914 if mp == &self.docs.path {
915 return Err(AppError::Internal(format!(
916 "metrics.path and docs.path must differ (both '{mp}')"
917 )));
918 }
919 if mp == &self.swagger.path {
920 return Err(AppError::Internal(format!(
921 "metrics.path and swagger.path must differ (both '{mp}')"
922 )));
923 }
924 }
925
926 {
929 let ep = &self.explorer.path;
930 if !ep.starts_with('/') {
931 return Err(AppError::Internal(format!(
932 "explorer.path must start with '/' (got '{ep}')"
933 )));
934 }
935 if ep.len() > 1 && ep.ends_with('/') {
936 return Err(AppError::Internal(format!(
937 "explorer.path must not end with '/' (got '{ep}')"
938 )));
939 }
940 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
941 return Err(AppError::Internal(format!(
942 "explorer.path '{ep}' collides with a reserved route"
943 )));
944 }
945 if ep == &self.docs.path {
946 return Err(AppError::Internal(format!(
947 "explorer.path and docs.path must differ (both '{ep}')"
948 )));
949 }
950 if ep == &self.swagger.path {
951 return Err(AppError::Internal(format!(
952 "explorer.path and swagger.path must differ (both '{ep}')"
953 )));
954 }
955 if ep == &self.metrics.path {
956 return Err(AppError::Internal(format!(
957 "explorer.path and metrics.path must differ (both '{ep}')"
958 )));
959 }
960 }
961
962 if self.auth.enabled {
967 let a = &self.auth;
968 if a.issuer.trim().is_empty() {
969 return Err(AppError::Internal(
970 "auth.issuer must not be empty when auth.enabled = true".into(),
971 ));
972 }
973 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
974 return Err(AppError::Internal(format!(
975 "auth.issuer must be an absolute http(s) URL (got '{}')",
976 a.issuer
977 )));
978 }
979 for alg in &a.algorithms {
980 match alg.as_str() {
981 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
982 | "PS512" => {}
983 other => {
984 return Err(AppError::Internal(format!(
985 "auth.algorithms[{other}] is not allowed; pick one of \
986 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
987 )));
988 }
989 }
990 }
991 if a.algorithms.is_empty() {
992 return Err(AppError::Internal(
993 "auth.algorithms must not be empty".into(),
994 ));
995 }
996 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
997 return Err(AppError::Internal(format!(
998 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
999 a.tenant_claim
1000 )));
1001 }
1002 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
1003 return Err(AppError::Internal(
1004 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
1005 can't enforce a tenant allow-list without a claim to extract from"
1006 .into(),
1007 ));
1008 }
1009 }
1010
1011 let mut seen = HashSet::new();
1012 for d in &self.datasets {
1013 if !seen.insert(d.name.as_str()) {
1014 return Err(AppError::Internal(format!(
1015 "duplicate dataset name: {}",
1016 d.name
1017 )));
1018 }
1019 if d.name.is_empty() {
1020 return Err(AppError::Internal("dataset name must not be empty".into()));
1021 }
1022 if !d
1024 .name
1025 .chars()
1026 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
1027 {
1028 return Err(AppError::Internal(format!(
1029 "dataset name '{}' must be alphanumeric (plus _ - .)",
1030 d.name
1031 )));
1032 }
1033
1034 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
1035 return Err(AppError::Internal(format!(
1036 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
1037 d.name
1038 )));
1039 }
1040
1041 if d.source.is_s3() {
1043 d.source.s3_bucket()?;
1044 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1045 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1046 && std::env::var("AWS_REGION").is_err()
1047 && std::env::var("AWS_DEFAULT_REGION").is_err()
1048 {
1049 log::warn!(
1050 "dataset '{}': S3 source without explicit region — \
1051 relying on AWS_REGION env var",
1052 d.name
1053 );
1054 }
1055 } else {
1056 match d.source.kind {
1060 SourceKind::Parquet => {
1061 d.resolve_local_parquet_files()?;
1062 }
1063 SourceKind::Delta => {
1064 let p = Path::new(&d.source.location);
1065 if !p.exists() {
1066 return Err(AppError::Internal(format!(
1067 "dataset '{}': delta location does not exist: {}",
1068 d.name, d.source.location
1069 )));
1070 }
1071 }
1072 }
1073 }
1074 }
1075 Ok(())
1076 }
1077}
1078
1079impl SourceConfig {
1080 pub fn is_s3(&self) -> bool {
1081 self.location.starts_with("s3://")
1082 }
1083
1084 pub fn has_glob(&self) -> bool {
1087 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1088 }
1089
1090 pub fn s3_recursive_parquet_glob(&self) -> String {
1096 if !self.is_s3() || self.has_glob() {
1097 return self.location.clone();
1098 }
1099 let trimmed = self.location.trim_end_matches('/');
1100 format!("{trimmed}/**/*.parquet")
1101 }
1102
1103 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1105 let rest = self
1106 .location
1107 .strip_prefix("s3://")
1108 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1109 let (bucket, key) = match rest.split_once('/') {
1110 Some((b, k)) => (b, k),
1111 None => (rest, ""),
1112 };
1113 if bucket.is_empty() {
1114 return Err(AppError::Internal(format!(
1115 "s3 URL missing bucket: {}",
1116 self.location
1117 )));
1118 }
1119 Ok((bucket, key))
1120 }
1121}
1122
1123impl DatasetConfig {
1124 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1134 if self.source.is_s3() {
1135 return Err(AppError::Internal(format!(
1136 "dataset '{}': resolve_local_parquet_files called on s3 source",
1137 self.name
1138 )));
1139 }
1140 let loc = &self.source.location;
1141
1142 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1144 let mut files: Vec<PathBuf> = glob::glob(loc)
1145 .map_err(|e| {
1146 AppError::Internal(format!(
1147 "dataset '{}': bad glob pattern '{loc}': {e}",
1148 self.name
1149 ))
1150 })?
1151 .filter_map(|r| r.ok())
1152 .filter(|p| {
1153 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1154 })
1155 .collect();
1156 files.sort();
1157 if files.is_empty() {
1158 return Err(AppError::Internal(format!(
1159 "dataset '{}': glob '{loc}' matched no .parquet files",
1160 self.name
1161 )));
1162 }
1163 return Ok(files);
1164 }
1165
1166 let path = Path::new(loc);
1167 if !path.exists() {
1168 return Err(AppError::Internal(format!(
1169 "dataset '{}': source path does not exist: {loc}",
1170 self.name
1171 )));
1172 }
1173
1174 if path.is_file() {
1175 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1176 return Err(AppError::Internal(format!(
1177 "dataset '{}': source must be a .parquet file",
1178 self.name
1179 )));
1180 }
1181 return Ok(vec![path.to_path_buf()]);
1182 }
1183
1184 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1185 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1186 .filter_map(|entry| entry.ok().map(|e| e.path()))
1187 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1188 .collect();
1189 files.sort();
1190 if files.is_empty() {
1191 return Err(AppError::Internal(format!(
1192 "dataset '{}': no *.parquet files found in {loc}",
1193 self.name
1194 )));
1195 }
1196 Ok(files)
1197 }
1198
1199 pub fn env_prefix(&self) -> String {
1203 self.name
1204 .chars()
1205 .map(|c| {
1206 if c.is_ascii_alphanumeric() {
1207 c.to_ascii_uppercase()
1208 } else {
1209 '_'
1210 }
1211 })
1212 .collect()
1213 }
1214
1215 pub fn resolved_creds(&self) -> ResolvedCreds {
1220 let prefix = self.env_prefix();
1221 let from_env = |suffix: &str| {
1222 std::env::var(format!("{prefix}_{suffix}"))
1223 .ok()
1224 .filter(|s| !s.is_empty())
1225 };
1226 let inline = self.s3.as_ref();
1227 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1228
1229 ResolvedCreds {
1230 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1231 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1232 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1233 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1234 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1235 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1236 session_token: from_env("AWS_SESSION_TOKEN")
1237 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1238 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1239 }
1240 }
1241
1242 pub fn resolved_region(&self) -> String {
1245 let prefix = self.env_prefix();
1246 std::env::var(format!("{prefix}_AWS_REGION"))
1247 .ok()
1248 .filter(|s| !s.is_empty())
1249 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1250 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1251 .or_else(|| {
1252 std::env::var("AWS_DEFAULT_REGION")
1253 .ok()
1254 .filter(|s| !s.is_empty())
1255 })
1256 .unwrap_or_else(|| "us-east-1".to_string())
1257 }
1258}
1259
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures ------------------------------------------------------

    /// Builds an `AppConfig` whose docs/swagger/metrics/explorer/sql
    /// sections are defaults, around the pieces a test cares about.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            sql: SqlConfig::default(),
            auth,
            datasets,
        }
    }

    /// A parquet `SourceConfig` pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        }
    }

    /// A parquet dataset with default index settings and no S3 table.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: parquet_source(location),
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Per-process scratch directory that is removed on drop, so it is
    /// cleaned up even when an assertion in the test panics.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
            let _ = std::fs::remove_dir_all(&dir);
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }

        /// Creates `name` inside the scratch dir with the given contents.
        fn write(&self, name: &str, bytes: &[u8]) -> PathBuf {
            let file = self.0.join(name);
            std::fs::write(&file, bytes).unwrap();
            file
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // ---- tests ---------------------------------------------------------

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();
        assert_eq!(cfg.server.backend, Backend::Duckdb);
        assert_eq!(cfg.server.port, 9000);
        assert_eq!(cfg.server.prefix, "/datapress");
        assert!(!cfg.server.compress);
        assert_eq!(cfg.server.max_body_bytes, 4096);
        assert_eq!(cfg.server.max_page_size, 50_000);
        assert_eq!(cfg.server.request_timeout_ms, 0);
        assert!(cfg.server.quack.enabled);
        assert_eq!(cfg.server.quack.uri, "quack:localhost:9495");
        assert_eq!(cfg.server.quack.token.as_deref(), Some("test-token"));
        assert!(!cfg.server.quack.read_only);
        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let cfg = app_config(
                ServerConfig {
                    prefix: p.to_string(),
                    ..Default::default()
                },
                AuthConfig::default(),
                vec![],
            );
            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            vec![],
        );
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let scratch = ScratchDir::new("dp-auth-issuer-test");
        let file = scratch.write("a.parquet", b"x");

        let cfg = app_config(
            ServerConfig::default(),
            AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            vec![parquet_dataset("x", &file.to_string_lossy())],
        );

        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = app_config(
            ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let scratch = ScratchDir::new("dp-dup-test");
        let file = scratch.write("a.parquet", b"x");
        let path = file.to_str().unwrap();

        // TOML literal strings ('...') take the path verbatim, so Windows
        // backslashes are not misread as escape sequences.
        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = '{path}'
        "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));
    }

    #[test]
    fn s3_bucket_parsing() {
        let s1 = parquet_source("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = parquet_source("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        // Plain prefixes, with or without a trailing slash, are expanded.
        assert_eq!(
            parquet_source("s3://bucket/logs/").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        assert_eq!(
            parquet_source("s3://bucket/logs").s3_recursive_parquet_glob(),
            "s3://bucket/logs/**/*.parquet"
        );
        // An explicit glob is left alone.
        assert_eq!(
            parquet_source("s3://bucket/logs/*.parquet").s3_recursive_parquet_glob(),
            "s3://bucket/logs/*.parquet"
        );
        // Non-S3 locations are left alone.
        assert_eq!(
            parquet_source("/local/logs").s3_recursive_parquet_glob(),
            "/local/logs"
        );
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // auto + virtual: bucket is folded into the host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // Host already carries the bucket: unchanged.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // auto + path: unchanged.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );

        // Explicit false wins over virtual addressing.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );

        // Explicit true wins over path addressing.
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // No endpoint configured at all.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let scratch = ScratchDir::new("dp-cfg-test");
        let f = scratch.write("a.parquet", b"not really parquet");

        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // Single file.
        let files = resolve(f.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Directory containing that file.
        let files = resolve(scratch.path().to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Missing path.
        assert!(resolve("/no/such/place.parquet").is_err());
    }
}