1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that `AppConfig::validate` refuses as `docs.path` or
/// `swagger.path`. `metrics.path` is checked against the same list, except
/// that `/metrics` itself is allowed there.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
/// Root of the TOML configuration parsed by `AppConfig::load`.
///
/// Every field is optional at parse time (`#[serde(default)]`); semantic
/// requirements such as "at least one dataset" are enforced afterwards by
/// `AppConfig::validate`.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[auth]` table.
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` array-of-tables (note the singular key in the file).
    /// Validation rejects an empty list.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
58
/// `[server]` settings. Missing keys take the values from this type's
/// `Default` implementation (struct-level `#[serde(default)]`).
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Selected backend; defaults to `Backend::default()`.
    pub backend: Backend,
    /// Address to listen on; defaults to `127.0.0.1`.
    pub listen: IpAddr,
    /// Port number; defaults to `8080`.
    pub port: u16,
    /// Optional worker count; `None` by default.
    /// NOTE(review): presumably `None` lets the runtime pick — confirm at the use site.
    pub workers: Option<usize>,
    /// Path prefix; empty by default. When non-empty, validation requires a
    /// leading `/` and forbids a trailing `/`.
    pub prefix: String,
    /// Compression switch; defaults to `true`.
    pub compress: bool,
    /// Body size limit in bytes; defaults to 1 MiB (`1024 * 1024`).
    pub max_body_bytes: usize,
    /// Page size cap; defaults to `100_000`.
    pub max_page_size: u64,
    /// Request timeout in milliseconds; defaults to `30_000`.
    pub request_timeout_ms: u64,
    /// Shutdown timeout in seconds; defaults to `30`.
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-table.
    pub quack: QuackConfig,
}
104
105impl Default for ServerConfig {
106 fn default() -> Self {
107 Self {
108 backend: Backend::default(),
109 listen: IpAddr::from([127, 0, 0, 1]),
110 port: 8080,
111 workers: None,
112 prefix: String::new(),
113 compress: true,
114 max_body_bytes: 1024 * 1024,
115 max_page_size: 100_000,
116 request_timeout_ms: 30_000,
117 shutdown_timeout_secs: 30,
118 quack: QuackConfig::default(),
119 }
120 }
121}
122
/// `[server.quack]` settings. Missing keys take the values from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Master switch; defaults to `false`. `AppConfig::validate` only runs
    /// `validate_enabled` when this is `true`.
    pub enabled: bool,
    /// Endpoint URI; defaults to `"quack:localhost"`. Must start with
    /// `quack:` when enabled.
    pub uri: String,
    /// Optional token; when present it must be at least 4 bytes long.
    pub token: Option<String>,
    /// When `false` (the default) the URI host must be exactly `localhost`.
    pub allow_other_hostname: bool,
    /// Read-only flag; defaults to `true`.
    pub read_only: bool,
}
147
148impl Default for QuackConfig {
149 fn default() -> Self {
150 Self {
151 enabled: false,
152 uri: "quack:localhost".into(),
153 token: None,
154 allow_other_hostname: false,
155 read_only: true,
156 }
157 }
158}
159
160impl QuackConfig {
161 pub fn validate_enabled(&self) -> Result<(), AppError> {
165 if self.uri.trim().is_empty() {
166 return Err(AppError::Internal(
167 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
168 ));
169 }
170 if !self.uri.starts_with("quack:") {
171 return Err(AppError::Internal(format!(
172 "server.quack.uri must start with 'quack:' (got '{}')",
173 self.uri
174 )));
175 }
176 if !self.allow_other_hostname {
177 let host = self.hostname().unwrap_or_default();
178 if host != "localhost" {
179 return Err(AppError::Internal(format!(
180 "server.quack.uri host must be 'localhost' unless \
181 server.quack.allow_other_hostname = true (got '{}')",
182 self.uri
183 )));
184 }
185 }
186 if let Some(token) = self.token.as_deref()
187 && token.len() < 4
188 {
189 return Err(AppError::Internal(
190 "server.quack.token must be at least 4 characters".into(),
191 ));
192 }
193 Ok(())
194 }
195
196 fn hostname(&self) -> Option<&str> {
197 let rest = self.uri.strip_prefix("quack:")?;
198 let rest = rest.strip_prefix("//").unwrap_or(rest);
199 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
200 (!host.is_empty()).then_some(host)
201 }
202}
203
/// Backend selector for `server.backend`. Deserialized from the lowercase
/// variant name (`"datafusion"` / `"duckdb"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Default backend.
    #[default]
    Datafusion,
    /// Alternative backend.
    Duckdb,
}
211
/// `[docs]` settings. Unknown keys are rejected; missing keys take the
/// `Default` values.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Mount path; defaults to `/mkdocs`. Validation requires a leading `/`,
    /// no trailing `/` (unless the path is just `/`), and no collision with
    /// `RESERVED_MOUNTS`.
    pub path: String,
}
227
228impl Default for DocsConfig {
229 fn default() -> Self {
230 Self {
231 enabled: true,
232 path: "/mkdocs".into(),
233 }
234 }
235}
236
/// `[swagger]` settings. Unknown keys are rejected; missing keys take the
/// `Default` values.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Defaults to `true`.
    pub enabled: bool,
    /// Mount path; defaults to `/docs`. Validated like `docs.path` and must
    /// additionally differ from it.
    pub path: String,
    /// Optional `[swagger.oauth2]` table; `None` by default.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
