1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that configurable mount points (`docs.path`, `swagger.path`,
/// `metrics.path`, `explorer.path`) are checked against in
/// `AppConfig::validate`.
const RESERVED_MOUNTS: &[&str] = &[
    "/",
    "/api",
    "/api/v1",
    "/health",
    "/healthz",
    "/readyz",
    "/version",
    "/metrics",
];
38
39#[derive(Debug, Deserialize)]
44pub struct AppConfig {
45 #[serde(default)]
46 pub server: ServerConfig,
47 #[serde(default)]
48 pub docs: DocsConfig,
49 #[serde(default)]
50 pub swagger: SwaggerConfig,
51 #[serde(default)]
52 pub metrics: MetricsConfig,
53 #[serde(default)]
54 pub explorer: ExplorerConfig,
55 #[serde(default)]
56 pub auth: AuthConfig,
57 #[serde(rename = "dataset", default)]
58 pub datasets: Vec<DatasetConfig>,
59}
60
61#[derive(Debug, Deserialize)]
62#[serde(default)]
63pub struct ServerConfig {
64 pub backend: Backend,
66 pub listen: IpAddr,
69 pub port: u16,
71 pub workers: Option<usize>,
73 pub prefix: String,
79 pub compress: bool,
84 pub max_body_bytes: usize,
89 pub max_page_size: u64,
93 pub request_timeout_ms: u64,
97 pub shutdown_timeout_secs: u64,
102 pub quack: QuackConfig,
105}
106
107impl Default for ServerConfig {
108 fn default() -> Self {
109 Self {
110 backend: Backend::default(),
111 listen: IpAddr::from([127, 0, 0, 1]),
112 port: 8080,
113 workers: None,
114 prefix: String::new(),
115 compress: true,
116 max_body_bytes: 1024 * 1024,
117 max_page_size: 100_000,
118 request_timeout_ms: 30_000,
119 shutdown_timeout_secs: 30,
120 quack: QuackConfig::default(),
121 }
122 }
123}
124
125#[derive(Debug, Clone, Deserialize)]
131#[serde(default)]
132pub struct QuackConfig {
133 pub enabled: bool,
136 pub uri: String,
139 pub token: Option<String>,
142 pub allow_other_hostname: bool,
145 pub read_only: bool,
148}
149
150impl Default for QuackConfig {
151 fn default() -> Self {
152 Self {
153 enabled: false,
154 uri: "quack:localhost".into(),
155 token: None,
156 allow_other_hostname: false,
157 read_only: true,
158 }
159 }
160}
161
162impl QuackConfig {
163 pub fn validate_enabled(&self) -> Result<(), AppError> {
167 if self.uri.trim().is_empty() {
168 return Err(AppError::Internal(
169 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
170 ));
171 }
172 if !self.uri.starts_with("quack:") {
173 return Err(AppError::Internal(format!(
174 "server.quack.uri must start with 'quack:' (got '{}')",
175 self.uri
176 )));
177 }
178 if !self.allow_other_hostname {
179 let host = self.hostname().unwrap_or_default();
180 if host != "localhost" {
181 return Err(AppError::Internal(format!(
182 "server.quack.uri host must be 'localhost' unless \
183 server.quack.allow_other_hostname = true (got '{}')",
184 self.uri
185 )));
186 }
187 }
188 if let Some(token) = self.token.as_deref()
189 && token.len() < 4
190 {
191 return Err(AppError::Internal(
192 "server.quack.token must be at least 4 characters".into(),
193 ));
194 }
195 Ok(())
196 }
197
198 fn hostname(&self) -> Option<&str> {
199 let rest = self.uri.strip_prefix("quack:")?;
200 let rest = rest.strip_prefix("//").unwrap_or(rest);
201 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
202 (!host.is_empty()).then_some(host)
203 }
204}
205
206#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
207#[serde(rename_all = "lowercase")]
208pub enum Backend {
209 #[default]
210 Datafusion,
211 Duckdb,
212}
213
214#[derive(Debug, Clone, Deserialize)]
224#[serde(default, deny_unknown_fields)]
225pub struct DocsConfig {
226 pub enabled: bool,
227 pub path: String,
228}
229
230impl Default for DocsConfig {
231 fn default() -> Self {
232 Self {
233 enabled: true,
234 path: "/mkdocs".into(),
235 }
236 }
237}
238
239#[derive(Debug, Clone, Deserialize)]
256#[serde(default, deny_unknown_fields)]
257pub struct SwaggerConfig {
258 pub enabled: bool,
259 pub path: String,
260 pub oauth2: Option<SwaggerOAuth2Config>,
261}
262
263impl Default for SwaggerConfig {
264 fn default() -> Self {
265 Self {
266 enabled: true,
267 path: "/docs".into(),
268 oauth2: None,
269 }
270 }
271}
272
273#[derive(Debug, Clone, Deserialize)]
283#[serde(deny_unknown_fields)]
284pub struct SwaggerOAuth2Config {
285 pub issuer: String,
289 pub client_id: String,
294 #[serde(default)]
298 pub scopes: Vec<String>,
299 #[serde(default = "default_true")]
302 pub pkce: bool,
303}
304
305#[derive(Debug, Clone, Deserialize)]
325#[serde(default, deny_unknown_fields)]
326pub struct MetricsConfig {
327 pub enabled: bool,
328 pub path: String,
329}
330
331impl Default for MetricsConfig {
332 fn default() -> Self {
333 Self {
334 enabled: false,
335 path: "/metrics".into(),
336 }
337 }
338}
339
340#[derive(Debug, Clone, Deserialize)]
353#[serde(default, deny_unknown_fields)]
354pub struct ExplorerConfig {
355 pub enabled: bool,
356 pub path: String,
357}
358
359impl Default for ExplorerConfig {
360 fn default() -> Self {
361 Self {
362 enabled: true,
363 path: "/explore".into(),
364 }
365 }
366}
367
368#[derive(Debug, Clone, Deserialize)]
385#[serde(default, deny_unknown_fields)]
386pub struct AuthConfig {
387 pub enabled: bool,
389 pub issuer: String,
392 pub audience: String,
395 pub read_scopes: Vec<String>,
399 pub reload_scopes: Vec<String>,
402 pub anonymous_read: bool,
405 pub start_degraded: bool,
410 pub algorithms: Vec<String>,
414 pub leeway_secs: u64,
416 pub jwks_refresh_secs: u64,
420 pub tenant_claim: String,
425 pub allowed_tenants: Vec<String>,
429 pub admin_token_fallback: bool,
434}
435
436impl Default for AuthConfig {
437 fn default() -> Self {
438 Self {
439 enabled: false,
440 issuer: String::new(),
441 audience: String::new(),
442 read_scopes: Vec::new(),
443 reload_scopes: Vec::new(),
444 anonymous_read: false,
445 start_degraded: true,
446 algorithms: vec!["RS256".into()],
447 leeway_secs: 60,
448 jwks_refresh_secs: 3600,
449 tenant_claim: String::new(),
450 allowed_tenants: Vec::new(),
451 admin_token_fallback: true,
452 }
453 }
454}
455
456impl Backend {
457 pub fn as_str(self) -> &'static str {
458 match self {
459 Backend::Datafusion => "datafusion",
460 Backend::Duckdb => "duckdb",
461 }
462 }
463}
464
465#[derive(Debug, Clone, Deserialize)]
466pub struct DatasetConfig {
467 pub name: String,
468 pub source: SourceConfig,
469 #[serde(default)]
470 pub s3: Option<S3Config>,
471 #[serde(default)]
472 pub index: IndexConfig,
473 #[serde(default)]
479 pub columns: Vec<String>,
480 #[serde(default = "default_true")]
487 pub dict_encode: bool,
488 #[serde(default)]
494 pub lazy: bool,
495}
496
/// Serde default helper for boolean fields that should be `true` when the key
/// is omitted (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool { true }
500
501#[derive(Debug, Clone, Deserialize)]
502pub struct SourceConfig {
503 pub kind: SourceKind,
504 pub location: String,
506}
507
508#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
509#[serde(rename_all = "lowercase")]
510pub enum SourceKind {
511 #[default]
512 Parquet,
513 Delta,
514}
515
516impl SourceKind {
517 pub fn as_str(self) -> &'static str {
518 match self {
519 SourceKind::Parquet => "parquet",
520 SourceKind::Delta => "delta",
521 }
522 }
523}
524
525#[derive(Debug, Clone, Deserialize)]
528#[serde(default)]
529pub struct S3Config {
530 pub region: Option<String>,
531 pub endpoint: Option<String>,
533 pub addressing_style: AddressingStyle,
536 pub allow_http: bool,
538 pub access_key_id: Option<String>,
541 pub secret_access_key: Option<String>,
542 pub session_token: Option<String>,
543 pub partitioning: Partitioning,
546 pub endpoint_bucket_in_host: BucketInHost,
550}
551
552impl Default for S3Config {
553 fn default() -> Self {
554 Self {
555 region: None,
556 endpoint: None,
557 addressing_style: AddressingStyle::Virtual,
558 allow_http: false,
559 access_key_id: None,
560 secret_access_key: None,
561 session_token: None,
562 partitioning: Partitioning::Auto,
563 endpoint_bucket_in_host: BucketInHost::Auto,
564 }
565 }
566}
567
568impl S3Config {
569 pub fn effective_endpoint(&self, bucket: &str) -> Option<String> {
578 let ep = self.endpoint.as_deref().filter(|s| !s.is_empty())?;
579
580 let fold = match self.endpoint_bucket_in_host {
581 BucketInHost::False => false,
582 BucketInHost::True => true,
583 BucketInHost::Auto => self.addressing_style == AddressingStyle::Virtual,
584 };
585 if !fold {
586 return Some(ep.to_string());
587 }
588
589 let (scheme, host_and_path) = match ep.split_once("://") {
590 Some((s, rest)) => (Some(s), rest),
591 None => (None, ep),
592 };
593 let (host, path) = match host_and_path.split_once('/') {
595 Some((h, p)) => (h, Some(p)),
596 None => (host_and_path, None),
597 };
598 if host == bucket || host.starts_with(&format!("{bucket}.")) {
600 return Some(ep.to_string());
601 }
602 let new_host = format!("{bucket}.{host}");
603 let rebuilt = match (scheme, path) {
604 (Some(s), Some(p)) => format!("{s}://{new_host}/{p}"),
605 (Some(s), None) => format!("{s}://{new_host}"),
606 (None, Some(p)) => format!("{new_host}/{p}"),
607 (None, None) => new_host,
608 };
609 Some(rebuilt)
610 }
611}
612
613#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
614#[serde(rename_all = "lowercase")]
615pub enum AddressingStyle {
616 #[default]
617 Virtual,
618 Path,
619}
620
621impl AddressingStyle {
622 pub fn as_str(self) -> &'static str {
623 match self {
624 AddressingStyle::Virtual => "virtual",
625 AddressingStyle::Path => "path",
626 }
627 }
628}
629
630#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
634#[serde(rename_all = "lowercase")]
635pub enum Partitioning {
636 #[default]
639 Auto,
640 Hive,
643 None,
646}
647
648impl Partitioning {
649 pub fn as_str(self) -> &'static str {
650 match self {
651 Partitioning::Auto => "auto",
652 Partitioning::Hive => "hive",
653 Partitioning::None => "none",
654 }
655 }
656}
657
658#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
663#[serde(rename_all = "lowercase")]
664pub enum BucketInHost {
665 #[default]
668 Auto,
669 True,
671 False,
673}
674
675impl BucketInHost {
676 pub fn as_str(self) -> &'static str {
677 match self {
678 BucketInHost::Auto => "auto",
679 BucketInHost::True => "true",
680 BucketInHost::False => "false",
681 }
682 }
683}
684
685#[derive(Debug, Clone, Deserialize)]
686#[serde(default)]
687pub struct IndexConfig {
688 pub mode: IndexMode,
689 pub columns: Vec<String>,
690 pub max_cardinality: usize,
691}
692
693impl Default for IndexConfig {
694 fn default() -> Self {
695 Self {
696 mode: IndexMode::Auto,
697 columns: Vec::new(),
698 max_cardinality: 100_000,
699 }
700 }
701}
702
703#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
704#[serde(rename_all = "lowercase")]
705pub enum IndexMode {
706 #[default]
707 Auto,
708 None,
709 List,
710}
711
/// Credentials after merging environment variables and inline config (see
/// `DatasetConfig::resolved_creds`). Any part may be absent.
///
/// `Debug` is implemented by hand so that the secret key and session token
/// never end up in logs; only their presence is shown.
#[derive(Clone, Default)]
pub struct ResolvedCreds {
    /// Access key id, if one was found.
    pub access_key_id: Option<String>,
    /// Secret access key, if one was found.
    pub secret_access_key: Option<String>,
    /// Session token, if one was found.
    pub session_token: Option<String>,
}

