1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that configurable mounts must not take over.
///
/// `AppConfig::validate` rejects `docs.path`, `swagger.path` and
/// `explorer.path` when they equal any entry here; `metrics.path` is checked
/// the same way except that it is allowed to be `/metrics` itself.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
/// Top-level application configuration, deserialized from TOML.
///
/// Every section is optional in the file: a missing section falls back to
/// that type's `Default` value, and a file without any `[[dataset]]` table
/// deserializes to an empty `datasets` list (which `validate` then rejects).
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` section.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` section.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` section.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` section.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[explorer]` section.
    #[serde(default)]
    pub explorer: ExplorerConfig,
    /// `[auth]` section.
    #[serde(default)]
    pub auth: AuthConfig,
    /// Dataset definitions; spelled `[[dataset]]` (singular) in the TOML file.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
60
/// `[server]` section of the configuration.
///
/// The struct-level `#[serde(default)]` means any field missing from the TOML
/// takes its value from the `Default` impl of this type.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Selected backend; defaults to `Backend::Datafusion`.
    pub backend: Backend,
    /// IP address for the listener; defaults to `127.0.0.1`.
    /// NOTE(review): presumably the bind address — confirm in the server setup.
    pub listen: IpAddr,
    /// Port number; defaults to `8080`.
    pub port: u16,
    /// Optional worker count; `None` by default. What `None` selects is
    /// decided by the consumer of this config and is not visible here.
    pub workers: Option<usize>,
    /// Route prefix; empty by default. When non-empty, `AppConfig::validate`
    /// requires it to start with `/` and not end with `/`.
    pub prefix: String,
    /// Compression switch; defaults to `true`.
    pub compress: bool,
    /// Body size limit in bytes; defaults to 1 MiB (`1024 * 1024`).
    pub max_body_bytes: usize,
    /// Upper bound on page size; defaults to `100_000`.
    pub max_page_size: u64,
    /// Request timeout in milliseconds; defaults to `30_000`.
    /// NOTE(review): the tests configure `0` — presumably "no timeout"; confirm.
    pub request_timeout_ms: u64,
    /// Shutdown timeout in seconds; defaults to `30`.
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-section; validated only when `quack.enabled` is true.
    pub quack: QuackConfig,
}
106
107impl Default for ServerConfig {
108 fn default() -> Self {
109 Self {
110 backend: Backend::default(),
111 listen: IpAddr::from([127, 0, 0, 1]),
112 port: 8080,
113 workers: None,
114 prefix: String::new(),
115 compress: true,
116 max_body_bytes: 1024 * 1024,
117 max_page_size: 100_000,
118 request_timeout_ms: 30_000,
119 shutdown_timeout_secs: 30,
120 quack: QuackConfig::default(),
121 }
122 }
123}
124
/// `[server.quack]` sub-section.
///
/// Missing fields come from the `Default` impl. The settings are only
/// validated (see `QuackConfig::validate_enabled`) when `enabled` is true.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Master switch; defaults to `false`.
    pub enabled: bool,
    /// URI of the form `quack:<host>...`; defaults to `quack:localhost`.
    pub uri: String,
    /// Optional token; when set it must be at least 4 bytes long.
    pub token: Option<String>,
    /// Allows a URI host other than `localhost`; defaults to `false`.
    pub allow_other_hostname: bool,
    /// Read-only flag; defaults to `true`.
    pub read_only: bool,
}
149
150impl Default for QuackConfig {
151 fn default() -> Self {
152 Self {
153 enabled: false,
154 uri: "quack:localhost".into(),
155 token: None,
156 allow_other_hostname: false,
157 read_only: true,
158 }
159 }
160}
161
162impl QuackConfig {
163 pub fn validate_enabled(&self) -> Result<(), AppError> {
167 if self.uri.trim().is_empty() {
168 return Err(AppError::Internal(
169 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
170 ));
171 }
172 if !self.uri.starts_with("quack:") {
173 return Err(AppError::Internal(format!(
174 "server.quack.uri must start with 'quack:' (got '{}')",
175 self.uri
176 )));
177 }
178 if !self.allow_other_hostname {
179 let host = self.hostname().unwrap_or_default();
180 if host != "localhost" {
181 return Err(AppError::Internal(format!(
182 "server.quack.uri host must be 'localhost' unless \
183 server.quack.allow_other_hostname = true (got '{}')",
184 self.uri
185 )));
186 }
187 }
188 if let Some(token) = self.token.as_deref()
189 && token.len() < 4
190 {
191 return Err(AppError::Internal(
192 "server.quack.token must be at least 4 characters".into(),
193 ));
194 }
195 Ok(())
196 }
197
198 fn hostname(&self) -> Option<&str> {
199 let rest = self.uri.strip_prefix("quack:")?;
200 let rest = rest.strip_prefix("//").unwrap_or(rest);
201 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
202 (!host.is_empty()).then_some(host)
203 }
204}
205
/// Backend selector for `server.backend`.
///
/// Deserialized from the lowercase variant name (`"datafusion"` or
/// `"duckdb"`); `Datafusion` is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Spelled `"datafusion"` in the config; the default.
    #[default]
    Datafusion,
    /// Spelled `"duckdb"` in the config.
    Duckdb,
}
213
/// `[docs]` section. Unknown keys are rejected at parse time
/// (`deny_unknown_fields`); missing keys come from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// Whether the docs mount is enabled; defaults to `true`.
    pub enabled: bool,
    /// Mount path; defaults to `/mkdocs`. `AppConfig::validate` requires a
    /// leading `/`, no trailing `/`, and no clash with a reserved route.
    pub path: String,
}
229
230impl Default for DocsConfig {
231 fn default() -> Self {
232 Self {
233 enabled: true,
234 path: "/mkdocs".into(),
235 }
236 }
237}
238
/// `[swagger]` section. Unknown keys are rejected at parse time; missing keys
/// come from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// Whether the Swagger mount is enabled; defaults to `true`.
    pub enabled: bool,
    /// Mount path; defaults to `/docs`. Must start with `/`, not end with
    /// `/`, not be a reserved route and differ from `docs.path`.
    pub path: String,
    /// Optional `[swagger.oauth2]` settings; `None` by default. When present,
    /// `AppConfig::validate` checks its `issuer` and `client_id`.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
