1use std::borrow::Cow;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::Context;
8use anyhow::Result;
9use anyhow::anyhow;
10use anyhow::bail;
11use crankshaft::events::Event;
12use indexmap::IndexMap;
13use secrecy::ExposeSecret;
14use serde::Deserialize;
15use serde::Serialize;
16use tokio::sync::broadcast;
17use tracing::warn;
18use url::Url;
19
20use crate::DockerBackend;
21use crate::LocalBackend;
22use crate::LsfApptainerBackend;
23use crate::LsfApptainerBackendConfig;
24use crate::SYSTEM;
25use crate::TaskExecutionBackend;
26use crate::TesBackend;
27use crate::convert_unit_string;
28use crate::path::is_url;
29
30pub const MAX_RETRIES: u64 = 100;
32
33pub const DEFAULT_TASK_SHELL: &str = "bash";
35
36pub const DEFAULT_BACKEND_NAME: &str = "default";
38
39const REDACTED: &str = "<REDACTED>";
41
42#[derive(Debug, Clone)]
46pub struct SecretString {
47 inner: secrecy::SecretString,
51 redacted: bool,
58}
59
60impl SecretString {
61 pub fn redact(&mut self) {
66 self.redacted = true;
67 }
68
69 pub fn unredact(&mut self) {
71 self.redacted = false;
72 }
73
74 pub fn inner(&self) -> &secrecy::SecretString {
76 &self.inner
77 }
78}
79
80impl From<String> for SecretString {
81 fn from(s: String) -> Self {
82 Self {
83 inner: s.into(),
84 redacted: true,
85 }
86 }
87}
88
89impl From<&str> for SecretString {
90 fn from(s: &str) -> Self {
91 Self {
92 inner: s.into(),
93 redacted: true,
94 }
95 }
96}
97
98impl Default for SecretString {
99 fn default() -> Self {
100 Self {
101 inner: Default::default(),
102 redacted: true,
103 }
104 }
105}
106
107impl serde::Serialize for SecretString {
108 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
109 where
110 S: serde::Serializer,
111 {
112 use secrecy::ExposeSecret;
113
114 if self.redacted {
115 serializer.serialize_str(REDACTED)
116 } else {
117 serializer.serialize_str(self.inner.expose_secret())
118 }
119 }
120}
121
122impl<'de> serde::Deserialize<'de> for SecretString {
123 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
124 where
125 D: serde::Deserializer<'de>,
126 {
127 let inner = secrecy::SecretString::deserialize(deserializer)?;
128 Ok(Self {
129 inner,
130 redacted: true,
131 })
132 }
133}
134
135#[derive(Debug, Default, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "snake_case", deny_unknown_fields)]
147pub struct Config {
148 #[serde(default)]
150 pub http: HttpConfig,
151 #[serde(default)]
153 pub workflow: WorkflowConfig,
154 #[serde(default)]
156 pub task: TaskConfig,
157 #[serde(skip_serializing_if = "Option::is_none")]
162 pub backend: Option<String>,
163 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
171 pub backends: IndexMap<String, BackendConfig>,
172 #[serde(default)]
174 pub storage: StorageConfig,
175 #[serde(default)]
189 pub suppress_env_specific_output: bool,
190 #[serde(default)]
197 pub experimental_features_enabled: bool,
198}
199
200impl Config {
201 pub async fn validate(&self) -> Result<()> {
203 self.http.validate()?;
204 self.workflow.validate()?;
205 self.task.validate()?;
206
207 if self.backend.is_none() && self.backends.len() < 2 {
208 } else {
211 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
213 if !self.backends.contains_key(backend) {
214 bail!("a backend named `{backend}` is not present in the configuration");
215 }
216 }
217
218 for backend in self.backends.values() {
219 backend.validate(self).await?;
220 }
221
222 self.storage.validate()?;
223
224 if self.suppress_env_specific_output && !self.experimental_features_enabled {
225 bail!("`suppress_env_specific_output` requires enabling experimental features");
226 }
227
228 Ok(())
229 }
230
231 pub fn redact(&mut self) {
235 for backend in self.backends.values_mut() {
236 backend.redact();
237 }
238
239 self.storage.azure.redact();
240
241 if let Some(auth) = &mut self.storage.s3.auth {
242 auth.redact();
243 }
244
245 if let Some(auth) = &mut self.storage.google.auth {
246 auth.redact();
247 }
248 }
249
250 pub fn unredact(&mut self) {
254 for backend in self.backends.values_mut() {
255 backend.unredact();
256 }
257
258 self.storage.azure.unredact();
259
260 if let Some(auth) = &mut self.storage.s3.auth {
261 auth.unredact();
262 }
263
264 if let Some(auth) = &mut self.storage.google.auth {
265 auth.unredact();
266 }
267 }
268
269 pub async fn create_backend(
271 self: &Arc<Self>,
272 events: Option<broadcast::Sender<Event>>,
273 ) -> Result<Arc<dyn TaskExecutionBackend>> {
274 let config = if self.backend.is_none() && self.backends.len() < 2 {
275 if self.backends.len() == 1 {
276 Cow::Borrowed(self.backends.values().next().unwrap())
278 } else {
279 Cow::Owned(BackendConfig::default())
281 }
282 } else {
283 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
285 Cow::Borrowed(self.backends.get(backend).ok_or_else(|| {
286 anyhow!("a backend named `{backend}` is not present in the configuration")
287 })?)
288 };
289
290 match config.as_ref() {
291 BackendConfig::Local(config) => {
292 warn!(
293 "the engine is configured to use the local backend: tasks will not be run \
294 inside of a container"
295 );
296 Ok(Arc::new(LocalBackend::new(self.clone(), config, events)?))
297 }
298 BackendConfig::Docker(config) => Ok(Arc::new(
299 DockerBackend::new(self.clone(), config, events).await?,
300 )),
301 BackendConfig::Tes(config) => Ok(Arc::new(
302 TesBackend::new(self.clone(), config, events).await?,
303 )),
304 BackendConfig::LsfApptainer(config) => Ok(Arc::new(LsfApptainerBackend::new(
305 self.clone(),
306 config.clone(),
307 events,
308 ))),
309 }
310 }
311}
312
313#[derive(Debug, Default, Clone, Serialize, Deserialize)]
315#[serde(rename_all = "snake_case", deny_unknown_fields)]
316pub struct HttpConfig {
317 #[serde(default, skip_serializing_if = "Option::is_none")]
321 pub cache: Option<PathBuf>,
322 #[serde(default, skip_serializing_if = "Option::is_none")]
326 pub retries: Option<usize>,
327 #[serde(default, skip_serializing_if = "Option::is_none")]
331 pub parallelism: Option<usize>,
332}
333
334impl HttpConfig {
335 pub fn validate(&self) -> Result<()> {
337 if let Some(parallelism) = self.parallelism
338 && parallelism == 0
339 {
340 bail!("configuration value `http.parallelism` cannot be zero");
341 }
342 Ok(())
343 }
344}
345
346#[derive(Debug, Default, Clone, Serialize, Deserialize)]
348#[serde(rename_all = "snake_case", deny_unknown_fields)]
349pub struct StorageConfig {
350 #[serde(default)]
352 pub azure: AzureStorageConfig,
353 #[serde(default)]
355 pub s3: S3StorageConfig,
356 #[serde(default)]
358 pub google: GoogleStorageConfig,
359}
360
361impl StorageConfig {
362 pub fn validate(&self) -> Result<()> {
364 self.azure.validate()?;
365 self.s3.validate()?;
366 self.google.validate()?;
367 Ok(())
368 }
369}
370
371#[derive(Debug, Default, Clone, Serialize, Deserialize)]
373#[serde(rename_all = "snake_case", deny_unknown_fields)]
374pub struct AzureStorageConfig {
375 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
384 pub auth: IndexMap<String, IndexMap<String, SecretString>>,
385}
386
387impl AzureStorageConfig {
388 pub fn validate(&self) -> Result<()> {
390 Ok(())
391 }
392
393 pub fn redact(&mut self) {
395 for v in self.auth.values_mut() {
396 for v in v.values_mut() {
397 v.redact();
398 }
399 }
400 }
401
402 pub fn unredact(&mut self) {
404 for v in self.auth.values_mut() {
405 for v in v.values_mut() {
406 v.unredact();
407 }
408 }
409 }
410}
411
412#[derive(Debug, Default, Clone, Serialize, Deserialize)]
414#[serde(rename_all = "snake_case", deny_unknown_fields)]
415pub struct S3StorageAuthConfig {
416 pub access_key_id: String,
418 pub secret_access_key: SecretString,
420}
421
422impl S3StorageAuthConfig {
423 pub fn validate(&self) -> Result<()> {
425 if self.access_key_id.is_empty() {
426 bail!("configuration value `storage.s3.auth.access_key_id` is required");
427 }
428
429 if self.secret_access_key.inner.expose_secret().is_empty() {
430 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
431 }
432
433 Ok(())
434 }
435
436 pub fn redact(&mut self) {
439 self.secret_access_key.redact();
440 }
441
442 pub fn unredact(&mut self) {
445 self.secret_access_key.unredact();
446 }
447}
448
449#[derive(Debug, Default, Clone, Serialize, Deserialize)]
451#[serde(rename_all = "snake_case", deny_unknown_fields)]
452pub struct S3StorageConfig {
453 #[serde(default, skip_serializing_if = "Option::is_none")]
458 pub region: Option<String>,
459
460 #[serde(default, skip_serializing_if = "Option::is_none")]
462 pub auth: Option<S3StorageAuthConfig>,
463}
464
465impl S3StorageConfig {
466 pub fn validate(&self) -> Result<()> {
468 if let Some(auth) = &self.auth {
469 auth.validate()?;
470 }
471
472 Ok(())
473 }
474}
475
476#[derive(Debug, Default, Clone, Serialize, Deserialize)]
478#[serde(rename_all = "snake_case", deny_unknown_fields)]
479pub struct GoogleStorageAuthConfig {
480 pub access_key: String,
482 pub secret: SecretString,
484}
485
486impl GoogleStorageAuthConfig {
487 pub fn validate(&self) -> Result<()> {
489 if self.access_key.is_empty() {
490 bail!("configuration value `storage.google.auth.access_key` is required");
491 }
492
493 if self.secret.inner.expose_secret().is_empty() {
494 bail!("configuration value `storage.google.auth.secret` is required");
495 }
496
497 Ok(())
498 }
499
500 pub fn redact(&mut self) {
503 self.secret.redact();
504 }
505
506 pub fn unredact(&mut self) {
509 self.secret.unredact();
510 }
511}
512
513#[derive(Debug, Default, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "snake_case", deny_unknown_fields)]
516pub struct GoogleStorageConfig {
517 #[serde(default, skip_serializing_if = "Option::is_none")]
519 pub auth: Option<GoogleStorageAuthConfig>,
520}
521
522impl GoogleStorageConfig {
523 pub fn validate(&self) -> Result<()> {
525 if let Some(auth) = &self.auth {
526 auth.validate()?;
527 }
528
529 Ok(())
530 }
531}
532
533#[derive(Debug, Default, Clone, Serialize, Deserialize)]
535#[serde(rename_all = "snake_case", deny_unknown_fields)]
536pub struct WorkflowConfig {
537 #[serde(default)]
539 pub scatter: ScatterConfig,
540}
541
542impl WorkflowConfig {
543 pub fn validate(&self) -> Result<()> {
545 self.scatter.validate()?;
546 Ok(())
547 }
548}
549
550#[derive(Debug, Default, Clone, Serialize, Deserialize)]
552#[serde(rename_all = "snake_case", deny_unknown_fields)]
553pub struct ScatterConfig {
554 #[serde(default, skip_serializing_if = "Option::is_none")]
607 pub concurrency: Option<u64>,
608}
609
610impl ScatterConfig {
611 pub fn validate(&self) -> Result<()> {
613 if let Some(concurrency) = self.concurrency
614 && concurrency == 0
615 {
616 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
617 }
618
619 Ok(())
620 }
621}
622
623#[derive(Debug, Default, Clone, Serialize, Deserialize)]
625#[serde(rename_all = "snake_case", deny_unknown_fields)]
626pub struct TaskConfig {
627 #[serde(default, skip_serializing_if = "Option::is_none")]
633 pub retries: Option<u64>,
634 #[serde(default, skip_serializing_if = "Option::is_none")]
639 pub container: Option<String>,
640 #[serde(default, skip_serializing_if = "Option::is_none")]
648 pub shell: Option<String>,
649 #[serde(default)]
651 pub cpu_limit_behavior: TaskResourceLimitBehavior,
652 #[serde(default)]
654 pub memory_limit_behavior: TaskResourceLimitBehavior,
655}
656
657impl TaskConfig {
658 pub fn validate(&self) -> Result<()> {
660 if self.retries.unwrap_or(0) > MAX_RETRIES {
661 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
662 }
663
664 Ok(())
665 }
666}
667
668#[derive(Debug, Default, Clone, Serialize, Deserialize)]
671#[serde(rename_all = "snake_case", deny_unknown_fields)]
672pub enum TaskResourceLimitBehavior {
673 TryWithMax,
676 #[default]
680 Deny,
681}
682
683#[derive(Debug, Clone, Serialize, Deserialize)]
685#[serde(rename_all = "snake_case", tag = "type")]
686pub enum BackendConfig {
687 Local(LocalBackendConfig),
689 Docker(DockerBackendConfig),
691 Tes(Box<TesBackendConfig>),
693 LsfApptainer(Arc<LsfApptainerBackendConfig>),
697}
698
699impl Default for BackendConfig {
700 fn default() -> Self {
701 Self::Docker(Default::default())
702 }
703}
704
705impl BackendConfig {
706 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
708 match self {
709 Self::Local(config) => config.validate(),
710 Self::Docker(config) => config.validate(),
711 Self::Tes(config) => config.validate(),
712 Self::LsfApptainer(config) => config.validate(engine_config).await,
713 }
714 }
715
716 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
720 match self {
721 Self::Local(config) => Some(config),
722 _ => None,
723 }
724 }
725
726 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
730 match self {
731 Self::Docker(config) => Some(config),
732 _ => None,
733 }
734 }
735
736 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
740 match self {
741 Self::Tes(config) => Some(config),
742 _ => None,
743 }
744 }
745
746 pub fn redact(&mut self) {
748 match self {
749 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) => {}
750 Self::Tes(config) => config.redact(),
751 }
752 }
753
754 pub fn unredact(&mut self) {
756 match self {
757 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) => {}
758 Self::Tes(config) => config.unredact(),
759 }
760 }
761}
762
763#[derive(Debug, Default, Clone, Serialize, Deserialize)]
770#[serde(rename_all = "snake_case", deny_unknown_fields)]
771pub struct LocalBackendConfig {
772 #[serde(default, skip_serializing_if = "Option::is_none")]
778 pub cpu: Option<u64>,
779
780 #[serde(default, skip_serializing_if = "Option::is_none")]
787 pub memory: Option<String>,
788}
789
790impl LocalBackendConfig {
791 pub fn validate(&self) -> Result<()> {
793 if let Some(cpu) = self.cpu {
794 if cpu == 0 {
795 bail!("local backend configuration value `cpu` cannot be zero");
796 }
797
798 let total = SYSTEM.cpus().len() as u64;
799 if cpu > total {
800 bail!(
801 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
802 available to the host ({total})"
803 );
804 }
805 }
806
807 if let Some(memory) = &self.memory {
808 let memory = convert_unit_string(memory).with_context(|| {
809 format!("local backend configuration value `memory` has invalid value `{memory}`")
810 })?;
811
812 if memory == 0 {
813 bail!("local backend configuration value `memory` cannot be zero");
814 }
815
816 let total = SYSTEM.total_memory();
817 if memory > total {
818 bail!(
819 "local backend configuration value `memory` cannot exceed the total memory of \
820 the host ({total} bytes)"
821 );
822 }
823 }
824
825 Ok(())
826 }
827}
828
829const fn cleanup_default() -> bool {
831 true
832}
833
834#[derive(Debug, Clone, Serialize, Deserialize)]
836#[serde(rename_all = "snake_case", deny_unknown_fields)]
837pub struct DockerBackendConfig {
838 #[serde(default = "cleanup_default")]
842 pub cleanup: bool,
843}
844
845impl DockerBackendConfig {
846 pub fn validate(&self) -> Result<()> {
848 Ok(())
849 }
850}
851
852impl Default for DockerBackendConfig {
853 fn default() -> Self {
854 Self { cleanup: true }
855 }
856}
857
858#[derive(Debug, Default, Clone, Serialize, Deserialize)]
860#[serde(rename_all = "snake_case", deny_unknown_fields)]
861pub struct BasicAuthConfig {
862 #[serde(default)]
864 pub username: String,
865 #[serde(default)]
867 pub password: SecretString,
868}
869
870impl BasicAuthConfig {
871 pub fn validate(&self) -> Result<()> {
873 Ok(())
874 }
875
876 pub fn redact(&mut self) {
878 self.password.redact();
879 }
880
881 pub fn unredact(&mut self) {
883 self.password.unredact();
884 }
885}
886
887#[derive(Debug, Default, Clone, Serialize, Deserialize)]
889#[serde(rename_all = "snake_case", deny_unknown_fields)]
890pub struct BearerAuthConfig {
891 #[serde(default)]
893 pub token: SecretString,
894}
895
896impl BearerAuthConfig {
897 pub fn validate(&self) -> Result<()> {
899 Ok(())
900 }
901
902 pub fn redact(&mut self) {
904 self.token.redact();
905 }
906
907 pub fn unredact(&mut self) {
909 self.token.unredact();
910 }
911}
912
913#[derive(Debug, Clone, Serialize, Deserialize)]
915#[serde(rename_all = "snake_case", tag = "type")]
916pub enum TesBackendAuthConfig {
917 Basic(BasicAuthConfig),
919 Bearer(BearerAuthConfig),
921}
922
923impl TesBackendAuthConfig {
924 pub fn validate(&self) -> Result<()> {
926 match self {
927 Self::Basic(config) => config.validate(),
928 Self::Bearer(config) => config.validate(),
929 }
930 }
931
932 pub fn redact(&mut self) {
935 match self {
936 Self::Basic(auth) => auth.redact(),
937 Self::Bearer(auth) => auth.redact(),
938 }
939 }
940
941 pub fn unredact(&mut self) {
944 match self {
945 Self::Basic(auth) => auth.unredact(),
946 Self::Bearer(auth) => auth.unredact(),
947 }
948 }
949}
950
951#[derive(Debug, Default, Clone, Serialize, Deserialize)]
953#[serde(rename_all = "snake_case", deny_unknown_fields)]
954pub struct TesBackendConfig {
955 #[serde(default, skip_serializing_if = "Option::is_none")]
957 pub url: Option<Url>,
958
959 #[serde(default, skip_serializing_if = "Option::is_none")]
961 pub auth: Option<TesBackendAuthConfig>,
962
963 #[serde(default, skip_serializing_if = "Option::is_none")]
965 pub inputs: Option<Url>,
966
967 #[serde(default, skip_serializing_if = "Option::is_none")]
969 pub outputs: Option<Url>,
970
971 #[serde(default, skip_serializing_if = "Option::is_none")]
975 pub interval: Option<u64>,
976
977 pub retries: Option<u32>,
982
983 #[serde(default, skip_serializing_if = "Option::is_none")]
988 pub max_concurrency: Option<u32>,
989
990 #[serde(default)]
993 pub insecure: bool,
994}
995
996impl TesBackendConfig {
997 pub fn validate(&self) -> Result<()> {
999 match &self.url {
1000 Some(url) => {
1001 if !self.insecure && url.scheme() != "https" {
1002 bail!(
1003 "TES backend configuration value `url` has invalid value `{url}`: URL \
1004 must use a HTTPS scheme"
1005 );
1006 }
1007 }
1008 None => bail!("TES backend configuration value `url` is required"),
1009 }
1010
1011 if let Some(auth) = &self.auth {
1012 auth.validate()?;
1013 }
1014
1015 if let Some(max_concurrency) = self.max_concurrency
1016 && max_concurrency == 0
1017 {
1018 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1019 }
1020
1021 match &self.inputs {
1022 Some(url) => {
1023 if !is_url(url.as_str()) {
1024 bail!(
1025 "TES backend storage configuration value `inputs` has invalid value \
1026 `{url}`: URL scheme is not supported"
1027 );
1028 }
1029
1030 if !url.path().ends_with('/') {
1031 bail!(
1032 "TES backend storage configuration value `inputs` has invalid value \
1033 `{url}`: URL path must end with a slash"
1034 );
1035 }
1036 }
1037 None => bail!("TES backend configuration value `inputs` is required"),
1038 }
1039
1040 match &self.outputs {
1041 Some(url) => {
1042 if !is_url(url.as_str()) {
1043 bail!(
1044 "TES backend storage configuration value `outputs` has invalid value \
1045 `{url}`: URL scheme is not supported"
1046 );
1047 }
1048
1049 if !url.path().ends_with('/') {
1050 bail!(
1051 "TES backend storage configuration value `outputs` has invalid value \
1052 `{url}`: URL path must end with a slash"
1053 );
1054 }
1055 }
1056 None => bail!("TES backend storage configuration value `outputs` is required"),
1057 }
1058
1059 Ok(())
1060 }
1061
1062 pub fn redact(&mut self) {
1064 if let Some(auth) = &mut self.auth {
1065 auth.redact();
1066 }
1067 }
1068
1069 pub fn unredact(&mut self) {
1071 if let Some(auth) = &mut self.auth {
1072 auth.unredact();
1073 }
1074 }
1075}
1076
1077#[cfg(test)]
1078mod test {
1079 use pretty_assertions::assert_eq;
1080
1081 use super::*;
1082
1083 #[test]
1084 fn redacted_secret() {
1085 let mut secret: SecretString = "secret".into();
1086
1087 assert_eq!(
1088 serde_json::to_string(&secret).unwrap(),
1089 format!(r#""{REDACTED}""#)
1090 );
1091
1092 secret.unredact();
1093 assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);
1094
1095 secret.redact();
1096 assert_eq!(
1097 serde_json::to_string(&secret).unwrap(),
1098 format!(r#""{REDACTED}""#)
1099 );
1100 }
1101
1102 #[test]
1103 fn redacted_config() {
1104 let config = Config {
1105 backends: [
1106 (
1107 "first".to_string(),
1108 BackendConfig::Tes(
1109 TesBackendConfig {
1110 auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
1111 username: "foo".into(),
1112 password: "secret".into(),
1113 })),
1114 ..Default::default()
1115 }
1116 .into(),
1117 ),
1118 ),
1119 (
1120 "second".to_string(),
1121 BackendConfig::Tes(
1122 TesBackendConfig {
1123 auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
1124 token: "secret".into(),
1125 })),
1126 ..Default::default()
1127 }
1128 .into(),
1129 ),
1130 ),
1131 ]
1132 .into(),
1133 storage: StorageConfig {
1134 azure: AzureStorageConfig {
1135 auth: [("foo".into(), [("bar".into(), "secret".into())].into())].into(),
1136 },
1137 s3: S3StorageConfig {
1138 auth: Some(S3StorageAuthConfig {
1139 access_key_id: "foo".into(),
1140 secret_access_key: "secret".into(),
1141 }),
1142 ..Default::default()
1143 },
1144 google: GoogleStorageConfig {
1145 auth: Some(GoogleStorageAuthConfig {
1146 access_key: "foo".into(),
1147 secret: "secret".into(),
1148 }),
1149 },
1150 },
1151 ..Default::default()
1152 };
1153
1154 let json = serde_json::to_string_pretty(&config).unwrap();
1155 assert!(json.contains("secret"), "`{json}` contains a secret");
1156 }
1157
1158 #[tokio::test]
1159 async fn test_config_validate() {
1160 let mut config = Config::default();
1162 config.task.retries = Some(1000000);
1163 assert_eq!(
1164 config.validate().await.unwrap_err().to_string(),
1165 "configuration value `task.retries` cannot exceed 100"
1166 );
1167
1168 let mut config = Config::default();
1170 config.workflow.scatter.concurrency = Some(0);
1171 assert_eq!(
1172 config.validate().await.unwrap_err().to_string(),
1173 "configuration value `workflow.scatter.concurrency` cannot be zero"
1174 );
1175
1176 let config = Config {
1178 backend: Some("foo".into()),
1179 ..Default::default()
1180 };
1181 assert_eq!(
1182 config.validate().await.unwrap_err().to_string(),
1183 "a backend named `foo` is not present in the configuration"
1184 );
1185 let config = Config {
1186 backend: Some("bar".into()),
1187 backends: [("foo".to_string(), BackendConfig::default())].into(),
1188 ..Default::default()
1189 };
1190 assert_eq!(
1191 config.validate().await.unwrap_err().to_string(),
1192 "a backend named `bar` is not present in the configuration"
1193 );
1194
1195 let config = Config {
1197 backends: [("foo".to_string(), BackendConfig::default())].into(),
1198 ..Default::default()
1199 };
1200 config.validate().await.expect("config should validate");
1201
1202 let config = Config {
1204 backends: [(
1205 "default".to_string(),
1206 BackendConfig::Local(LocalBackendConfig {
1207 cpu: Some(0),
1208 ..Default::default()
1209 }),
1210 )]
1211 .into(),
1212 ..Default::default()
1213 };
1214 assert_eq!(
1215 config.validate().await.unwrap_err().to_string(),
1216 "local backend configuration value `cpu` cannot be zero"
1217 );
1218 let config = Config {
1219 backends: [(
1220 "default".to_string(),
1221 BackendConfig::Local(LocalBackendConfig {
1222 cpu: Some(10000000),
1223 ..Default::default()
1224 }),
1225 )]
1226 .into(),
1227 ..Default::default()
1228 };
1229 assert!(
1230 config
1231 .validate()
1232 .await
1233 .unwrap_err()
1234 .to_string()
1235 .starts_with(
1236 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1237 available to the host"
1238 )
1239 );
1240
1241 let config = Config {
1243 backends: [(
1244 "default".to_string(),
1245 BackendConfig::Local(LocalBackendConfig {
1246 memory: Some("0 GiB".to_string()),
1247 ..Default::default()
1248 }),
1249 )]
1250 .into(),
1251 ..Default::default()
1252 };
1253 assert_eq!(
1254 config.validate().await.unwrap_err().to_string(),
1255 "local backend configuration value `memory` cannot be zero"
1256 );
1257 let config = Config {
1258 backends: [(
1259 "default".to_string(),
1260 BackendConfig::Local(LocalBackendConfig {
1261 memory: Some("100 meows".to_string()),
1262 ..Default::default()
1263 }),
1264 )]
1265 .into(),
1266 ..Default::default()
1267 };
1268 assert_eq!(
1269 config.validate().await.unwrap_err().to_string(),
1270 "local backend configuration value `memory` has invalid value `100 meows`"
1271 );
1272
1273 let config = Config {
1274 backends: [(
1275 "default".to_string(),
1276 BackendConfig::Local(LocalBackendConfig {
1277 memory: Some("1000 TiB".to_string()),
1278 ..Default::default()
1279 }),
1280 )]
1281 .into(),
1282 ..Default::default()
1283 };
1284 assert!(
1285 config
1286 .validate()
1287 .await
1288 .unwrap_err()
1289 .to_string()
1290 .starts_with(
1291 "local backend configuration value `memory` cannot exceed the total memory of \
1292 the host"
1293 )
1294 );
1295
1296 let config = Config {
1298 backends: [(
1299 "default".to_string(),
1300 BackendConfig::Tes(Default::default()),
1301 )]
1302 .into(),
1303 ..Default::default()
1304 };
1305 assert_eq!(
1306 config.validate().await.unwrap_err().to_string(),
1307 "TES backend configuration value `url` is required"
1308 );
1309
1310 let config = Config {
1312 backends: [(
1313 "default".to_string(),
1314 BackendConfig::Tes(
1315 TesBackendConfig {
1316 url: Some("https://example.com".parse().unwrap()),
1317 max_concurrency: Some(0),
1318 ..Default::default()
1319 }
1320 .into(),
1321 ),
1322 )]
1323 .into(),
1324 ..Default::default()
1325 };
1326 assert_eq!(
1327 config.validate().await.unwrap_err().to_string(),
1328 "TES backend configuration value `max_concurrency` cannot be zero"
1329 );
1330
1331 let config = Config {
1333 backends: [(
1334 "default".to_string(),
1335 BackendConfig::Tes(
1336 TesBackendConfig {
1337 url: Some("http://example.com".parse().unwrap()),
1338 inputs: Some("http://example.com".parse().unwrap()),
1339 outputs: Some("http://example.com".parse().unwrap()),
1340 ..Default::default()
1341 }
1342 .into(),
1343 ),
1344 )]
1345 .into(),
1346 ..Default::default()
1347 };
1348 assert_eq!(
1349 config.validate().await.unwrap_err().to_string(),
1350 "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
1351 must use a HTTPS scheme"
1352 );
1353
1354 let config = Config {
1356 backends: [(
1357 "default".to_string(),
1358 BackendConfig::Tes(
1359 TesBackendConfig {
1360 url: Some("http://example.com".parse().unwrap()),
1361 inputs: Some("http://example.com".parse().unwrap()),
1362 outputs: Some("http://example.com".parse().unwrap()),
1363 insecure: true,
1364 ..Default::default()
1365 }
1366 .into(),
1367 ),
1368 )]
1369 .into(),
1370 ..Default::default()
1371 };
1372 config
1373 .validate()
1374 .await
1375 .expect("configuration should validate");
1376
1377 let mut config = Config::default();
1378 config.http.parallelism = Some(0);
1379 assert_eq!(
1380 config.validate().await.unwrap_err().to_string(),
1381 "configuration value `http.parallelism` cannot be zero"
1382 );
1383
1384 let mut config = Config::default();
1385 config.http.parallelism = Some(5);
1386 assert!(
1387 config.validate().await.is_ok(),
1388 "should pass for valid configuration"
1389 );
1390
1391 let mut config = Config::default();
1392 config.http.parallelism = None;
1393 assert!(
1394 config.validate().await.is_ok(),
1395 "should pass for default (None)"
1396 );
1397 }
1398}