impl std::fmt::Debug for ResolvedCreds {
    /// Same field layout as a derived `Debug`, but secret values are replaced
    /// by a `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let redact = |v: &Option<String>| v.as_ref().map(|_| "<redacted>");
        f.debug_struct("ResolvedCreds")
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &redact(&self.secret_access_key))
            .field("session_token", &redact(&self.session_token))
            .finish()
    }
}

impl ResolvedCreds {
    /// `true` when both the access key id and the secret access key are
    /// present (the session token is optional).
    pub fn has_keypair(&self) -> bool {
        self.access_key_id.is_some() && self.secret_access_key.is_some()
    }
}
726
727impl AppConfig {
732 pub fn load(path: &str) -> Result<Self, AppError> {
734 let raw = std::fs::read_to_string(path)
735 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
736 let mut cfg: AppConfig =
737 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
738 cfg.normalize();
739 cfg.validate()?;
740 Ok(cfg)
741 }
742
743 fn normalize(&mut self) {
751 for s in self
752 .auth
753 .read_scopes
754 .iter_mut()
755 .chain(self.auth.reload_scopes.iter_mut())
756 {
757 *s = s.to_ascii_lowercase();
758 }
759 }
760
761 fn validate(&self) -> Result<(), AppError> {
762 let p = &self.server.prefix;
764 if !p.is_empty() {
765 if !p.starts_with('/') {
766 return Err(AppError::Internal(format!(
767 "server.prefix must start with '/' (got '{p}')"
768 )));
769 }
770 if p.ends_with('/') {
771 return Err(AppError::Internal(format!(
772 "server.prefix must not end with '/' (got '{p}')"
773 )));
774 }
775 }
776
777 if self.datasets.is_empty() {
778 return Err(AppError::Internal(
779 "datasets.toml has no [[dataset]] entries".into(),
780 ));
781 }
782
783 if self.server.quack.enabled {
784 self.server.quack.validate_enabled()?;
785 }
786
787 {
790 let dp = &self.docs.path;
791 if !dp.starts_with('/') {
792 return Err(AppError::Internal(format!(
793 "docs.path must start with '/' (got '{dp}')"
794 )));
795 }
796 if dp.len() > 1 && dp.ends_with('/') {
797 return Err(AppError::Internal(format!(
798 "docs.path must not end with '/' (got '{dp}')"
799 )));
800 }
801 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
802 return Err(AppError::Internal(format!(
803 "docs.path '{dp}' collides with a reserved route"
804 )));
805 }
806 }
807
808 {
810 let sp = &self.swagger.path;
811 if !sp.starts_with('/') {
812 return Err(AppError::Internal(format!(
813 "swagger.path must start with '/' (got '{sp}')"
814 )));
815 }
816 if sp.len() > 1 && sp.ends_with('/') {
817 return Err(AppError::Internal(format!(
818 "swagger.path must not end with '/' (got '{sp}')"
819 )));
820 }
821 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
822 return Err(AppError::Internal(format!(
823 "swagger.path '{sp}' collides with a reserved route"
824 )));
825 }
826 if sp == &self.docs.path {
827 return Err(AppError::Internal(format!(
828 "swagger.path and docs.path must differ (both '{sp}')"
829 )));
830 }
831 if let Some(o) = &self.swagger.oauth2 {
832 if o.issuer.trim().is_empty() {
833 return Err(AppError::Internal(
834 "swagger.oauth2.issuer must not be empty".into(),
835 ));
836 }
837 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
838 return Err(AppError::Internal(format!(
839 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
840 o.issuer
841 )));
842 }
843 if o.client_id.trim().is_empty() {
844 return Err(AppError::Internal(
845 "swagger.oauth2.client_id must not be empty".into(),
846 ));
847 }
848 }
849 }
850
851 {
857 let mp = &self.metrics.path;
858 if !mp.starts_with('/') {
859 return Err(AppError::Internal(format!(
860 "metrics.path must start with '/' (got '{mp}')"
861 )));
862 }
863 if mp.len() > 1 && mp.ends_with('/') {
864 return Err(AppError::Internal(format!(
865 "metrics.path must not end with '/' (got '{mp}')"
866 )));
867 }
868 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
869 return Err(AppError::Internal(format!(
870 "metrics.path '{mp}' collides with a reserved route"
871 )));
872 }
873 if mp == &self.docs.path {
874 return Err(AppError::Internal(format!(
875 "metrics.path and docs.path must differ (both '{mp}')"
876 )));
877 }
878 if mp == &self.swagger.path {
879 return Err(AppError::Internal(format!(
880 "metrics.path and swagger.path must differ (both '{mp}')"
881 )));
882 }
883 }
884
885 {
888 let ep = &self.explorer.path;
889 if !ep.starts_with('/') {
890 return Err(AppError::Internal(format!(
891 "explorer.path must start with '/' (got '{ep}')"
892 )));
893 }
894 if ep.len() > 1 && ep.ends_with('/') {
895 return Err(AppError::Internal(format!(
896 "explorer.path must not end with '/' (got '{ep}')"
897 )));
898 }
899 if RESERVED_MOUNTS.iter().any(|r| *r == ep) {
900 return Err(AppError::Internal(format!(
901 "explorer.path '{ep}' collides with a reserved route"
902 )));
903 }
904 if ep == &self.docs.path {
905 return Err(AppError::Internal(format!(
906 "explorer.path and docs.path must differ (both '{ep}')"
907 )));
908 }
909 if ep == &self.swagger.path {
910 return Err(AppError::Internal(format!(
911 "explorer.path and swagger.path must differ (both '{ep}')"
912 )));
913 }
914 if ep == &self.metrics.path {
915 return Err(AppError::Internal(format!(
916 "explorer.path and metrics.path must differ (both '{ep}')"
917 )));
918 }
919 }
920
921 if self.auth.enabled {
926 let a = &self.auth;
927 if a.issuer.trim().is_empty() {
928 return Err(AppError::Internal(
929 "auth.issuer must not be empty when auth.enabled = true".into(),
930 ));
931 }
932 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
933 return Err(AppError::Internal(format!(
934 "auth.issuer must be an absolute http(s) URL (got '{}')",
935 a.issuer
936 )));
937 }
938 for alg in &a.algorithms {
939 match alg.as_str() {
940 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
941 | "PS512" => {}
942 other => {
943 return Err(AppError::Internal(format!(
944 "auth.algorithms[{other}] is not allowed; pick one of \
945 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
946 )));
947 }
948 }
949 }
950 if a.algorithms.is_empty() {
951 return Err(AppError::Internal(
952 "auth.algorithms must not be empty".into(),
953 ));
954 }
955 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
956 return Err(AppError::Internal(format!(
957 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
958 a.tenant_claim
959 )));
960 }
961 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
962 return Err(AppError::Internal(
963 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
964 can't enforce a tenant allow-list without a claim to extract from"
965 .into(),
966 ));
967 }
968 }
969
970 let mut seen = HashSet::new();
971 for d in &self.datasets {
972 if !seen.insert(d.name.as_str()) {
973 return Err(AppError::Internal(format!(
974 "duplicate dataset name: {}",
975 d.name
976 )));
977 }
978 if d.name.is_empty() {
979 return Err(AppError::Internal("dataset name must not be empty".into()));
980 }
981 if !d
983 .name
984 .chars()
985 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
986 {
987 return Err(AppError::Internal(format!(
988 "dataset name '{}' must be alphanumeric (plus _ - .)",
989 d.name
990 )));
991 }
992
993 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
994 return Err(AppError::Internal(format!(
995 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
996 d.name
997 )));
998 }
999
1000 if d.source.is_s3() {
1002 d.source.s3_bucket()?;
1003 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
1004 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
1005 && std::env::var("AWS_REGION").is_err()
1006 && std::env::var("AWS_DEFAULT_REGION").is_err()
1007 {
1008 log::warn!(
1009 "dataset '{}': S3 source without explicit region — \
1010 relying on AWS_REGION env var",
1011 d.name
1012 );
1013 }
1014 } else {
1015 match d.source.kind {
1019 SourceKind::Parquet => {
1020 d.resolve_local_parquet_files()?;
1021 }
1022 SourceKind::Delta => {
1023 let p = Path::new(&d.source.location);
1024 if !p.exists() {
1025 return Err(AppError::Internal(format!(
1026 "dataset '{}': delta location does not exist: {}",
1027 d.name, d.source.location
1028 )));
1029 }
1030 }
1031 }
1032 }
1033 }
1034 Ok(())
1035 }
1036}
1037
1038impl SourceConfig {
1039 pub fn is_s3(&self) -> bool {
1040 self.location.starts_with("s3://")
1041 }
1042
1043 pub fn has_glob(&self) -> bool {
1046 self.location.contains('*') || self.location.contains('?') || self.location.contains('[')
1047 }
1048
1049 pub fn s3_recursive_parquet_glob(&self) -> String {
1055 if !self.is_s3() || self.has_glob() {
1056 return self.location.clone();
1057 }
1058 let trimmed = self.location.trim_end_matches('/');
1059 format!("{trimmed}/**/*.parquet")
1060 }
1061
1062 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
1064 let rest = self
1065 .location
1066 .strip_prefix("s3://")
1067 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
1068 let (bucket, key) = match rest.split_once('/') {
1069 Some((b, k)) => (b, k),
1070 None => (rest, ""),
1071 };
1072 if bucket.is_empty() {
1073 return Err(AppError::Internal(format!(
1074 "s3 URL missing bucket: {}",
1075 self.location
1076 )));
1077 }
1078 Ok((bucket, key))
1079 }
1080}
1081
1082impl DatasetConfig {
1083 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
1093 if self.source.is_s3() {
1094 return Err(AppError::Internal(format!(
1095 "dataset '{}': resolve_local_parquet_files called on s3 source",
1096 self.name
1097 )));
1098 }
1099 let loc = &self.source.location;
1100
1101 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1103 let mut files: Vec<PathBuf> = glob::glob(loc)
1104 .map_err(|e| {
1105 AppError::Internal(format!(
1106 "dataset '{}': bad glob pattern '{loc}': {e}",
1107 self.name
1108 ))
1109 })?
1110 .filter_map(|r| r.ok())
1111 .filter(|p| {
1112 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
1113 })
1114 .collect();
1115 files.sort();
1116 if files.is_empty() {
1117 return Err(AppError::Internal(format!(
1118 "dataset '{}': glob '{loc}' matched no .parquet files",
1119 self.name
1120 )));
1121 }
1122 return Ok(files);
1123 }
1124
1125 let path = Path::new(loc);
1126 if !path.exists() {
1127 return Err(AppError::Internal(format!(
1128 "dataset '{}': source path does not exist: {loc}",
1129 self.name
1130 )));
1131 }
1132
1133 if path.is_file() {
1134 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
1135 return Err(AppError::Internal(format!(
1136 "dataset '{}': source must be a .parquet file",
1137 self.name
1138 )));
1139 }
1140 return Ok(vec![path.to_path_buf()]);
1141 }
1142
1143 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
1144 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
1145 .filter_map(|entry| entry.ok().map(|e| e.path()))
1146 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
1147 .collect();
1148 files.sort();
1149 if files.is_empty() {
1150 return Err(AppError::Internal(format!(
1151 "dataset '{}': no *.parquet files found in {loc}",
1152 self.name
1153 )));
1154 }
1155 Ok(files)
1156 }
1157
1158 pub fn env_prefix(&self) -> String {
1162 self.name
1163 .chars()
1164 .map(|c| {
1165 if c.is_ascii_alphanumeric() {
1166 c.to_ascii_uppercase()
1167 } else {
1168 '_'
1169 }
1170 })
1171 .collect()
1172 }
1173
1174 pub fn resolved_creds(&self) -> ResolvedCreds {
1179 let prefix = self.env_prefix();
1180 let from_env = |suffix: &str| {
1181 std::env::var(format!("{prefix}_{suffix}"))
1182 .ok()
1183 .filter(|s| !s.is_empty())
1184 };
1185 let inline = self.s3.as_ref();
1186 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
1187
1188 ResolvedCreds {
1189 access_key_id: from_env("AWS_ACCESS_KEY_ID")
1190 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
1191 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
1192 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
1193 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1194 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1195 session_token: from_env("AWS_SESSION_TOKEN")
1196 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1197 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1198 }
1199 }
1200
1201 pub fn resolved_region(&self) -> String {
1204 let prefix = self.env_prefix();
1205 std::env::var(format!("{prefix}_AWS_REGION"))
1206 .ok()
1207 .filter(|s| !s.is_empty())
1208 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1209 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1210 .or_else(|| {
1211 std::env::var("AWS_DEFAULT_REGION")
1212 .ok()
1213 .filter(|s| !s.is_empty())
1214 })
1215 .unwrap_or_else(|| "us-east-1".to_string())
1216 }
1217}
1218
#[cfg(test)]
mod tests {
    use super::*;