262
263impl Default for SwaggerConfig {
264 fn default() -> Self {
265 Self {
266 enabled: true,
267 path: "/docs".into(),
268 oauth2: None,
269 }
270 }
271}
272
/// `[swagger.oauth2]` settings. Unknown keys are rejected at parse time.
///
/// `issuer` and `client_id` are required keys; `AppConfig::validate`
/// additionally rejects blank values and a non-http(s) issuer.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Issuer URL; must start with `http://` or `https://`.
    pub issuer: String,
    /// OAuth2 client identifier; must not be blank.
    pub client_id: String,
    /// Scopes; an empty list when omitted.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// PKCE switch; `true` when omitted (via `default_true`).
    #[serde(default = "default_true")]
    pub pkce: bool,
}
304
/// `[metrics]` section. Unknown keys are rejected at parse time; missing keys
/// come from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// Whether the metrics mount is enabled; defaults to `false`.
    pub enabled: bool,
    /// Mount path; defaults to `/metrics`. Must start with `/`, not end with
    /// `/`, not be a reserved route other than `/metrics`, and differ from
    /// `docs.path` and `swagger.path`.
    pub path: String,
}
330
331impl Default for MetricsConfig {
332 fn default() -> Self {
333 Self {
334 enabled: false,
335 path: "/metrics".into(),
336 }
337 }
338}
339
/// `[explorer]` section. Unknown keys are rejected at parse time; missing
/// keys come from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ExplorerConfig {
    /// Whether the explorer mount is enabled; defaults to `true`.
    pub enabled: bool,
    /// Mount path; defaults to `/explore`. Must start with `/`, not end with
    /// `/`, not be a reserved route, and differ from the docs, swagger and
    /// metrics paths.
    pub path: String,
}
358
359impl Default for ExplorerConfig {
360 fn default() -> Self {
361 Self {
362 enabled: true,
363 path: "/explore".into(),
364 }
365 }
366}
367
/// `[auth]` section. Unknown keys are rejected at parse time; missing keys
/// come from the `Default` impl.
///
/// The issuer, algorithm and tenant rules below are only enforced by
/// `AppConfig::validate` when `enabled` is true.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch; defaults to `false`.
    pub enabled: bool,
    /// Token issuer; must be a non-blank `http://` or `https://` URL when
    /// auth is enabled.
    pub issuer: String,
    /// Expected audience; empty by default.
    pub audience: String,
    /// Scopes for read access; lower-cased (ASCII) by `AppConfig::normalize`.
    pub read_scopes: Vec<String>,
    /// Scopes for reload access; lower-cased (ASCII) by `AppConfig::normalize`.
    pub reload_scopes: Vec<String>,
    /// Defaults to `false`. NOTE(review): presumably allows unauthenticated
    /// reads — confirm against the auth middleware.
    pub anonymous_read: bool,
    /// Defaults to `true`. NOTE(review): semantics are defined by the
    /// consumer and not visible in this file.
    pub start_degraded: bool,
    /// Accepted signature algorithms; defaults to `["RS256"]`. Must be
    /// non-empty and drawn from RS256/RS384/RS512, ES256/ES384,
    /// PS256/PS384/PS512.
    pub algorithms: Vec<String>,
    /// Leeway in seconds; defaults to `60`.
    pub leeway_secs: u64,
    /// JWKS refresh interval in seconds; defaults to `3600`.
    pub jwks_refresh_secs: u64,
    /// JSON pointer to the tenant claim; empty by default. When non-empty it
    /// must start with `/`.
    pub tenant_claim: String,
    /// Tenant allow-list; requires a non-empty `tenant_claim` when set.
    pub allowed_tenants: Vec<String>,
    /// Defaults to `true`. NOTE(review): semantics are defined by the
    /// consumer and not visible in this file.
    pub admin_token_fallback: bool,
}
435
436impl Default for AuthConfig {
437 fn default() -> Self {
438 Self {
439 enabled: false,
440 issuer: String::new(),
441 audience: String::new(),
442 read_scopes: Vec::new(),
443 reload_scopes: Vec::new(),
444 anonymous_read: false,
445 start_degraded: true,
446 algorithms: vec!["RS256".into()],
447 leeway_secs: 60,
448 jwks_refresh_secs: 3600,
449 tenant_claim: String::new(),
450 allowed_tenants: Vec::new(),
451 admin_token_fallback: true,
452 }
453 }
454}
455
456impl Backend {
457 pub fn as_str(self) -> &'static str {
458 match self {
459 Backend::Datafusion => "datafusion",
460 Backend::Duckdb => "duckdb",
461 }
462 }
463}
464
/// One `[[dataset]]` table from the configuration file.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Dataset name (required). `AppConfig::validate` requires it to be
    /// unique, non-empty and made of ASCII alphanumerics plus `_`, `-`, `.`.
    pub name: String,
    /// Where the data lives (required).
    pub source: SourceConfig,
    /// Optional S3 settings; `None` when the table is omitted.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// Index settings; `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// Column list; empty when omitted. NOTE(review): presumably a column
    /// projection where empty means "all" — confirm with the loader.
    #[serde(default)]
    pub columns: Vec<String>,
    /// Defaults to `true` (via `default_true`).
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// Defaults to `false`.
    #[serde(default)]
    pub lazy: bool,
}
498
/// Serde default helper: referenced as `#[serde(default = "default_true")]`
/// by boolean fields that should be `true` when omitted from the config.
fn default_true() -> bool {
    true
}
502
/// `source` table of a dataset: what kind of data it is and where it lives.
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Storage format of the source.
    pub kind: SourceKind,
    /// Location string. A value starting with `s3://` is treated as an S3
    /// URL; anything else is handled as a local path or glob pattern.
    pub location: String,
}
509
/// Storage format of a dataset source, spelled in lowercase in the config
/// (`"parquet"` or `"delta"`). `Parquet` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// Spelled `"parquet"` in the config; the default.
    #[default]
    Parquet,
    /// Spelled `"delta"` in the config.
    Delta,
}
517
518impl SourceKind {
519 pub fn as_str(self) -> &'static str {
520 match self {
521 SourceKind::Parquet => "parquet",
522 SourceKind::Delta => "delta",
523 }
524 }
525}
526
/// Optional `s3` table of a dataset. Missing fields come from the `Default`
/// impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Inline region; `None` by default.
    pub region: Option<String>,
    /// Custom endpoint; `None` by default. An empty string is treated as
    /// unset by `effective_endpoint`.
    pub endpoint: Option<String>,
    /// Addressing style; defaults to `AddressingStyle::Virtual`.
    pub addressing_style: AddressingStyle,
    /// Defaults to `false`. NOTE(review): presumably permits plain-HTTP
    /// endpoints — confirm with the object-store setup.
    pub allow_http: bool,
    /// Inline access key id; `None` by default.
    pub access_key_id: Option<String>,
    /// Inline secret access key; `None` by default.
    pub secret_access_key: Option<String>,
    /// Inline session token; `None` by default.
    pub session_token: Option<String>,
    /// Partitioning mode; defaults to `Partitioning::Auto`.
    pub partitioning: Partitioning,
    /// Whether `effective_endpoint` folds the bucket name into the endpoint
    /// host; defaults to `BucketInHost::Auto`.
    pub endpoint_bucket_in_host: BucketInHost,
}
553
554impl Default for S3Config {
555 fn default() -> Self {
556 Self {
557 region: None,
558 endpoint: None,
559 addressing_style: AddressingStyle::Virtual,
560 allow_http: false,
561 access_key_id: None,
562 secret_access_key: None,
563 session_token: None,
564 partitioning: Partitioning::Auto,
565 endpoint_bucket_in_host: BucketInHost::Auto,
566 }
567 }
568}
569
570impl S3Config {
571 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
580 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
581
582 let fold = match self.endpoint_bucket_in_host {
583 BucketInHost::False => false,
584 BucketInHost::True => true,
585 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
586 };
587 if !fold {
588 return Some(ep.to_string());
589 }
590
591 let (scheme, host_and_path) = match ep.split_once("://") {
592 Some((s, rest)) => (Some(s), rest),
593 None => (None, ep),
594 };
595 let (host, path) = match host_and_path.split_once('/') {
597 Some((h, p)) => (h, Some(p)),
598 None => (host_and_path, None),
599 };
600 if host == bucket || host.starts_with(&format!("{bucket}.")) {
602 return Some(ep.to_string());
603 }
604 let new_host = format!("{bucket}.{host}");
605 let rebuilt = match (scheme, path) {
606 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
607 (Some(s), None) => format!("{s}://{new_host}"),
608 (None, Some(p)) => format!("{new_host}/{p}"),
609 (None, None) => new_host,
610 };
611 Some(rebuilt)
612 }
613}
614
/// S3 addressing style, spelled in lowercase in the config (`"virtual"` or
/// `"path"`). `Virtual` is the default; with `BucketInHost::Auto` it is the
/// style for which `S3Config::effective_endpoint` folds the bucket into the
/// host.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// Spelled `"virtual"` in the config; the default.
    #[default]
    Virtual,
    /// Spelled `"path"` in the config.
    Path,
}
622
623impl AddressingStyle {
624 pub fn as_str(self) -> &'static str {
625 match self {
626 AddressingStyle::Virtual => "virtual",
627 AddressingStyle::Path => "path",
628 }
629 }
630}
631
/// Partitioning mode for an S3 dataset, spelled in lowercase in the config
/// (`"auto"`, `"hive"` or `"none"`). `Auto` is the default.
///
/// NOTE(review): how each mode is interpreted is decided by the loader, which
/// is not visible in this file.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Partitioning {
    /// Spelled `"auto"`; the default.
    #[default]
    Auto,
    /// Spelled `"hive"`.
    Hive,
    /// Spelled `"none"`. This is an enum variant, not `Option::None`.
    None,
}
649
650impl Partitioning {
651 pub fn as_str(self) -> &'static str {
652 match self {
653 Partitioning::Auto => "auto",
654 Partitioning::Hive => "hive",
655 Partitioning::None => "none",
656 }
657 }
658}
659
/// Controls whether `S3Config::effective_endpoint` prepends the bucket name
/// to the endpoint host. Spelled in lowercase in the config as the strings
/// `"auto"`, `"true"` or `"false"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketInHost {
    /// Fold only when the addressing style is `Virtual`; the default.
    #[default]
    Auto,
    /// Always fold the bucket into the host.
    True,
    /// Never fold; use the endpoint as written.
    False,
}
676
677impl BucketInHost {
678 pub fn as_str(self) -> &'static str {
679 match self {
680 BucketInHost::Auto => "auto",
681 BucketInHost::True => "true",
682 BucketInHost::False => "false",
683 }
684 }
685}
686
/// `index` table of a dataset. Missing fields come from the `Default` impl.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// Indexing mode; defaults to `IndexMode::Auto`.
    pub mode: IndexMode,
    /// Explicit column list; must be non-empty when `mode` is
    /// `IndexMode::List` (enforced by `AppConfig::validate`).
    pub columns: Vec<String>,
    /// Cardinality limit; defaults to `100_000`.
    /// NOTE(review): how the limit is applied is not visible in this file.
    pub max_cardinality: usize,
}
694
695impl Default for IndexConfig {
696 fn default() -> Self {
697 Self {
698 mode: IndexMode::Auto,
699 columns: Vec::new(),
700 max_cardinality: 100_000,
701 }
702 }
703}
704
/// Indexing mode, spelled in lowercase in the config (`"auto"`, `"none"` or
/// `"list"`). `Auto` is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// Spelled `"auto"`; the default.
    #[default]
    Auto,
    /// Spelled `"none"`. This is an enum variant, not `Option::None`.
    None,
    /// Spelled `"list"`; requires a non-empty `index.columns`.
    List,
}
713
/// Credentials produced by `DatasetConfig::resolved_creds` after merging
/// per-dataset environment variables, inline `s3` settings and the plain
/// `AWS_*` environment variables. Each part is `None` when no source
/// provided it.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    /// Resolved access key id, if any.
    pub access_key_id: Option<String>,
    /// Resolved secret access key, if any.
    pub secret_access_key: Option<String>,
    /// Resolved session token, if any.
    pub session_token: Option<String>,
}
722
723impl ResolvedCreds {
724 pub fn has_keypair(&self) -> bool {
725 self.access_key_id.is_some() && self.secret_access_key.is_some()
726 }
727}
728
729impl AppConfig {
734 pub fn load(path: &str) -> Result<Self, AppError> {
736 let raw = std::fs::read_to_string(path)
737 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
738 let mut cfg: AppConfig =
739 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
740 cfg.normalize();
741 cfg.validate()?;
742 Ok(cfg)
743 }
744
745 fn normalize(&mut self) {
753 for s in self
754 .auth
755 .read_scopes
756 .iter_mut()
757 .chain(self.auth.reload_scopes.iter_mut())
758 {
759 *s = s.to_ascii_lowercase();
760 }
761 }
762
763 fn validate(&self) -> Result<(), AppError> {
764 let p = &self.server.prefix;
766 if !p.is_empty() {
767 if !p.starts_with('/') {
768 return Err(AppError::Internal(format!(
769 "server.prefix must start with '/' (got '{p}')"
770 )));
771 }
772 if p.ends_with('/') {
773 return Err(AppError::Internal(format!(
774 "server.prefix must not end with '/' (got '{p}')"
775 )));
776 }
777 }
778
779 if self.datasets.is_empty() {
780 return Err(AppError::Internal(
781 "datasets.toml has no [[dataset]] entries".into(),
782 ));
783 }
784
785 if self.server.quack.enabled {
786 self.server.quack.validate_enabled()?;
787 }
788
789 {
792 let dp = &self.docs.path;
793 if !dp.starts_with('/') {
794 return Err(AppError::Internal(format!(
795 "docs.path must start with '/' (got '{dp}')"
796 )));
797 }
798 if dp.len() > 1 && dp.ends_with('/') {
799 return Err(AppError::Internal(format!(
800 "docs.path must not end with '/' (got '{dp}')"
801 )));
802 }
803 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
804 return Err(AppError::Internal(format!(
805 "docs.path '{dp}' collides with a reserved route"
806 )));
807 }
808 }
809
810 {
812 let sp = &self.swagger.path;
813 if !sp.starts_with('/') {
814 return Err(AppError::Internal(format!(
815 "swagger.path must start with '/' (got '{sp}')"
816 )));
817 }
818 if sp.len() > 1 && sp.ends_with('/') {
819 return Err(AppError::Internal(format!(
820 "swagger.path must not end with '/' (got '{sp}')"
821 )));
822 }
823 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
824 return Err(AppError::Internal(format!(
825 "swagger.path '{sp}' collides with a reserved route"
826 )));
827 }
828 if sp == &self.docs.path {
829 return Err(AppError::Internal(format!(
830 "swagger.path and docs.path must differ (both '{sp}')"
831 )));
832 }
833 if let Some(o) = &self.swagger.oauth2 {
834 if o.issuer.trim().is_empty() {
835 return Err(AppError::Internal(
836 "swagger.oauth2.issuer must not be empty".into(),
837 ));
838 }
839 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
840 return Err(AppError::Internal(format!(
841 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
842 o.issuer
843 )));
844 }
845 if o.client_id.trim().is_empty() {
846 return Err(AppError::Internal(
847 "swagger.oauth2.client_id must not be empty".into(),
848 ));
849 }
850 }
851 }
852
853 {
859 let mp = &self.metrics.path;
860 if !mp.starts_with('/') {
861 return Err(AppError::Internal(format!(
862 "metrics.path must start with '/' (got '{mp}')"
863 )));
864 }
865 if mp.len() > 1 && mp.ends_with('/') {
866 return Err(AppError::Internal(format!(
867 "metrics.path must not end with '/' (got '{mp}')"
868 )));
869 }
870 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
871 return Err(AppError::Internal(format!(
872 "metrics.path '{mp}' collides with a reserved route"
873 )));
874 }
875 if mp == &self.docs.path {
876 return Err(AppError::Internal(format!(
877 "metrics.path and docs.path must differ (both '{mp}')"
878 )));
879 }
880 if mp == &self.swagger.path {
881 return Err(AppError::Internal(format!(
882 "metrics.path and swagger.path must differ (both '{mp}')"
883 )));
884 }
885 }
886
887 {
890 let ep = &self.explorer.path;
891 if !ep.starts_with('/') {
892 return Err(AppError::Internal(format!(
893 "explorer.path must start with '/' (got '{ep}')"
894 )));
895 }
896 if ep.len() > 1 && ep.ends_with('/') {
897 return Err(AppError::Internal(format!(
898 "explorer.path must not end with '/' (got '{ep}')"
899 )));
900 }
901 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
902 return Err(AppError::Internal(format!(
903 "explorer.path '{ep}' collides with a reserved route"
904 )));
905 }
906 if ep == &self.docs.path {
907 return Err(AppError::Internal(format!(
908 "explorer.path and docs.path must differ (both '{ep}')"
909 )));
910 }
911 if ep == &self.swagger.path {
912 return Err(AppError::Internal(format!(
913 "explorer.path and swagger.path must differ (both '{ep}')"
914 )));
915 }
916 if ep == &self.metrics.path {
917 return Err(AppError::Internal(format!(
918 "explorer.path and metrics.path must differ (both '{ep}')"
919 )));
920 }
921 }
922
923 if self.auth.enabled {
928 let a = &self.auth;
929 if a.issuer.trim().is_empty() {
930 return Err(AppError::Internal(
931 "auth.issuer must not be empty when auth.enabled = true".into(),
932 ));
933 }
934 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
935 return Err(AppError::Internal(format!(
936 "auth.issuer must be an absolute http(s) URL (got '{}')",
937 a.issuer
938 )));
939 }
940 for alg in &a.algorithms {
941 match alg.as_str() {
942 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
943 | "PS512" => {}
944 other => {
945 return Err(AppError::Internal(format!(
946 "auth.algorithms[{other}] is not allowed; pick one of \
947 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
948 )));
949 }
950 }
951 }
952 if a.algorithms.is_empty() {
953 return Err(AppError::Internal(
954 "auth.algorithms must not be empty".into(),
955 ));
956 }
957 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
958 return Err(AppError::Internal(format!(
959 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
960 a.tenant_claim
961 )));
962 }
963 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
964 return Err(AppError::Internal(
965 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
966 can't enforce a tenant allow-list without a claim to extract from"
967 .into(),
968 ));
969 }
970 }
971
972 let mut seen = HashSet::new();
973 for d in &self.datasets {
974 if !seen.insert(d.name.as_str()) {
975 return Err(AppError::Internal(format!(
976 "duplicate dataset name: {}",
977 d.name
978 )));
979 }
980 if d.name.is_empty() {
981 return Err(AppError::Internal("dataset name must not be empty".into()));
982 }
983 if !d
985 .name
986 .chars()
987 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
988 {
989 return Err(AppError::Internal(format!(
990 "dataset name '{}' must be alphanumeric (plus _ - .)",
991 d.name
992 )));
993 }
994
995 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
996 return Err(AppError::Internal(format!(
997 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
998 d.name
999 )));
1000 }
1001
1002 if d.source.is_s3() {
1004 d.source.s3_bucket()?;
1005 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1006 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1007 && std::env::var("AWS_REGION").is_err()
1008 && std::env::var("AWS_DEFAULT_REGION").is_err()
1009 {
1010 log::warn!(
1011 "dataset '{}': S3 source without explicit region — \
1012 relying on AWS_REGION env var",
1013 d.name
1014 );
1015 }
1016 } else {
1017 match d.source.kind {
1021 SourceKind::Parquet => {
1022 d.resolve_local_parquet_files()?;
1023 }
1024 SourceKind::Delta => {
1025 let p = Path::new(&d.source.location);
1026 if !p.exists() {
1027 return Err(AppError::Internal(format!(
1028 "dataset '{}': delta location does not exist: {}",
1029 d.name, d.source.location
1030 )));
1031 }
1032 }
1033 }
1034 }
1035 }
1036 Ok(())
1037 }
1038}
1039
1040impl SourceConfig {
1041 pub fn is_s3(&self) -> bool {
1042 self.location.starts_with("s3://")
1043 }
1044
1045 pub fn has_glob(&self) -> bool {
1048 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1049 }
1050
1051 pub fn s3_recursive_parquet_glob(&self) -> String {
1057 if !self.is_s3() || self.has_glob() {
1058 return self.location.clone();
1059 }
1060 let trimmed = self.location.trim_end_matches('/');
1061 format!("{trimmed}/**/*.parquet")
1062 }
1063
1064 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1066 let rest = self
1067 .location
1068 .strip_prefix("s3://")
1069 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1070 let (bucket, key) = match rest.split_once('/') {
1071 Some((b, k)) => (b, k),
1072 None => (rest, ""),
1073 };
1074 if bucket.is_empty() {
1075 return Err(AppError::Internal(format!(
1076 "s3 URL missing bucket: {}",
1077 self.location
1078 )));
1079 }
1080 Ok((bucket, key))
1081 }
1082}
1083
1084impl DatasetConfig {
1085 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1095 if self.source.is_s3() {
1096 return Err(AppError::Internal(format!(
1097 "dataset '{}': resolve_local_parquet_files called on s3 source",
1098 self.name
1099 )));
1100 }
1101 let loc = &self.source.location;
1102
1103 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1105 let mut files: Vec<PathBuf> = glob::glob(loc)
1106 .map_err(|e| {
1107 AppError::Internal(format!(
1108 "dataset '{}': bad glob pattern '{loc}': {e}",
1109 self.name
1110 ))
1111 })?
1112 .filter_map(|r| r.ok())
1113 .filter(|p| {
1114 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1115 })
1116 .collect();
1117 files.sort();
1118 if files.is_empty() {
1119 return Err(AppError::Internal(format!(
1120 "dataset '{}': glob '{loc}' matched no .parquet files",
1121 self.name
1122 )));
1123 }
1124 return Ok(files);
1125 }
1126
1127 let path = Path::new(loc);
1128 if !path.exists() {
1129 return Err(AppError::Internal(format!(
1130 "dataset '{}': source path does not exist: {loc}",
1131 self.name
1132 )));
1133 }
1134
1135 if path.is_file() {
1136 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1137 return Err(AppError::Internal(format!(
1138 "dataset '{}': source must be a .parquet file",
1139 self.name
1140 )));
1141 }
1142 return Ok(vec![path.to_path_buf()]);
1143 }
1144
1145 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1146 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1147 .filter_map(|entry| entry.ok().map(|e| e.path()))
1148 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1149 .collect();
1150 files.sort();
1151 if files.is_empty() {
1152 return Err(AppError::Internal(format!(
1153 "dataset '{}': no *.parquet files found in {loc}",
1154 self.name
1155 )));
1156 }
1157 Ok(files)
1158 }
1159
1160 pub fn env_prefix(&self) -> String {
1164 self.name
1165 .chars()
1166 .map(|c| {
1167 if c.is_ascii_alphanumeric() {
1168 c.to_ascii_uppercase()
1169 } else {
1170 '_'
1171 }
1172 })
1173 .collect()
1174 }
1175
1176 pub fn resolved_creds(&self) -> ResolvedCreds {
1181 let prefix = self.env_prefix();
1182 let from_env = |suffix: &str| {
1183 std::env::var(format!("{prefix}_{suffix}"))
1184 .ok()
1185 .filter(|s| !s.is_empty())
1186 };
1187 let inline = self.s3.as_ref();
1188 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1189
1190 ResolvedCreds {
1191 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1192 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1193 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1194 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1195 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1196 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1197 session_token: from_env("AWS_SESSION_TOKEN")
1198 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1199 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1200 }
1201 }
1202
1203 pub fn resolved_region(&self) -> String {
1206 let prefix = self.env_prefix();
1207 std::env::var(format!("{prefix}_AWS_REGION"))
1208 .ok()
1209 .filter(|s| !s.is_empty())
1210 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1211 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1212 .or_else(|| {
1213 std::env::var("AWS_DEFAULT_REGION")
1214 .ok()
1215 .filter(|s| !s.is_empty())
1216 })
1217 .unwrap_or_else(|| "us-east-1".to_string())
1218 }
1219}
1220
#[cfg(test)]
mod tests {
    use super::*;

