1use std::collections::HashSet;
25use std::net::IpAddr;
26use std::path::{Path, PathBuf};
27
28use serde::Deserialize;
29
30use crate::errors::AppError;
31
/// Route paths that `AppConfig::validate` refuses to accept as a configurable
/// mount point for `docs.path` and `swagger.path`. `metrics.path` is checked
/// against the same list, except that `/metrics` itself is allowed there.
const RESERVED_MOUNTS: &[&str] = &[
    "/", "/api", "/api/v1", "/health", "/healthz", "/readyz", "/version", "/metrics",
];
38
/// Top-level configuration document, deserialized from TOML by `AppConfig::load`.
///
/// Every section carries `#[serde(default)]`, so a section that is missing
/// from the file falls back to that type's `Default` implementation.
#[derive(Debug, Deserialize)]
pub struct AppConfig {
    /// `[server]` table.
    #[serde(default)]
    pub server: ServerConfig,
    /// `[docs]` table.
    #[serde(default)]
    pub docs: DocsConfig,
    /// `[swagger]` table.
    #[serde(default)]
    pub swagger: SwaggerConfig,
    /// `[metrics]` table.
    #[serde(default)]
    pub metrics: MetricsConfig,
    /// `[auth]` table.
    #[serde(default)]
    pub auth: AuthConfig,
    /// `[[dataset]]` array of tables (the TOML key is singular). Parsing
    /// accepts an absent list, but `validate` rejects an empty one.
    #[serde(rename = "dataset", default)]
    pub datasets: Vec<DatasetConfig>,
}
58
/// `[server]` table.
///
/// The container-level `#[serde(default)]` means any key that is missing from
/// the file takes its value from `ServerConfig::default()`.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Backend selection; spelled `"datafusion"` or `"duckdb"` in TOML.
    pub backend: Backend,
    /// IP address setting; defaults to `127.0.0.1`.
    /// NOTE(review): presumably the bind address — the consumer is outside this file.
    pub listen: IpAddr,
    /// Port number; defaults to `8080`.
    pub port: u16,
    /// Optional worker count; `None` by default.
    /// NOTE(review): what `None` selects is decided by the consumer — confirm there.
    pub workers: Option<usize>,
    /// Path prefix. Empty by default; when non-empty, `AppConfig::validate`
    /// requires a leading `/` and forbids a trailing `/`.
    pub prefix: String,
    /// Defaults to `true`. Presumably toggles response compression — TODO confirm.
    pub compress: bool,
    /// Defaults to `1024 * 1024`.
    pub max_body_bytes: usize,
    /// Defaults to `100_000`.
    pub max_page_size: u64,
    /// Defaults to `30_000`.
    pub request_timeout_ms: u64,
    /// Defaults to `30`.
    pub shutdown_timeout_secs: u64,
    /// `[server.quack]` sub-table; only validated when its `enabled` flag is set.
    pub quack: QuackConfig,
}
104
105impl Default for ServerConfig {
106 fn default() -> Self {
107 Self {
108 backend: Backend::default(),
109 listen: IpAddr::from([127, 0, 0, 1]),
110 port: 8080,
111 workers: None,
112 prefix: String::new(),
113 compress: true,
114 max_body_bytes: 1024 * 1024,
115 max_page_size: 100_000,
116 request_timeout_ms: 30_000,
117 shutdown_timeout_secs: 30,
118 quack: QuackConfig::default(),
119 }
120 }
121}
122
/// `[server.quack]` sub-table.
///
/// Missing keys take their value from `QuackConfig::default()`. The fields are
/// only checked (by `validate_enabled`) when `enabled` is `true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct QuackConfig {
    /// Master switch; `false` by default.
    pub enabled: bool,
    /// Must start with `quack:`; defaults to `"quack:localhost"`.
    pub uri: String,
    /// Optional token; when present it must be at least 4 bytes long.
    pub token: Option<String>,
    /// When `false` (the default), the host in `uri` must be exactly `localhost`.
    pub allow_other_hostname: bool,
    /// Defaults to `true`.
    pub read_only: bool,
}
147
148impl Default for QuackConfig {
149 fn default() -> Self {
150 Self {
151 enabled: false,
152 uri: "quack:localhost".into(),
153 token: None,
154 allow_other_hostname: false,
155 read_only: true,
156 }
157 }
158}
159
160impl QuackConfig {
161 pub fn validate_enabled(&self) -> Result<(), AppError> {
165 if self.uri.trim().is_empty() {
166 return Err(AppError::Internal(
167 "server.quack.uri must not be empty when server.quack.enabled = true".into(),
168 ));
169 }
170 if !self.uri.starts_with("quack:") {
171 return Err(AppError::Internal(format!(
172 "server.quack.uri must start with 'quack:' (got '{}')",
173 self.uri
174 )));
175 }
176 if !self.allow_other_hostname {
177 let host = self.hostname().unwrap_or_default();
178 if host != "localhost" {
179 return Err(AppError::Internal(format!(
180 "server.quack.uri host must be 'localhost' unless \
181 server.quack.allow_other_hostname = true (got '{}')",
182 self.uri
183 )));
184 }
185 }
186 if let Some(token) = self.token.as_deref()
187 && token.len() < 4
188 {
189 return Err(AppError::Internal(
190 "server.quack.token must be at least 4 characters".into(),
191 ));
192 }
193 Ok(())
194 }
195
196 fn hostname(&self) -> Option<&str> {
197 let rest = self.uri.strip_prefix("quack:")?;
198 let rest = rest.strip_prefix("//").unwrap_or(rest);
199 let host = rest.split([':', '/', '?', '#']).next().unwrap_or_default();
200 (!host.is_empty()).then_some(host)
201 }
202}
203
/// Backend selector for `server.backend`.
///
/// Deserialized from the lowercase variant name (`"datafusion"` / `"duckdb"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Backend {
    /// Used when `server.backend` is not specified.
    #[default]
    Datafusion,
    Duckdb,
}
211
/// `[docs]` table.
///
/// Missing keys come from `DocsConfig::default()`; unknown keys are a parse
/// error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct DocsConfig {
    /// `true` by default.
    pub enabled: bool,
    /// Mount path, `"/mkdocs"` by default. `AppConfig::validate` requires a
    /// leading `/`, no trailing `/` (unless the path is just `/`), and no
    /// collision with a reserved route.
    pub path: String,
}
227
228impl Default for DocsConfig {
229 fn default() -> Self {
230 Self {
231 enabled: true,
232 path: "/mkdocs".into(),
233 }
234 }
235}
236
/// `[swagger]` table.
///
/// Missing keys come from `SwaggerConfig::default()`; unknown keys are a parse
/// error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct SwaggerConfig {
    /// `true` by default.
    pub enabled: bool,
    /// Mount path, `"/docs"` by default. Validated like `docs.path` and must
    /// additionally differ from it.
    pub path: String,
    /// Optional `[swagger.oauth2]` sub-table; `None` by default.
    pub oauth2: Option<SwaggerOAuth2Config>,
}
260
261impl Default for SwaggerConfig {
262 fn default() -> Self {
263 Self {
264 enabled: true,
265 path: "/docs".into(),
266 oauth2: None,
267 }
268 }
269}
270
/// `[swagger.oauth2]` sub-table.
///
/// `issuer` and `client_id` have no serde default, so both must be present
/// whenever the table exists. Unknown keys are a parse error.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SwaggerOAuth2Config {
    /// Must be non-blank and start with `http://` or `https://`
    /// (checked in `AppConfig::validate`).
    pub issuer: String,
    /// Must be non-blank (checked in `AppConfig::validate`).
    pub client_id: String,
    /// Empty list when omitted.
    #[serde(default)]
    pub scopes: Vec<String>,
    /// `true` when omitted.
    #[serde(default = "default_true")]
    pub pkce: bool,
}
302
/// `[metrics]` table.
///
/// Missing keys come from `MetricsConfig::default()`; unknown keys are a parse
/// error (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
    /// `false` by default.
    pub enabled: bool,
    /// Mount path, `"/metrics"` by default. It may equal the reserved
    /// `/metrics` route but no other reserved route, and must differ from
    /// both `docs.path` and `swagger.path`.
    pub path: String,
}
328
329impl Default for MetricsConfig {
330 fn default() -> Self {
331 Self {
332 enabled: false,
333 path: "/metrics".into(),
334 }
335 }
336}
337
/// `[auth]` table.
///
/// Missing keys come from `AuthConfig::default()`; unknown keys are a parse
/// error (`deny_unknown_fields`). The field checks in `AppConfig::validate`
/// run only when `enabled` is `true`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    /// Master switch; `false` by default.
    pub enabled: bool,
    /// When auth is enabled: must be non-blank and start with `http://` or `https://`.
    pub issuer: String,
    /// Empty by default; not validated in this file.
    pub audience: String,
    /// Lower-cased (ASCII) by `AppConfig::normalize` after loading.
    pub read_scopes: Vec<String>,
    /// Lower-cased (ASCII) by `AppConfig::normalize` after loading.
    pub reload_scopes: Vec<String>,
    /// `false` by default.
    pub anonymous_read: bool,
    /// `true` by default.
    pub start_degraded: bool,
    /// Defaults to `["RS256"]`. When auth is enabled the list must be
    /// non-empty and contain only RS256/RS384/RS512, ES256/ES384 or
    /// PS256/PS384/PS512.
    pub algorithms: Vec<String>,
    /// Defaults to `60`.
    pub leeway_secs: u64,
    /// Defaults to `3600`.
    pub jwks_refresh_secs: u64,
    /// Empty by default; when set (and auth is enabled) it must start with `/`
    /// (validation describes it as a JSON pointer).
    pub tenant_claim: String,
    /// Empty by default; a non-empty list requires `tenant_claim` to be set.
    pub allowed_tenants: Vec<String>,
    /// `true` by default.
    pub admin_token_fallback: bool,
}
405
406impl Default for AuthConfig {
407 fn default() -> Self {
408 Self {
409 enabled: false,
410 issuer: String::new(),
411 audience: String::new(),
412 read_scopes: Vec::new(),
413 reload_scopes: Vec::new(),
414 anonymous_read: false,
415 start_degraded: true,
416 algorithms: vec!["RS256".into()],
417 leeway_secs: 60,
418 jwks_refresh_secs: 3600,
419 tenant_claim: String::new(),
420 allowed_tenants: Vec::new(),
421 admin_token_fallback: true,
422 }
423 }
424}
425
426impl Backend {
427 pub fn as_str(self) -> &'static str {
428 match self {
429 Backend::Datafusion => "datafusion",
430 Backend::Duckdb => "duckdb",
431 }
432 }
433}
434
/// One `[[dataset]]` entry.
///
/// `name` and `source` are required; every other key has a serde default.
#[derive(Debug, Clone, Deserialize)]
pub struct DatasetConfig {
    /// Must be unique, non-empty and consist of ASCII alphanumerics plus
    /// `_`, `-` and `.` (checked in `AppConfig::validate`).
    pub name: String,
    /// Where the data lives and in which format.
    pub source: SourceConfig,
    /// Optional `[dataset.s3]` settings; `None` when omitted.
    #[serde(default)]
    pub s3: Option<S3Config>,
    /// `[dataset.index]` settings; `IndexConfig::default()` when omitted.
    #[serde(default)]
    pub index: IndexConfig,
    /// Empty list when omitted.
    #[serde(default)]
    pub columns: Vec<String>,
    /// `true` when omitted.
    #[serde(default = "default_true")]
    pub dict_encode: bool,
    /// `false` when omitted.
    #[serde(default)]
    pub lazy: bool,
}
466
/// serde `default = "…"` helper for boolean fields that should be `true`
/// when the key is omitted.
fn default_true() -> bool {
    true
}
470
/// `dataset.source` table. Both keys are required (no serde defaults).
#[derive(Debug, Clone, Deserialize)]
pub struct SourceConfig {
    /// Storage format: `"parquet"` or `"delta"`.
    pub kind: SourceKind,
    /// A location starting with `s3://` is treated as S3; anything else is
    /// handled as a local path or glob pattern.
    pub location: String,
}
477
/// Storage format of a dataset source, deserialized from the lowercase
/// variant name.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SourceKind {
    /// The `Default::default()` value.
    #[default]
    Parquet,
    Delta,
}
485
486impl SourceKind {
487 pub fn as_str(self) -> &'static str {
488 match self {
489 SourceKind::Parquet => "parquet",
490 SourceKind::Delta => "delta",
491 }
492 }
493}
494
/// `dataset.s3` table. Missing keys come from `S3Config::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Inline region; consulted by `DatasetConfig::resolved_region` after the
    /// per-dataset environment variable and before the global ones.
    pub region: Option<String>,
    /// Optional endpoint setting; `None` by default.
    pub endpoint: Option<String>,
    /// `virtual` by default.
    pub addressing_style: AddressingStyle,
    /// `false` by default.
    pub allow_http: bool,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub access_key_id: Option<String>,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub secret_access_key: Option<String>,
    /// Inline credential; see `DatasetConfig::resolved_creds` for precedence.
    pub session_token: Option<String>,
}
514
515impl Default for S3Config {
516 fn default() -> Self {
517 Self {
518 region: None,
519 endpoint: None,
520 addressing_style: AddressingStyle::Virtual,
521 allow_http: false,
522 access_key_id: None,
523 secret_access_key: None,
524 session_token: None,
525 }
526 }
527}
528
/// Value of `dataset.s3.addressing_style`, deserialized from the lowercase
/// variant name (`"virtual"` / `"path"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AddressingStyle {
    /// The `Default::default()` value.
    #[default]
    Virtual,
    Path,
}
536
537impl AddressingStyle {
538 pub fn as_str(self) -> &'static str {
539 match self {
540 AddressingStyle::Virtual => "virtual",
541 AddressingStyle::Path => "path",
542 }
543 }
544}
545
/// `dataset.index` table. Missing keys come from `IndexConfig::default()`.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct IndexConfig {
    /// `auto` by default.
    pub mode: IndexMode,
    /// Empty by default; must be non-empty when `mode = "list"`
    /// (checked in `AppConfig::validate`).
    pub columns: Vec<String>,
    /// Defaults to `100_000`.
    pub max_cardinality: usize,
}
553
554impl Default for IndexConfig {
555 fn default() -> Self {
556 Self {
557 mode: IndexMode::Auto,
558 columns: Vec::new(),
559 max_cardinality: 100_000,
560 }
561 }
562}
563
/// Value of `dataset.index.mode`, deserialized from the lowercase variant
/// name (`"auto"`, `"none"`, `"list"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IndexMode {
    /// The `Default::default()` value.
    #[default]
    Auto,
    /// A variant of this enum — not `Option::None`.
    None,
    /// Requires a non-empty `index.columns` (checked in `AppConfig::validate`).
    List,
}
572
/// Credential triple produced by `DatasetConfig::resolved_creds`; each part
/// is `None` when no source supplied it.
#[derive(Debug, Clone, Default)]
pub struct ResolvedCreds {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
}