    /// `AppConfig` with default docs/swagger/metrics/explorer sections and
    /// the given server, auth and dataset parts.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            explorer: ExplorerConfig::default(),
            auth,
            datasets,
        }
    }

    /// Parquet source pointing at `location`.
    fn parquet_source(location: &str) -> SourceConfig {
        SourceConfig {
            kind: SourceKind::Parquet,
            location: location.into(),
        }
    }

    /// Parquet dataset with every optional setting at its default.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: parquet_source(location),
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Creates a fresh per-process scratch directory holding one file named
    /// `a.parquet` with `contents`. Returns `(directory, file)`.
    fn scratch_dir(tag: &str, contents: &[u8]) -> (PathBuf, PathBuf) {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let file = dir.join("a.parquet");
        std::fs::write(&file, contents).unwrap();
        (dir, file)
    }

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        let cfg: AppConfig = toml::from_str(toml).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.request_timeout_ms, 0);

        let quack = &server.quack;
        assert!(quack.enabled);
        assert_eq!(quack.uri, "quack:localhost:9495");
        assert_eq!(quack.token.as_deref(), Some("test-token"));
        assert!(!quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        // `dict_encode` was not given in the TOML above.
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: p.to_string(),
                ..Default::default()
            };
            let cfg = app_config(server, AuthConfig::default(), vec![]);
            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = app_config(ServerConfig::default(), auth, vec![]);
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("[[dataset]]")));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let (dir, file) = scratch_dir("dp-auth-issuer-test", b"x");

        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let cfg = app_config(
            ServerConfig::default(),
            auth,
            vec![parquet_dataset("x", &file.to_string_lossy())],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        let cfg = app_config(
            server,
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("host must be 'localhost'")));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(matches!(err, AppError::Internal(m) if m.contains("alphanumeric")));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let (dir, f) = scratch_dir("dp-dup-test", b"x");
        let path = f.to_str().unwrap();

        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(matches!(err, AppError::Internal(m) if m.contains("duplicate")));

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let with_key = parquet_source("s3://bucket/path/key");
        assert_eq!(with_key.s3_bucket().unwrap(), ("bucket", "path/key"));

        let bucket_only = parquet_source("s3://only-bucket");
        assert_eq!(bucket_only.s3_bucket().unwrap(), ("only-bucket", ""));

        assert!(parquet_source("s3:///nokey").s3_bucket().is_err());
        assert!(parquet_source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn s3_recursive_parquet_glob_only_expands_plain_prefixes() {
        let expand = |loc: &str| parquet_source(loc).s3_recursive_parquet_glob();

        // Plain prefixes, with and without a trailing slash, are expanded.
        assert_eq!(expand("s3://bucket/logs/"), "s3://bucket/logs/**/*.parquet");
        assert_eq!(expand("s3://bucket/logs"), "s3://bucket/logs/**/*.parquet");
        // An explicit glob is left untouched.
        assert_eq!(
            expand("s3://bucket/logs/*.parquet"),
            "s3://bucket/logs/*.parquet"
        );
        // So is a local path.
        assert_eq!(expand("/local/logs"), "/local/logs");
    }

    #[test]
    fn effective_endpoint_folds_bucket_per_mode() {
        let virt = S3Config {
            endpoint: Some("https://s3.example.com".into()),
            addressing_style: AddressingStyle::Virtual,
            ..Default::default()
        };
        assert_eq!(
            virt.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // Host already carries the bucket: unchanged.
        let prefixed = S3Config {
            endpoint: Some("https://mybucket.s3.example.com".into()),
            ..virt.clone()
        };
        assert_eq!(
            prefixed.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // Path addressing with the automatic setting: unchanged.
        let path = S3Config {
            addressing_style: AddressingStyle::Path,
            ..virt.clone()
        };
        assert_eq!(
            path.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );

        // Explicit override wins over the addressing style, both ways.
        let forced_off = S3Config {
            endpoint_bucket_in_host: BucketInHost::False,
            ..virt.clone()
        };
        assert_eq!(
            forced_off.effective_endpoint("mybucket").as_deref(),
            Some("https://s3.example.com")
        );
        let forced_on = S3Config {
            endpoint_bucket_in_host: BucketInHost::True,
            ..path.clone()
        };
        assert_eq!(
            forced_on.effective_endpoint("mybucket").as_deref(),
            Some("https://mybucket.s3.example.com")
        );

        // No endpoint configured at all.
        assert_eq!(S3Config::default().effective_endpoint("mybucket"), None);
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let (dir, f) = scratch_dir("dp-cfg-test", b"not really parquet");
        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // A single file resolves to itself.
        assert_eq!(resolve(f.to_str().unwrap()).unwrap(), vec![f.clone()]);
        // A directory resolves to the parquet files inside it.
        assert_eq!(resolve(dir.to_str().unwrap()).unwrap(), vec![f.clone()]);
        // A missing path is an error.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}