    // ---- shared fixtures -------------------------------------------------

    /// Builds an `AppConfig` whose docs/swagger/metrics/explorer sections are
    /// left at their defaults.
    fn config_with(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            auth,
            datasets,
        }
    }

    /// A parquet source pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        }
    }

    /// A parquet dataset with default index settings and no S3 table.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: parquet_source(location),
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Creates a fresh per-process temp directory containing `a.parquet`
    /// with the given contents; returns `(dir, file)`.
    fn scratch_parquet(tag: &str, contents: &[u8]) -> (PathBuf, PathBuf) {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::write(&file, contents).unwrap();
        (dir, file)
    }

    /// True when `err` is `AppError::Internal` and its message contains `needle`.
    fn internal_contains(err: AppError, needle: &str) -> bool {
        matches!(err, AppError::Internal(m) if m.contains(needle))
    }

    // ---- server section --------------------------------------------------

    #[test]
    fn server_defaults() {
        let server = ServerConfig::default();
        assert_eq!(server.backend, Backend::Datafusion);
        assert_eq!(server.port, 8080);
        assert!(server.compress);
        assert_eq!(server.max_body_bytes, 1024 * 1024);
        assert_eq!(server.max_page_size, 100_000);
        assert_eq!(server.request_timeout_ms, 30_000);
        assert_eq!(server.prefix, "");
        assert!(server.listen.is_loopback());

        let quack = &server.quack;
        assert!(!quack.enabled);
        assert_eq!(quack.uri, "quack:localhost");
        assert!(quack.token.is_none());
        assert!(!quack.allow_other_hostname);
        assert!(quack.read_only);
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.request_timeout_ms, 0);
        assert!(server.quack.enabled);
        assert_eq!(server.quack.uri, "quack:localhost:9495");
        assert_eq!(server.quack.token.as_deref(), Some("test-token"));
        assert!(!server.quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        let first = &cfg.datasets[0];
        assert_eq!(first.name, "x");
        // `dict_encode` was omitted in the TOML, so the serde default applies.
        assert!(first.dict_encode);
    }

    // ---- validation ------------------------------------------------------

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: p.to_string(),
                ..Default::default()
            };
            let cfg = config_with(server, AuthConfig::default(), vec![]);
            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = config_with(ServerConfig::default(), auth, vec![]);
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = config_with(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(internal_contains(err, "[[dataset]]"));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let (dir, file) = scratch_parquet("dp-auth-issuer-test", b"x");

        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let location = file.to_string_lossy().into_owned();
        let cfg = config_with(
            ServerConfig::default(),
            auth,
            vec![parquet_dataset("x", &location)],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        let cfg = config_with(
            server,
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(internal_contains(err, "host must be 'localhost'"));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let raw = r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
        "#;
        let cfg: AppConfig = toml::from_str(raw).unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(internal_contains(err, "alphanumeric"));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let (dir, file) = scratch_parquet("dp-dup-test", b"x");
        let path = file.to_str().unwrap();

        let raw = format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
        "#
        );
        let cfg: AppConfig = toml::from_str(&raw).unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(internal_contains(err, "duplicate"));

        let _ = std::fs::remove_dir_all(&dir);
    }

    // ---- source / S3 helpers ---------------------------------------------

    #[test]
    fn s3_bucket_parsing() {
        let with_key = parquet_source("s3://bucket/path/key");
        assert_eq!(with_key.s3_bucket().unwrap(), ("bucket", "path/key"));

        let bucket_only = parquet_source("s3://only-bucket");
        assert_eq!(bucket_only.s3_bucket().unwrap(), ("only-bucket", ""));

        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        // (input location, expected output)
        let cases = [
            ("s3://bucket/logs/", "s3://bucket/logs/**/*.parquet"),
            ("s3://bucket/logs", "s3://bucket/logs/**/*.parquet"),
            ("s3://bucket/logs/*.parquet", "s3://bucket/logs/*.parquet"),
            ("/local/logs", "/local/logs"),
        ];
        for (input, expected) in cases {
            assert_eq!(
                parquet_source(input).s3_recursive_parquet_glob(),
                expected
            );
        }
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        const PLAIN: &str = "https://s3.example.com";
        const FOLDED: &str = "https://mybucket.s3.example.com";

        let virt = S3Config {
            endpoint: Some(PLAIN.into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        assert_eq!(virt.effective_endpoint("mybucket").as_deref(), Some(FOLDED));

        // An endpoint that already carries the bucket is left alone.
        let prefixed = S3Config {
            endpoint: Some(FOLDED.into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some(FOLDED)
        );

        // Path-style addressing with `Auto` does not fold.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(path.effective_endpoint("mybucket").as_deref(), Some(PLAIN));

        // Explicit `False` wins over virtual addressing.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some(PLAIN)
        );

        // Explicit `True` wins over path addressing.
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some(FOLDED)
        );

        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    // ---- dataset helpers -------------------------------------------------

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let (dir, file) = scratch_parquet("dp-cfg-test", b"not really parquet");
        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // A direct file path resolves to itself.
        let from_file = resolve(file.to_str().unwrap()).unwrap();
        assert_eq!(from_file, vec![file.clone()]);

        // A directory resolves to the parquet files inside it.
        let from_dir = resolve(dir.to_str().unwrap()).unwrap();
        assert_eq!(from_dir, vec![file.clone()]);

        // A missing path is an error.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}