impl ResolvedCreds {
    /// `true` only when both the access key id and the secret access key are
    /// present; the session token is not considered.
    pub fn has_keypair(&self) -> bool {
        matches!(
            (&self.access_key_id, &self.secret_access_key),
            (Some(_), Some(_))
        )
    }
}
587
588impl AppConfig {
593 pub fn load(path: &str) -> Result<Self, AppError> {
595 let raw = std::fs::read_to_string(path)
596 .map_err(|e| AppError::Internal(format!("failed to read {path}: {e}")))?;
597 let mut cfg: AppConfig =
598 toml::from_str(&raw).map_err(|e| AppError::Internal(format!("invalid {path}: {e}")))?;
599 cfg.normalize();
600 cfg.validate()?;
601 Ok(cfg)
602 }
603
604 fn normalize(&mut self) {
612 for s in self
613 .auth
614 .read_scopes
615 .iter_mut()
616 .chain(self.auth.reload_scopes.iter_mut())
617 {
618 *s = s.to_ascii_lowercase();
619 }
620 }
621
622 fn validate(&self) -> Result<(), AppError> {
623 let p = &self.server.prefix;
625 if !p.is_empty() {
626 if !p.starts_with('/') {
627 return Err(AppError::Internal(format!(
628 "server.prefix must start with '/' (got '{p}')"
629 )));
630 }
631 if p.ends_with('/') {
632 return Err(AppError::Internal(format!(
633 "server.prefix must not end with '/' (got '{p}')"
634 )));
635 }
636 }
637
638 if self.datasets.is_empty() {
639 return Err(AppError::Internal(
640 "datasets.toml has no [[dataset]] entries".into(),
641 ));
642 }
643
644 if self.server.quack.enabled {
645 self.server.quack.validate_enabled()?;
646 }
647
648 {
651 let dp = &self.docs.path;
652 if !dp.starts_with('/') {
653 return Err(AppError::Internal(format!(
654 "docs.path must start with '/' (got '{dp}')"
655 )));
656 }
657 if dp.len() > 1 && dp.ends_with('/') {
658 return Err(AppError::Internal(format!(
659 "docs.path must not end with '/' (got '{dp}')"
660 )));
661 }
662 if RESERVED_MOUNTS.iter().any(|r| *r == dp) {
663 return Err(AppError::Internal(format!(
664 "docs.path '{dp}' collides with a reserved route"
665 )));
666 }
667 }
668
669 {
671 let sp = &self.swagger.path;
672 if !sp.starts_with('/') {
673 return Err(AppError::Internal(format!(
674 "swagger.path must start with '/' (got '{sp}')"
675 )));
676 }
677 if sp.len() > 1 && sp.ends_with('/') {
678 return Err(AppError::Internal(format!(
679 "swagger.path must not end with '/' (got '{sp}')"
680 )));
681 }
682 if RESERVED_MOUNTS.iter().any(|r| *r == sp) {
683 return Err(AppError::Internal(format!(
684 "swagger.path '{sp}' collides with a reserved route"
685 )));
686 }
687 if sp == &self.docs.path {
688 return Err(AppError::Internal(format!(
689 "swagger.path and docs.path must differ (both '{sp}')"
690 )));
691 }
692 if let Some(o) = &self.swagger.oauth2 {
693 if o.issuer.trim().is_empty() {
694 return Err(AppError::Internal(
695 "swagger.oauth2.issuer must not be empty".into(),
696 ));
697 }
698 if !(o.issuer.starts_with("https://") || o.issuer.starts_with("http://")) {
699 return Err(AppError::Internal(format!(
700 "swagger.oauth2.issuer must be an absolute http(s) URL (got '{}')",
701 o.issuer
702 )));
703 }
704 if o.client_id.trim().is_empty() {
705 return Err(AppError::Internal(
706 "swagger.oauth2.client_id must not be empty".into(),
707 ));
708 }
709 }
710 }
711
712 {
718 let mp = &self.metrics.path;
719 if !mp.starts_with('/') {
720 return Err(AppError::Internal(format!(
721 "metrics.path must start with '/' (got '{mp}')"
722 )));
723 }
724 if mp.len() > 1 && mp.ends_with('/') {
725 return Err(AppError::Internal(format!(
726 "metrics.path must not end with '/' (got '{mp}')"
727 )));
728 }
729 if RESERVED_MOUNTS.iter().any(|r| *r == mp && *r != "/metrics") {
730 return Err(AppError::Internal(format!(
731 "metrics.path '{mp}' collides with a reserved route"
732 )));
733 }
734 if mp == &self.docs.path {
735 return Err(AppError::Internal(format!(
736 "metrics.path and docs.path must differ (both '{mp}')"
737 )));
738 }
739 if mp == &self.swagger.path {
740 return Err(AppError::Internal(format!(
741 "metrics.path and swagger.path must differ (both '{mp}')"
742 )));
743 }
744 }
745
746 if self.auth.enabled {
751 let a = &self.auth;
752 if a.issuer.trim().is_empty() {
753 return Err(AppError::Internal(
754 "auth.issuer must not be empty when auth.enabled = true".into(),
755 ));
756 }
757 if !(a.issuer.starts_with("https://") || a.issuer.starts_with("http://")) {
758 return Err(AppError::Internal(format!(
759 "auth.issuer must be an absolute http(s) URL (got '{}')",
760 a.issuer
761 )));
762 }
763 for alg in &a.algorithms {
764 match alg.as_str() {
765 "RS256" | "RS384" | "RS512" | "ES256" | "ES384" | "PS256" | "PS384"
766 | "PS512" => {}
767 other => {
768 return Err(AppError::Internal(format!(
769 "auth.algorithms[{other}] is not allowed; pick one of \
770 RS256/RS384/RS512, ES256/ES384, PS256/PS384/PS512"
771 )));
772 }
773 }
774 }
775 if a.algorithms.is_empty() {
776 return Err(AppError::Internal(
777 "auth.algorithms must not be empty".into(),
778 ));
779 }
780 if !a.tenant_claim.is_empty() && !a.tenant_claim.starts_with('/') {
781 return Err(AppError::Internal(format!(
782 "auth.tenant_claim must be a JSON pointer starting with '/' (got '{}')",
783 a.tenant_claim
784 )));
785 }
786 if !a.allowed_tenants.is_empty() && a.tenant_claim.is_empty() {
787 return Err(AppError::Internal(
788 "auth.allowed_tenants is set but auth.tenant_claim is empty — \
789 can't enforce a tenant allow-list without a claim to extract from"
790 .into(),
791 ));
792 }
793 }
794
795 let mut seen = HashSet::new();
796 for d in &self.datasets {
797 if !seen.insert(d.name.as_str()) {
798 return Err(AppError::Internal(format!(
799 "duplicate dataset name: {}",
800 d.name
801 )));
802 }
803 if d.name.is_empty() {
804 return Err(AppError::Internal("dataset name must not be empty".into()));
805 }
806 if !d
808 .name
809 .chars()
810 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'))
811 {
812 return Err(AppError::Internal(format!(
813 "dataset name '{}' must be alphanumeric (plus _ - .)",
814 d.name
815 )));
816 }
817
818 if d.index.mode == IndexMode::List && d.index.columns.is_empty() {
819 return Err(AppError::Internal(format!(
820 "dataset '{}': index.mode = 'list' requires non-empty index.columns",
821 d.name
822 )));
823 }
824
825 if d.source.is_s3() {
827 d.source.s3_bucket()?;
828 if d.s3.as_ref().and_then(|s| s.region.as_deref()).is_none()
829 && d.s3.as_ref().and_then(|s| s.endpoint.as_deref()).is_none()
830 && std::env::var("AWS_REGION").is_err()
831 && std::env::var("AWS_DEFAULT_REGION").is_err()
832 {
833 log::warn!(
834 "dataset '{}': S3 source without explicit region — \
835 relying on AWS_REGION env var",
836 d.name
837 );
838 }
839 } else {
840 match d.source.kind {
844 SourceKind::Parquet => {
845 d.resolve_local_parquet_files()?;
846 }
847 SourceKind::Delta => {
848 let p = Path::new(&d.source.location);
849 if !p.exists() {
850 return Err(AppError::Internal(format!(
851 "dataset '{}': delta location does not exist: {}",
852 d.name, d.source.location
853 )));
854 }
855 }
856 }
857 }
858 }
859 Ok(())
860 }
861}
862
863impl SourceConfig {
864 pub fn is_s3(&self) -> bool {
865 self.location.starts_with("s3://")
866 }
867
868 pub fn s3_bucket(&self) -> Result<(&str, &str), AppError> {
870 let rest = self
871 .location
872 .strip_prefix("s3://")
873 .ok_or_else(|| AppError::Internal(format!("not an s3:// URL: {}", self.location)))?;
874 let (bucket, key) = match rest.split_once('/') {
875 Some((b, k)) => (b, k),
876 None => (rest, ""),
877 };
878 if bucket.is_empty() {
879 return Err(AppError::Internal(format!(
880 "s3 URL missing bucket: {}",
881 self.location
882 )));
883 }
884 Ok((bucket, key))
885 }
886}
887
888impl DatasetConfig {
889 pub fn resolve_local_parquet_files(&self) -> Result<Vec<PathBuf>, AppError> {
899 if self.source.is_s3() {
900 return Err(AppError::Internal(format!(
901 "dataset '{}': resolve_local_parquet_files called on s3 source",
902 self.name
903 )));
904 }
905 let loc = &self.source.location;
906
907 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
909 let mut files: Vec<PathBuf> = glob::glob(loc)
910 .map_err(|e| {
911 AppError::Internal(format!(
912 "dataset '{}': bad glob pattern '{loc}': {e}",
913 self.name
914 ))
915 })?
916 .filter_map(|r| r.ok())
917 .filter(|p| {
918 p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet")
919 })
920 .collect();
921 files.sort();
922 if files.is_empty() {
923 return Err(AppError::Internal(format!(
924 "dataset '{}': glob '{loc}' matched no .parquet files",
925 self.name
926 )));
927 }
928 return Ok(files);
929 }
930
931 let path = Path::new(loc);
932 if !path.exists() {
933 return Err(AppError::Internal(format!(
934 "dataset '{}': source path does not exist: {loc}",
935 self.name
936 )));
937 }
938
939 if path.is_file() {
940 if path.extension().and_then(|e| e.to_str()) != Some("parquet") {
941 return Err(AppError::Internal(format!(
942 "dataset '{}': source must be a .parquet file",
943 self.name
944 )));
945 }
946 return Ok(vec![path.to_path_buf()]);
947 }
948
949 let mut files: Vec<PathBuf> = std::fs::read_dir(path)
950 .map_err(|e| AppError::Internal(format!("read {loc}: {e}")))?
951 .filter_map(|entry| entry.ok().map(|e| e.path()))
952 .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
953 .collect();
954 files.sort();
955 if files.is_empty() {
956 return Err(AppError::Internal(format!(
957 "dataset '{}': no *.parquet files found in {loc}",
958 self.name
959 )));
960 }
961 Ok(files)
962 }
963
964 pub fn env_prefix(&self) -> String {
968 self.name
969 .chars()
970 .map(|c| {
971 if c.is_ascii_alphanumeric() {
972 c.to_ascii_uppercase()
973 } else {
974 '_'
975 }
976 })
977 .collect()
978 }
979
980 pub fn resolved_creds(&self) -> ResolvedCreds {
985 let prefix = self.env_prefix();
986 let from_env = |suffix: &str| {
987 std::env::var(format!("{prefix}_{suffix}"))
988 .ok()
989 .filter(|s| !s.is_empty())
990 };
991 let inline = self.s3.as_ref();
992 let plain_env = |k: &str| std::env::var(k).ok().filter(|s| !s.is_empty());
993
994 ResolvedCreds {
995 access_key_id: from_env("AWS_ACCESS_KEY_ID")
996 .or_else(|| inline.and_then(|s| s.access_key_id.clone()))
997 .or_else(|| plain_env("AWS_ACCESS_KEY_ID")),
998 secret_access_key: from_env("AWS_SECRET_ACCESS_KEY")
999 .or_else(|| inline.and_then(|s| s.secret_access_key.clone()))
1000 .or_else(|| plain_env("AWS_SECRET_ACCESS_KEY")),
1001 session_token: from_env("AWS_SESSION_TOKEN")
1002 .or_else(|| inline.and_then(|s| s.session_token.clone()))
1003 .or_else(|| plain_env("AWS_SESSION_TOKEN")),
1004 }
1005 }
1006
1007 pub fn resolved_region(&self) -> String {
1010 let prefix = self.env_prefix();
1011 std::env::var(format!("{prefix}_AWS_REGION"))
1012 .ok()
1013 .filter(|s| !s.is_empty())
1014 .or_else(|| self.s3.as_ref().and_then(|s| s.region.clone()))
1015 .or_else(|| std::env::var("AWS_REGION").ok().filter(|s| !s.is_empty()))
1016 .or_else(|| {
1017 std::env::var("AWS_DEFAULT_REGION")
1018 .ok()
1019 .filter(|s| !s.is_empty())
1020 })
1021 .unwrap_or_else(|| "us-east-1".to_string())
1022 }
1023}
1024
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AppConfig` whose docs/swagger/metrics sections are defaults.
    fn app_config(
        server: ServerConfig,
        auth: AuthConfig,
        datasets: Vec<DatasetConfig>,
    ) -> AppConfig {
        AppConfig {
            server,
            docs: DocsConfig::default(),
            swagger: SwaggerConfig::default(),
            metrics: MetricsConfig::default(),
            auth,
            datasets,
        }
    }

    /// A parquet dataset with default index settings, dict encoding on, not lazy.
    fn parquet_dataset(name: &str, location: &str) -> DatasetConfig {
        DatasetConfig {
            name: name.into(),
            source: SourceConfig {
                kind: SourceKind::Parquet,
                location: location.into(),
            },
            s3: None,
            index: IndexConfig::default(),
            columns: vec![],
            dict_encode: true,
            lazy: false,
        }
    }

    /// Recreates an empty scratch directory named `{tag}-{pid}` under the
    /// system temp dir.
    fn fresh_scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// `true` when `err` is `AppError::Internal` and its message contains `needle`.
    fn is_internal_with(err: AppError, needle: &str) -> bool {
        matches!(err, AppError::Internal(m) if m.contains(needle))
    }

    #[test]
    fn server_defaults() {
        let s = ServerConfig::default();
        assert_eq!(s.backend, Backend::Datafusion);
        assert_eq!(s.port, 8080);
        assert!(s.compress);
        assert_eq!(s.max_body_bytes, 1024 * 1024);
        assert_eq!(s.max_page_size, 100_000);
        assert_eq!(s.request_timeout_ms, 30_000);
        assert!(!s.quack.enabled);
        assert_eq!(s.quack.uri, "quack:localhost");
        assert!(s.quack.token.is_none());
        assert!(!s.quack.allow_other_hostname);
        assert!(s.quack.read_only);
        assert_eq!(s.prefix, "");
        assert!(s.listen.is_loopback());
    }

    #[test]
    fn server_overrides_from_toml() {
        let toml = r#"
            [server]
            backend = "duckdb"
            port = 9000
            prefix = "/datapress"
            compress = false
            max_body_bytes = 4096
            max_page_size = 50000
            request_timeout_ms = 0

            [server.quack]
            enabled = true
            uri = "quack:localhost:9495"
            token = "test-token"
            read_only = false
            [[dataset]]
            name = "x"
            source.kind = "parquet"
            source.location = "/tmp/missing.parquet"
        "#;
        // Parsing only: `validate` is not called, so the missing file is fine.
        let cfg: AppConfig = toml::from_str(toml).unwrap();

        let server = &cfg.server;
        assert_eq!(server.backend, Backend::Duckdb);
        assert_eq!(server.port, 9000);
        assert_eq!(server.prefix, "/datapress");
        assert!(!server.compress);
        assert_eq!(server.max_body_bytes, 4096);
        assert_eq!(server.max_page_size, 50_000);
        assert_eq!(server.request_timeout_ms, 0);

        let quack = &server.quack;
        assert!(quack.enabled);
        assert_eq!(quack.uri, "quack:localhost:9495");
        assert_eq!(quack.token.as_deref(), Some("test-token"));
        assert!(!quack.read_only);

        assert_eq!(cfg.datasets.len(), 1);
        assert_eq!(cfg.datasets[0].name, "x");
        // `dict_encode` was omitted, so it takes the `default_true` value.
        assert!(cfg.datasets[0].dict_encode);
    }

    #[test]
    fn validate_rejects_bad_prefix() {
        for p in ["no-leading-slash", "/trailing/"] {
            let server = ServerConfig {
                prefix: p.to_string(),
                ..Default::default()
            };
            let cfg = app_config(server, AuthConfig::default(), vec![]);
            assert!(cfg.validate().is_err(), "prefix {p:?} should fail");
        }
    }

    #[test]
    fn normalize_lowercases_configured_scopes() {
        let auth = AuthConfig {
            read_scopes: vec!["Datasets:Read".into(), "API.READ".into()],
            reload_scopes: vec!["Datasets:Reload".into()],
            ..Default::default()
        };
        let mut cfg = app_config(ServerConfig::default(), auth, vec![]);
        cfg.normalize();
        assert_eq!(cfg.auth.read_scopes, vec!["datasets:read", "api.read"]);
        assert_eq!(cfg.auth.reload_scopes, vec!["datasets:reload"]);
    }

    #[test]
    fn validate_rejects_no_datasets() {
        let cfg = app_config(ServerConfig::default(), AuthConfig::default(), vec![]);
        let err = cfg.validate().unwrap_err();
        assert!(is_internal_with(err, "[[dataset]]"));
    }

    #[cfg(feature = "auth")]
    #[test]
    fn validate_accepts_auth_issuer_with_trailing_slash() {
        let dir = fresh_scratch_dir("dp-auth-issuer-test");
        let file = dir.join("a.parquet");
        std::fs::write(&file, b"x").unwrap();

        let auth = AuthConfig {
            enabled: true,
            issuer: "https://tenant.example.com/".into(),
            ..Default::default()
        };
        let cfg = app_config(
            ServerConfig::default(),
            auth,
            vec![parquet_dataset("x", &file.to_string_lossy())],
        );

        assert!(cfg.validate().is_ok());
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn validate_rejects_quack_non_local_host_without_override() {
        let server = ServerConfig {
            quack: QuackConfig {
                enabled: true,
                uri: "quack:127.0.0.1".into(),
                token: Some("test-token".into()),
                ..Default::default()
            },
            ..Default::default()
        };
        // The quack check runs before datasets are inspected, so the dataset
        // file does not need to exist.
        let cfg = app_config(
            server,
            AuthConfig::default(),
            vec![parquet_dataset("x", "/tmp/missing.parquet")],
        );
        let err = cfg.validate().unwrap_err();
        assert!(is_internal_with(err, "host must be 'localhost'"));
    }

    #[test]
    fn validate_rejects_bad_dataset_name() {
        let cfg: AppConfig = toml::from_str(
            r#"
            [[dataset]]
            name = "bad name!"
            source.kind = "parquet"
            source.location = "/tmp/whatever"
            "#,
        )
        .unwrap();
        let err = cfg.validate().unwrap_err();
        assert!(is_internal_with(err, "alphanumeric"));
    }

    #[test]
    fn validate_rejects_duplicate_names() {
        let dir = fresh_scratch_dir("dp-dup-test");
        let f = dir.join("a.parquet");
        std::fs::write(&f, b"x").unwrap();
        let path = f.to_str().unwrap();

        let cfg: AppConfig = toml::from_str(&format!(
            r#"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            [[dataset]]
            name = "a"
            source.kind = "parquet"
            source.location = "{path}"
            "#
        ))
        .unwrap();
        let err = cfg.validate().expect_err("expected error");
        assert!(is_internal_with(err, "duplicate"));

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn s3_bucket_parsing() {
        let source = |loc: &str| SourceConfig {
            kind: SourceKind::Parquet,
            location: loc.into(),
        };
        let with_key = source("s3://bucket/path/key");
        assert_eq!(with_key.s3_bucket().unwrap(), ("bucket", "path/key"));
        let bucket_only = source("s3://only-bucket");
        assert_eq!(bucket_only.s3_bucket().unwrap(), ("only-bucket", ""));
        assert!(source("s3:///nokey").s3_bucket().is_err());
        assert!(source("/local/path").s3_bucket().is_err());
    }

    #[test]
    fn env_prefix_sanitises_name() {
        let prefix_of = |name: &str| parquet_dataset(name, "x").env_prefix();
        assert_eq!(prefix_of("accidents"), "ACCIDENTS");
        assert_eq!(prefix_of("sales.eu-1"), "SALES_EU_1");
        assert_eq!(prefix_of("a_b.c-d"), "A_B_C_D");
    }

    #[test]
    fn resolve_local_parquet_single_file_and_dir() {
        let dir = fresh_scratch_dir("dp-cfg-test");
        let f = dir.join("a.parquet");
        // Only the extension is inspected, so the contents can be anything.
        std::fs::write(&f, b"not really parquet").unwrap();

        let resolve = |loc: &str| parquet_dataset("ds", loc).resolve_local_parquet_files();

        // A single file resolves to itself.
        let files = resolve(f.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // A directory resolves to the parquet files it contains.
        let files = resolve(dir.to_str().unwrap()).unwrap();
        assert_eq!(files, vec![f.clone()]);

        // A missing path is an error.
        assert!(resolve("/no/such/place.parquet").is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }
}