260
261impl Default for SwaggerConfig {
262 fn default() -> Self {
263 Self {
264 enabled: true,
265 path: "/docs".into(),
266 oauth2: None,
267 }
268 }
269}
270
/// `[swagger.oauth2]` settings. Unknown keys are rejected. `issuer` and
/// `client_id` have no serde default, so both must be present when the table
/// is given.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Must be non-blank and start with `http://` or `https://`.
    pub issuer: String,
    /// Must be non-blank.
    pub client_id: String,
    /// Defaults to an empty list.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
302
/// `[metrics]` settings. Unknown keys are rejected; missing keys take the
/// `Default` values.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Mount path; defaults to `/metrics`. Must differ from `docs.path` and
    /// `swagger.path`, and may not be any reserved mount other than
    /// `/metrics`.
    pub path: String,
}
328
329impl Default for MetricsConfig {
330 fn default() -> Self {
331 Self {
332 enabled: false,
333 path: "/metrics".into(),
334 }
335 }
336}
337
/// `[auth]` settings. Unknown keys are rejected; missing keys take the
/// `Default` values. The constraints below are only enforced by
/// `AppConfig::validate` when `enabled = true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Defaults to `false`.
    pub enabled: bool,
    /// Must be non-blank and start with `http://` or `https://` when enabled.
    pub issuer: String,
    /// Defaults to the empty string.
    pub audience: String,
    /// Lowercased (ASCII) by `AppConfig::normalize` after loading.
    pub read_scopes: Vec<String>,
    /// Lowercased (ASCII) by `AppConfig::normalize` after loading.
    pub reload_scopes: Vec<String>,
    /// Defaults to `false`.
    pub anonymous_read: bool,
    /// Defaults to `true`.
    pub start_degraded: bool,
    /// Allowed values: RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512.
    /// Must be non-empty; defaults to `["RS256"]`.
    pub algorithms: Vec<String>,
    /// Defaults to `60`.
    pub leeway_secs: u64,
    /// Defaults to `3600`.
    pub jwks_refresh_secs: u64,
    /// Empty by default; when set it must start with `/` (JSON pointer).
    pub tenant_claim: String,
    /// Requires a non-empty `tenant_claim` when non-empty.
    pub allowed_tenants: Vec<String>,
    /// Defaults to `true`.
    pub admin_token_fallback: bool,
}
405
406impl Default for AuthConfig {
407 fn default() -> Self {
408 Self {
409 enabled: false,
410 issuer: String::new(),
411 audience: String::new(),
412 read_scopes: Vec::new(),
413 reload_scopes: Vec::new(),
414 anonymous_read: false,
415 start_degraded: true,
416 algorithms: vec!["RS256".into()],
417 leeway_secs: 60,
418 jwks_refresh_secs: 3600,
419 tenant_claim: String::new(),
420 allowed_tenants: Vec::new(),
421 admin_token_fallback: true,
422 }
423 }
424}
425
426impl Backend {
427 pub fn as_str(self) -> &'static str {
428 match self {
429 Backend::Datafusion => "datafusion",
430 Backend::Duckdb => "duckdb",
431 }
432 }
433}
434
/// One `[[dataset]]` entry.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Unique, non-empty name made of ASCII alphanumerics plus `_`, `-`, `.`
    /// (enforced by `AppConfig::validate`).
    pub name: String,
    /// Where the data lives and what format it is in.
    pub source: SourceConfig,
    /// Optional S3 settings; `None` when the table is absent.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; defaults to `IndexConfig::default()`.
    #[serde(default)]
    pub index: IndexConfig,
    /// Defaults to an empty list.
    #[serde(default)]
    pub columns: Vec<String>,
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// Defaults to `false`.
    #[serde(default)]
    pub lazy: bool,
}
466
/// Serde default hook for boolean fields that should be `true` when omitted
/// (referenced as `#[serde(default = "default_true")]`).
fn default_true() -> bool {
    true
}
470
/// `source` table of a dataset. Both keys are required.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Data format.
    pub kind: SourceKind,
    /// Either an `s3://bucket/key` URL or a local path; may contain the glob
    /// characters `*`, `?`, `[`.
    pub location: String,
}
477
/// Data format of a dataset source. Deserialized from the lowercase variant
/// name (`"parquet"` / `"delta"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Default kind.
    #[default]
    Parquet,
    /// Delta table; for local sources validation only checks that the
    /// location exists.
    Delta,
}
485
486impl SourceKind {
487 pub fn as_str(self) -> &'static str {
488 match self {
489 SourceKind::Parquet => "parquet",
490 SourceKind::Delta => "delta",
491 }
492 }
493}
494
/// Per-dataset `s3` settings. Missing keys take the values from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Inline region; see `DatasetConfig::resolved_region` for precedence.
    pub region: Option<String>,
    /// Custom endpoint; see `S3Config::effective_endpoint`.
    pub endpoint: Option<String>,
    /// Defaults to `AddressingStyle::Virtual`.
    pub addressing_style: AddressingStyle,
    /// Defaults to `false`.
    pub allow_http: bool,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub access_key_id: Option<String>,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub secret_access_key: Option<String>,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub session_token: Option<String>,
    /// Defaults to `Partitioning::Auto`.
    pub partitioning: Partitioning,
    /// Controls whether `effective_endpoint` prepends the bucket to the
    /// endpoint host; defaults to `BucketInHost::Auto`.
    pub endpoint_bucket_in_host: BucketInHost,
}
521
522impl Default for S3Config {
523 fn default() -> Self {
524 Self {
525 region: None,
526 endpoint: None,
527 addressing_style: AddressingStyle::Virtual,
528 allow_http: false,
529 access_key_id: None,
530 secret_access_key: None,
531 session_token: None,
532 partitioning: Partitioning::Auto,
533 endpoint_bucket_in_host: BucketInHost::Auto,
534 }
535 }
536}
537
538impl S3Config {
539 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
548 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
549
550 let fold = match self.endpoint_bucket_in_host {
551 BucketInHost::False => false,
552 BucketInHost::True => true,
553 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
554 };
555 if !fold {
556 return Some(ep.to_string());
557 }
558
559 let (scheme, host_and_path) = match ep.split_once("://") {
560 Some((s, rest)) => (Some(s), rest),
561 None => (None, ep),
562 };
563 let (host, path) = match host_and_path.split_once('/') {
565 Some((h, p)) => (h, Some(p)),
566 None => (host_and_path, None),
567 };
568 if host == bucket || host.starts_with(&format!("{bucket}.")) {
570 return Some(ep.to_string());
571 }
572 let new_host = format!("{bucket}.{host}");
573 let rebuilt = match (scheme, path) {
574 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
575 (Some(s), None) => format!("{s}://{new_host}"),
576 (None, Some(p)) => format!("{new_host}/{p}"),
577 (None, None) => new_host,
578 };
579 Some(rebuilt)
580 }
581}
582
/// S3 addressing style. Deserialized from the lowercase variant name
/// (`"virtual"` / `"path"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Default. With `BucketInHost::Auto` this makes
    /// `S3Config::effective_endpoint` prepend the bucket to the host.
    #[default]
    Virtual,
    /// With `BucketInHost::Auto` the endpoint is returned unchanged.
    Path,
}
590
591impl AddressingStyle {
592 pub fn as_str(self) -> &'static str {
593 match self {
594 AddressingStyle::Virtual => "virtual",
595 AddressingStyle::Path => "path",
596 }
597 }
598}
599
/// Partitioning mode for an S3 dataset. Deserialized from the lowercase
/// variant name (`"auto"` / `"hive"` / `"none"`).
/// NOTE(review): how each mode is interpreted is decided by the consumer of
/// this setting, which is not visible in this file.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Default mode.
    #[default]
    Auto,
    /// Selected with `"hive"`.
    Hive,
    /// Selected with `"none"`.
    None,
}
617
618impl Partitioning {
619 pub fn as_str(self) -> &'static str {
620 match self {
621 Partitioning::Auto => "auto",
622 Partitioning::Hive => "hive",
623 Partitioning::None => "none",
624 }
625 }
626}
627
/// Tri-state controlling whether `S3Config::effective_endpoint` prepends the
/// bucket name to the endpoint host. Deserialized from the lowercase variant
/// name (`"auto"` / `"true"` / `"false"` — strings, not TOML booleans).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Default: prepend only when the addressing style is `Virtual`.
    #[default]
    Auto,
    /// Always prepend the bucket.
    True,
    /// Never prepend the bucket.
    False,
}
644
645impl BucketInHost {
646 pub fn as_str(self) -> &'static str {
647 match self {
648 BucketInHost::Auto => "auto",
649 BucketInHost::True => "true",
650 BucketInHost::False => "false",
651 }
652 }
653}
654
/// Per-dataset `index` settings. Missing keys take the values from this
/// type's `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Defaults to `IndexMode::Auto`.
    pub mode: IndexMode,
    /// Must be non-empty when `mode` is `IndexMode::List`; empty by default.
    pub columns: Vec<String>,
    /// Defaults to `100_000`.
    pub max_cardinality: usize,
}
662
663impl Default for IndexConfig {
664 fn default() -> Self {
665 Self {
666 mode: IndexMode::Auto,
667 columns: Vec::new(),
668 max_cardinality: 100_000,
669 }
670 }
671}
672
/// Index mode for a dataset. Deserialized from the lowercase variant name
/// (`"auto"` / `"none"` / `"list"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// Default mode.
    #[default]
    Auto,
    /// Selected with `"none"`.
    None,
    /// Requires a non-empty `index.columns` (enforced by
    /// `AppConfig::validate`).
    List,
}
681
/// Credentials after merging environment variables and inline settings;
/// produced by `DatasetConfig::resolved_creds`. Each part is `None` when no
/// source supplied it.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id, if any source provided one.
    pub access_key_id: Option<String>,
    /// Secret access key, if any source provided one.
    pub secret_access_key: Option<String>,
    /// Session token, if any source provided one.
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` only when both the access key id and the secret access key are
    /// present. The session token is not considered.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
696
697impl AppConfig {
702 pub fn load(path: &str) -> Result<Self, AppError> {
704 let raw = std::fs::read_to_string(path)
705 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
706 let mut cfg: AppConfig =
707 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
708 cfg.normalize();
709 cfg.validate()?;
710 Ok(cfg)
711 }
712
713 fn normalize(&mut self) {
721 for s in self
722 .auth
723 .read_scopes
724 .iter_mut()
725 .chain(self.auth.reload_scopes.iter_mut())
726 {
727 *s = s.to_ascii_lowercase();
728 }
729 }
730
731 fn validate(&self) -> Result<(), AppError> {
732 let p = &self.server.prefix;
734 if !p.is_empty() {
735 if !p.starts_with('/') {
736 return Err(AppError::Internal(format!(
737 "server.prefix must start with '/' (got '{p}')"
738 )));
739 }
740 if p.ends_with('/') {
741 return Err(AppError::Internal(format!(
742 "server.prefix must not end with '/' (got '{p}')"
743 )));
744 }
745 }
746
747 if self.datasets.is_empty() {
748 return Err(AppError::Internal(
749 "datasets.toml has no [[dataset]] entries".into(),
750 ));
751 }
752
753 if self.server.quack.enabled {
754 self.server.quack.validate_enabled()?;
755 }
756
757 {
760 let dp = &self.docs.path;
761 if !dp.starts_with('/') {
762 return Err(AppError::Internal(format!(
763 "docs.path must start with '/' (got '{dp}')"
764 )));
765 }
766 if dp.len() > 1 && dp.ends_with('/') {
767 return Err(AppError::Internal(format!(
768 "docs.path must not end with '/' (got '{dp}')"
769 )));
770 }
771 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
772 return Err(AppError::Internal(format!(
773 "docs.path '{dp}' collides with a reserved route"
774 )));
775 }
776 }
777
778 {
780 let sp = &self.swagger.path;
781 if !sp.starts_with('/') {
782 return Err(AppError::Internal(format!(
783 "swagger.path must start with '/' (got '{sp}')"
784 )));
785 }
786 if sp.len() > 1 && sp.ends_with('/') {
787 return Err(AppError::Internal(format!(
788 "swagger.path must not end with '/' (got '{sp}')"
789 )));
790 }
791 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
792 return Err(AppError::Internal(format!(
793 "swagger.path '{sp}' collides with a reserved route"
794 )));
795 }
796 if sp == &self.docs.path {
797 return Err(AppError::Internal(format!(
798 "swagger.path and docs.path must differ (both '{sp}')"
799 )));
800 }
801 if let Some(o) = &self.swagger.oauth2 {
802 if o.issuer.trim().is_empty() {
803 return Err(AppError::Internal(
804 "swagger.oauth2.issuer must not be empty".into(),
805 ));
806 }
807 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
808 return Err(AppError::Internal(format!(
809 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
810 o.issuer
811 )));
812 }
813 if o.client_id.trim().is_empty() {
814 return Err(AppError::Internal(
815 "swagger.oauth2.client_id must not be empty".into(),
816 ));
817 }
818 }
819 }
820
821 {
827 let mp = &self.metrics.path;
828 if !mp.starts_with('/') {
829 return Err(AppError::Internal(format!(
830 "metrics.path must start with '/' (got '{mp}')"
831 )));
832 }
833 if mp.len() > 1 && mp.ends_with('/') {
834 return Err(AppError::Internal(format!(
835 "metrics.path must not end with '/' (got '{mp}')"
836 )));
837 }
838 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
839 return Err(AppError::Internal(format!(
840 "metrics.path '{mp}' collides with a reserved route"
841 )));
842 }
843 if mp == &self.docs.path {
844 return Err(AppError::Internal(format!(
845 "metrics.path and docs.path must differ (both '{mp}')"
846 )));
847 }
848 if mp == &self.swagger.path {
849 return Err(AppError::Internal(format!(
850 "metrics.path and swagger.path must differ (both '{mp}')"
851 )));
852 }
853 }
854
855 if self.auth.enabled {
860 let a = &self.auth;
861 if a.issuer.trim().is_empty() {
862 return Err(AppError::Internal(
863 "auth.issuer must not be empty when auth.enabled = true".into(),
864 ));
865 }
866 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
867 return Err(AppError::Internal(format!(
868 "auth.issuer must be an absolute http(s) URL (got '{}')",
869 a.issuer
870 )));
871 }
872 for alg in &a.algorithms {
873 match alg.as_str() {
874 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
875 | "PS512" => {}
876 other => {
877 return Err(AppError::Internal(format!(
878 "auth.algorithms[{other}] is not allowed; pick one of \
879 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
880 )));
881 }
882 }
883 }
884 if a.algorithms.is_empty() {
885 return Err(AppError::Internal(
886 "auth.algorithms must not be empty".into(),
887 ));
888 }
889 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
890 return Err(AppError::Internal(format!(
891 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
892 a.tenant_claim
893 )));
894 }
895 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
896 return Err(AppError::Internal(
897 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
898 can't enforce a tenant allow-list without a claim to extract from"
899 .into(),
900 ));
901 }
902 }
903
904 let mut seen = HashSet::new();
905 for d in &self.datasets {
906 if !seen.insert(d.name.as_str()) {
907 return Err(AppError::Internal(format!(
908 "duplicate dataset name: {}",
909 d.name
910 )));
911 }
912 if d.name.is_empty() {
913 return Err(AppError::Internal("dataset name must not be empty".into()));
914 }
915 if !d
917 .name
918 .chars()
919 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
920 {
921 return Err(AppError::Internal(format!(
922 "dataset name '{}' must be alphanumeric (plus _ - .)",
923 d.name
924 )));
925 }
926
927 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
928 return Err(AppError::Internal(format!(
929 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
930 d.name
931 )));
932 }
933
934 if d.source.is_s3() {
936 d.source.s3_bucket()?;
937 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
938 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
939 && std::env::var("AWS_REGION").is_err()
940 && std::env::var("AWS_DEFAULT_REGION").is_err()
941 {
942 log::warn!(
943 "dataset '{}': S3 source without explicit region — \
944 relying on AWS_REGION env var",
945 d.name
946 );
947 }
948 } else {
949 match d.source.kind {
953 SourceKind::Parquet => {
954 d.resolve_local_parquet_files()?;
955 }
956 SourceKind::Delta => {
957 let p = Path::new(&d.source.location);
958 if !p.exists() {
959 return Err(AppError::Internal(format!(
960 "dataset '{}': delta location does not exist: {}",
961 d.name, d.source.location
962 )));
963 }
964 }
965 }
966 }
967 }
968 Ok(())
969 }
970}
971
972impl SourceConfig {
973 pub fn is_s3(&self) -> bool {
974 self.location.starts_with("s3://")
975 }
976
977 pub fn has_glob(&self) -> bool {
980 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
981 }
982
983 pub fn s3_recursive_parquet_glob(&self) -> String {
989 if !self.is_s3() || self.has_glob() {
990 return self.location.clone();
991 }
992 let trimmed = self.location.trim_end_matches('/');
993 format!("{trimmed}/**/*.parquet")
994 }
995
996 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
998 let rest = self
999 .location
1000 .strip_prefix("s3://")
1001 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1002 let (bucket, key) = match rest.split_once('/') {
1003 Some((b, k)) => (b, k),
1004 None => (rest, ""),
1005 };
1006 if bucket.is_empty() {
1007 return Err(AppError::Internal(format!(
1008 "s3 URL missing bucket: {}",
1009 self.location
1010 )));
1011 }
1012 Ok((bucket, key))
1013 }
1014}
1015
1016impl DatasetConfig {
1017 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1027 if self.source.is_s3() {
1028 return Err(AppError::Internal(format!(
1029 "dataset '{}': resolve_local_parquet_files called on s3 source",
1030 self.name
1031 )));
1032 }
1033 let loc = &self.source.location;
1034
1035 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1037 let mut files: Vec<PathBuf> = glob::glob(loc)
1038 .map_err(|e| {
1039 AppError::Internal(format!(
1040 "dataset '{}': bad glob pattern '{loc}': {e}",
1041 self.name
1042 ))
1043 })?
1044 .filter_map(|r| r.ok())
1045 .filter(|p| {
1046 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1047 })
1048 .collect();
1049 files.sort();
1050 if files.is_empty() {
1051 return Err(AppError::Internal(format!(
1052 "dataset '{}': glob '{loc}' matched no .parquet files",
1053 self.name
1054 )));
1055 }
1056 return Ok(files);
1057 }
1058
1059 let path = Path::new(loc);
1060 if !path.exists() {
1061 return Err(AppError::Internal(format!(
1062 "dataset '{}': source path does not exist: {loc}",
1063 self.name
1064 )));
1065 }
1066
1067 if path.is_file() {
1068 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1069 return Err(AppError::Internal(format!(
1070 "dataset '{}': source must be a .parquet file",
1071 self.name
1072 )));
1073 }
1074 return Ok(vec![path.to_path_buf()]);
1075 }
1076
1077 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1078 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1079 .filter_map(|entry| entry.ok().map(|e| e.path()))
1080 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1081 .collect();
1082 files.sort();
1083 if files.is_empty() {
1084 return Err(AppError::Internal(format!(
1085 "dataset '{}': no *.parquet files found in {loc}",
1086 self.name
1087 )));
1088 }
1089 Ok(files)
1090 }
1091
1092 pub fn env_prefix(&self) -> String {
1096 self.name
1097 .chars()
1098 .map(|c| {
1099 if c.is_ascii_alphanumeric() {
1100 c.to_ascii_uppercase()
1101 } else {
1102 '_'
1103 }
1104 })
1105 .collect()
1106 }
1107
1108 pub fn resolved_creds(&self) -> ResolvedCreds {
1113 let prefix = self.env_prefix();
1114 let from_env = |suffix: &str| {
1115 std::env::var(format!("{prefix}_{suffix}"))
1116 .ok()
1117 .filter(|s| !s.is_empty())
1118 };
1119 let inline = self.s3.as_ref();
1120 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1121
1122 ResolvedCreds {
1123 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1124 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1125 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1126 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1127 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1128 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1129 session_token: from_env("AWS_SESSION_TOKEN")
1130 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1131 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1132 }
1133 }
1134
1135 pub fn resolved_region(&self) -> String {
1138 let prefix = self.env_prefix();
1139 std::env::var(format!("{prefix}_AWS_REGION"))
1140 .ok()
1141 .filter(|s| !s.is_empty())
1142 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1143 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1144 .or_else(|| {
1145 std::env::var("AWS_DEFAULT_REGION")
1146 .ok()
1147 .filter(|s| !s.is_empty())
1148 })
1149 .unwrap_or_else(|| "us-east-1".to_string())
1150 }
1151}
1152
#[cfg(test)]
mod tests {
    use super::*;

    /// A config with every section at its default and the given datasets.
    fn config_with(datasets: Vec<DatasetConfig>) -> AppConfig {
        AppConfig {
            server: ServerConfig::default(),
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            auth: AuthConfig::default(),
            datasets,
        }
    }

    /// A parquet source pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        }
    }

    /// A parquet dataset with default index/columns, dict encoding on, eager.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: parquet_source(location),
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Creates a fresh `<tmp>/<tag>-<pid>` directory holding `a.parquet` with
    /// the given bytes. Returns `(dir, file)`.
    fn scratch_parquet(tag: &str, bytes: &[u8]) -> (std::path::PathBuf, std::path::PathBuf) {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::write(&file, bytes).unwrap();
        (dir, file)
    }

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.request_timeout_ms, 0);

        let quack = &server.quack;
        assert!(quack.enabled);
        assert_eq!(quack.uri, "quack:localhost:9495");
        assert_eq!(quack.token.as_deref(), Some("test-token"));
        assert!(!quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        // Omitted in the TOML, so the serde default applies.
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let cfg = AppConfig {
                server: ServerConfig {
                    prefix: p.to_string(),
                    ..Default::default()
                },
                ..config_with(vec![])
            };
            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let mut cfg = AppConfig {
            auth: AuthConfig {
                read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
                reload_scopes: vec!["Datasets:Reload".into()],
                ..Default::default()
            },
            ..config_with(vec![])
        };
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let err = config_with(vec![]).validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let (dir, file) = scratch_parquet("dp-auth-issuer-test", b"x");

        let cfg = AppConfig {
            auth: AuthConfig {
                enabled: true,
                issuer: "https://tenant.example.com/".into(),
                ..Default::default()
            },
            ..config_with(vec![parquet_dataset("x", &file.to_string_lossy())])
        };

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let cfg = AppConfig {
            server: ServerConfig {
                quack: QuackConfig {
                    enabled: true,
                    uri: "quack:127.0.0.1".into(),
                    token: Some("test-token".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            ..config_with(vec![parquet_dataset("x", "/tmp/missing.parquet")])
        };
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let (dir, f) = scratch_parquet("dp-dup-test", b"x");
        let path = f.to_str().unwrap();

        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let s1 = parquet_source("s3://bucket/path/key");
        assert_eq!(s1.s3_bucket().unwrap(), ("bucket", "path/key"));
        let s2 = parquet_source("s3://only-bucket");
        assert_eq!(s2.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        // (input location, expected output)
        let cases = [
            ("s3://bucket/logs/", "s3://bucket/logs/**/*.parquet"),
            ("s3://bucket/logs", "s3://bucket/logs/**/*.parquet"),
            ("s3://bucket/logs/*.parquet", "s3://bucket/logs/*.parquet"),
            ("/local/logs", "/local/logs"),
        ];
        for (input, expected) in cases {
            assert_eq!(
                parquet_source(input).s3_recursive_parquet_glob(),
                expected
            );
        }
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        // Virtual + Auto: bucket is folded into the host.
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // Host already carries the bucket: left alone.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // Path + Auto: no folding.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );

        // Explicit False wins over Virtual.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );

        // Explicit True wins over Path.
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // No endpoint configured at all.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let (dir, f) = scratch_parquet("dp-cfg-test", b"not really parquet");
        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // Pointing straight at the file.
        let files = resolve(f.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Pointing at the containing directory.
        let files = resolve(dir.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // Missing path.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}