1use std::borrow::Cow;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::anyhow;
11use anyhow::bail;
12use crankshaft::events::Event;
13use indexmap::IndexMap;
14use secrecy::ExposeSecret;
15use serde::Deserialize;
16use serde::Serialize;
17use tokio::sync::broadcast;
18use tracing::warn;
19use url::Url;
20
21use crate::DockerBackend;
22use crate::LocalBackend;
23use crate::LsfApptainerBackend;
24use crate::LsfApptainerBackendConfig;
25use crate::SYSTEM;
26use crate::SlurmApptainerBackend;
27use crate::SlurmApptainerBackendConfig;
28use crate::TaskExecutionBackend;
29use crate::TesBackend;
30use crate::convert_unit_string;
31use crate::path::is_supported_url;
32
33pub const MAX_RETRIES: u64 = 100;
35
36pub const DEFAULT_TASK_SHELL: &str = "bash";
38
39pub const DEFAULT_BACKEND_NAME: &str = "default";
41
42const REDACTED: &str = "<REDACTED>";
44
45pub fn cache_dir() -> Result<PathBuf> {
47 const CACHE_DIR_ROOT: &str = "sprocket";
49
50 Ok(dirs::cache_dir()
51 .context("failed to determine user cache directory")?
52 .join(CACHE_DIR_ROOT))
53}
54
55#[derive(Debug, Clone)]
59pub struct SecretString {
60 inner: secrecy::SecretString,
64 redacted: bool,
71}
72
73impl SecretString {
74 pub fn redact(&mut self) {
79 self.redacted = true;
80 }
81
82 pub fn unredact(&mut self) {
84 self.redacted = false;
85 }
86
87 pub fn inner(&self) -> &secrecy::SecretString {
89 &self.inner
90 }
91}
92
93impl From<String> for SecretString {
94 fn from(s: String) -> Self {
95 Self {
96 inner: s.into(),
97 redacted: true,
98 }
99 }
100}
101
102impl From<&str> for SecretString {
103 fn from(s: &str) -> Self {
104 Self {
105 inner: s.into(),
106 redacted: true,
107 }
108 }
109}
110
111impl Default for SecretString {
112 fn default() -> Self {
113 Self {
114 inner: Default::default(),
115 redacted: true,
116 }
117 }
118}
119
120impl serde::Serialize for SecretString {
121 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
122 where
123 S: serde::Serializer,
124 {
125 use secrecy::ExposeSecret;
126
127 if self.redacted {
128 serializer.serialize_str(REDACTED)
129 } else {
130 serializer.serialize_str(self.inner.expose_secret())
131 }
132 }
133}
134
135impl<'de> serde::Deserialize<'de> for SecretString {
136 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
137 where
138 D: serde::Deserializer<'de>,
139 {
140 let inner = secrecy::SecretString::deserialize(deserializer)?;
141 Ok(Self {
142 inner,
143 redacted: true,
144 })
145 }
146}
147
148#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
151#[serde(rename_all = "snake_case")]
152pub enum FailureMode {
153 #[default]
156 Slow,
157 Fast,
161}
162
163#[derive(Debug, Default, Clone, Serialize, Deserialize)]
174#[serde(rename_all = "snake_case", deny_unknown_fields)]
175pub struct Config {
176 #[serde(default)]
178 pub http: HttpConfig,
179 #[serde(default)]
181 pub workflow: WorkflowConfig,
182 #[serde(default)]
184 pub task: TaskConfig,
185 #[serde(skip_serializing_if = "Option::is_none")]
190 pub backend: Option<String>,
191 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
199 pub backends: IndexMap<String, BackendConfig>,
200 #[serde(default)]
202 pub storage: StorageConfig,
203 #[serde(default)]
217 pub suppress_env_specific_output: bool,
218 #[serde(default)]
225 pub experimental_features_enabled: bool,
226 #[serde(default, rename = "fail")]
234 pub failure_mode: FailureMode,
235}
236
237impl Config {
238 pub async fn validate(&self) -> Result<()> {
240 self.http.validate()?;
241 self.workflow.validate()?;
242 self.task.validate()?;
243
244 if self.backend.is_none() && self.backends.len() < 2 {
245 } else {
248 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
250 if !self.backends.contains_key(backend) {
251 bail!("a backend named `{backend}` is not present in the configuration");
252 }
253 }
254
255 for backend in self.backends.values() {
256 backend.validate(self).await?;
257 }
258
259 self.storage.validate()?;
260
261 if self.suppress_env_specific_output && !self.experimental_features_enabled {
262 bail!("`suppress_env_specific_output` requires enabling experimental features");
263 }
264
265 Ok(())
266 }
267
268 pub fn redact(&mut self) {
272 for backend in self.backends.values_mut() {
273 backend.redact();
274 }
275
276 if let Some(auth) = &mut self.storage.azure.auth {
277 auth.redact();
278 }
279
280 if let Some(auth) = &mut self.storage.s3.auth {
281 auth.redact();
282 }
283
284 if let Some(auth) = &mut self.storage.google.auth {
285 auth.redact();
286 }
287 }
288
289 pub fn unredact(&mut self) {
293 for backend in self.backends.values_mut() {
294 backend.unredact();
295 }
296
297 if let Some(auth) = &mut self.storage.azure.auth {
298 auth.unredact();
299 }
300
301 if let Some(auth) = &mut self.storage.s3.auth {
302 auth.unredact();
303 }
304
305 if let Some(auth) = &mut self.storage.google.auth {
306 auth.unredact();
307 }
308 }
309
310 pub async fn create_backend(
312 self: &Arc<Self>,
313 run_root_dir: &Path,
314 events: Option<broadcast::Sender<Event>>,
315 ) -> Result<Arc<dyn TaskExecutionBackend>> {
316 let config = if self.backend.is_none() && self.backends.len() < 2 {
317 if self.backends.len() == 1 {
318 Cow::Borrowed(self.backends.values().next().unwrap())
320 } else {
321 Cow::Owned(BackendConfig::default())
323 }
324 } else {
325 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
327 Cow::Borrowed(self.backends.get(backend).ok_or_else(|| {
328 anyhow!("a backend named `{backend}` is not present in the configuration")
329 })?)
330 };
331
332 match config.as_ref() {
333 BackendConfig::Local(config) => {
334 warn!(
335 "the engine is configured to use the local backend: tasks will not be run \
336 inside of a container"
337 );
338 Ok(Arc::new(LocalBackend::new(self.clone(), config, events)?))
339 }
340 BackendConfig::Docker(config) => Ok(Arc::new(
341 DockerBackend::new(self.clone(), config, events).await?,
342 )),
343 BackendConfig::Tes(config) => Ok(Arc::new(
344 TesBackend::new(self.clone(), config, events).await?,
345 )),
346 BackendConfig::LsfApptainer(config) => Ok(Arc::new(LsfApptainerBackend::new(
347 run_root_dir,
348 self.clone(),
349 config.clone(),
350 events,
351 ))),
352 BackendConfig::SlurmApptainer(config) => Ok(Arc::new(SlurmApptainerBackend::new(
353 run_root_dir,
354 self.clone(),
355 config.clone(),
356 events,
357 ))),
358 }
359 }
360}
361
362#[derive(Debug, Default, Clone, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case", deny_unknown_fields)]
365pub struct HttpConfig {
366 #[serde(default, skip_serializing_if = "Option::is_none")]
370 pub cache_dir: Option<PathBuf>,
371 #[serde(default, skip_serializing_if = "Option::is_none")]
375 pub retries: Option<usize>,
376 #[serde(default, skip_serializing_if = "Option::is_none")]
380 pub parallelism: Option<usize>,
381}
382
383impl HttpConfig {
384 pub fn validate(&self) -> Result<()> {
386 if let Some(parallelism) = self.parallelism
387 && parallelism == 0
388 {
389 bail!("configuration value `http.parallelism` cannot be zero");
390 }
391 Ok(())
392 }
393}
394
395#[derive(Debug, Default, Clone, Serialize, Deserialize)]
397#[serde(rename_all = "snake_case", deny_unknown_fields)]
398pub struct StorageConfig {
399 #[serde(default)]
401 pub azure: AzureStorageConfig,
402 #[serde(default)]
404 pub s3: S3StorageConfig,
405 #[serde(default)]
407 pub google: GoogleStorageConfig,
408}
409
410impl StorageConfig {
411 pub fn validate(&self) -> Result<()> {
413 self.azure.validate()?;
414 self.s3.validate()?;
415 self.google.validate()?;
416 Ok(())
417 }
418}
419
420#[derive(Debug, Default, Clone, Serialize, Deserialize)]
422#[serde(rename_all = "snake_case", deny_unknown_fields)]
423pub struct AzureStorageAuthConfig {
424 pub account_name: String,
426 pub access_key: SecretString,
428}
429
430impl AzureStorageAuthConfig {
431 pub fn validate(&self) -> Result<()> {
433 if self.account_name.is_empty() {
434 bail!("configuration value `storage.azure.auth.account_name` is required");
435 }
436
437 if self.access_key.inner.expose_secret().is_empty() {
438 bail!("configuration value `storage.azure.auth.access_key` is required");
439 }
440
441 Ok(())
442 }
443
444 pub fn redact(&mut self) {
447 self.access_key.redact();
448 }
449
450 pub fn unredact(&mut self) {
453 self.access_key.unredact();
454 }
455}
456
457#[derive(Debug, Default, Clone, Serialize, Deserialize)]
459#[serde(rename_all = "snake_case", deny_unknown_fields)]
460pub struct AzureStorageConfig {
461 #[serde(default, skip_serializing_if = "Option::is_none")]
463 pub auth: Option<AzureStorageAuthConfig>,
464}
465
466impl AzureStorageConfig {
467 pub fn validate(&self) -> Result<()> {
469 if let Some(auth) = &self.auth {
470 auth.validate()?;
471 }
472
473 Ok(())
474 }
475}
476
477#[derive(Debug, Default, Clone, Serialize, Deserialize)]
479#[serde(rename_all = "snake_case", deny_unknown_fields)]
480pub struct S3StorageAuthConfig {
481 pub access_key_id: String,
483 pub secret_access_key: SecretString,
485}
486
487impl S3StorageAuthConfig {
488 pub fn validate(&self) -> Result<()> {
490 if self.access_key_id.is_empty() {
491 bail!("configuration value `storage.s3.auth.access_key_id` is required");
492 }
493
494 if self.secret_access_key.inner.expose_secret().is_empty() {
495 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
496 }
497
498 Ok(())
499 }
500
501 pub fn redact(&mut self) {
504 self.secret_access_key.redact();
505 }
506
507 pub fn unredact(&mut self) {
510 self.secret_access_key.unredact();
511 }
512}
513
514#[derive(Debug, Default, Clone, Serialize, Deserialize)]
516#[serde(rename_all = "snake_case", deny_unknown_fields)]
517pub struct S3StorageConfig {
518 #[serde(default, skip_serializing_if = "Option::is_none")]
523 pub region: Option<String>,
524
525 #[serde(default, skip_serializing_if = "Option::is_none")]
527 pub auth: Option<S3StorageAuthConfig>,
528}
529
530impl S3StorageConfig {
531 pub fn validate(&self) -> Result<()> {
533 if let Some(auth) = &self.auth {
534 auth.validate()?;
535 }
536
537 Ok(())
538 }
539}
540
541#[derive(Debug, Default, Clone, Serialize, Deserialize)]
543#[serde(rename_all = "snake_case", deny_unknown_fields)]
544pub struct GoogleStorageAuthConfig {
545 pub access_key: String,
547 pub secret: SecretString,
549}
550
551impl GoogleStorageAuthConfig {
552 pub fn validate(&self) -> Result<()> {
554 if self.access_key.is_empty() {
555 bail!("configuration value `storage.google.auth.access_key` is required");
556 }
557
558 if self.secret.inner.expose_secret().is_empty() {
559 bail!("configuration value `storage.google.auth.secret` is required");
560 }
561
562 Ok(())
563 }
564
565 pub fn redact(&mut self) {
568 self.secret.redact();
569 }
570
571 pub fn unredact(&mut self) {
574 self.secret.unredact();
575 }
576}
577
578#[derive(Debug, Default, Clone, Serialize, Deserialize)]
580#[serde(rename_all = "snake_case", deny_unknown_fields)]
581pub struct GoogleStorageConfig {
582 #[serde(default, skip_serializing_if = "Option::is_none")]
584 pub auth: Option<GoogleStorageAuthConfig>,
585}
586
587impl GoogleStorageConfig {
588 pub fn validate(&self) -> Result<()> {
590 if let Some(auth) = &self.auth {
591 auth.validate()?;
592 }
593
594 Ok(())
595 }
596}
597
598#[derive(Debug, Default, Clone, Serialize, Deserialize)]
600#[serde(rename_all = "snake_case", deny_unknown_fields)]
601pub struct WorkflowConfig {
602 #[serde(default)]
604 pub scatter: ScatterConfig,
605}
606
607impl WorkflowConfig {
608 pub fn validate(&self) -> Result<()> {
610 self.scatter.validate()?;
611 Ok(())
612 }
613}
614
615#[derive(Debug, Default, Clone, Serialize, Deserialize)]
617#[serde(rename_all = "snake_case", deny_unknown_fields)]
618pub struct ScatterConfig {
619 #[serde(default, skip_serializing_if = "Option::is_none")]
672 pub concurrency: Option<u64>,
673}
674
675impl ScatterConfig {
676 pub fn validate(&self) -> Result<()> {
678 if let Some(concurrency) = self.concurrency
679 && concurrency == 0
680 {
681 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
682 }
683
684 Ok(())
685 }
686}
687
688#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
690#[serde(rename_all = "snake_case")]
691pub enum CallCachingMode {
692 #[default]
699 Off,
700 On,
706 Explicit,
714}
715
716#[derive(Debug, Default, Clone, Serialize, Deserialize)]
718#[serde(rename_all = "snake_case", deny_unknown_fields)]
719pub struct TaskConfig {
720 #[serde(default, skip_serializing_if = "Option::is_none")]
726 pub retries: Option<u64>,
727 #[serde(default, skip_serializing_if = "Option::is_none")]
732 pub container: Option<String>,
733 #[serde(default, skip_serializing_if = "Option::is_none")]
741 pub shell: Option<String>,
742 #[serde(default)]
744 pub cpu_limit_behavior: TaskResourceLimitBehavior,
745 #[serde(default)]
747 pub memory_limit_behavior: TaskResourceLimitBehavior,
748 #[serde(default, skip_serializing_if = "Option::is_none")]
752 pub cache_dir: Option<PathBuf>,
753 #[serde(default)]
755 pub cache: CallCachingMode,
756}
757
758impl TaskConfig {
759 pub fn validate(&self) -> Result<()> {
761 if self.retries.unwrap_or(0) > MAX_RETRIES {
762 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
763 }
764
765 Ok(())
766 }
767}
768
769#[derive(Debug, Default, Clone, Serialize, Deserialize)]
772#[serde(rename_all = "snake_case", deny_unknown_fields)]
773pub enum TaskResourceLimitBehavior {
774 TryWithMax,
777 #[default]
781 Deny,
782}
783
784#[derive(Debug, Clone, Serialize, Deserialize)]
786#[serde(rename_all = "snake_case", tag = "type")]
787pub enum BackendConfig {
788 Local(LocalBackendConfig),
790 Docker(DockerBackendConfig),
792 Tes(Box<TesBackendConfig>),
794 LsfApptainer(Arc<LsfApptainerBackendConfig>),
798 SlurmApptainer(Arc<SlurmApptainerBackendConfig>),
802}
803
804impl Default for BackendConfig {
805 fn default() -> Self {
806 Self::Docker(Default::default())
807 }
808}
809
810impl BackendConfig {
811 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
813 match self {
814 Self::Local(config) => config.validate(),
815 Self::Docker(config) => config.validate(),
816 Self::Tes(config) => config.validate(),
817 Self::LsfApptainer(config) => config.validate(engine_config).await,
818 Self::SlurmApptainer(config) => config.validate(engine_config).await,
819 }
820 }
821
822 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
826 match self {
827 Self::Local(config) => Some(config),
828 _ => None,
829 }
830 }
831
832 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
836 match self {
837 Self::Docker(config) => Some(config),
838 _ => None,
839 }
840 }
841
842 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
846 match self {
847 Self::Tes(config) => Some(config),
848 _ => None,
849 }
850 }
851
852 pub fn redact(&mut self) {
854 match self {
855 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
856 Self::Tes(config) => config.redact(),
857 }
858 }
859
860 pub fn unredact(&mut self) {
862 match self {
863 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
864 Self::Tes(config) => config.unredact(),
865 }
866 }
867}
868
869#[derive(Debug, Default, Clone, Serialize, Deserialize)]
876#[serde(rename_all = "snake_case", deny_unknown_fields)]
877pub struct LocalBackendConfig {
878 #[serde(default, skip_serializing_if = "Option::is_none")]
884 pub cpu: Option<u64>,
885
886 #[serde(default, skip_serializing_if = "Option::is_none")]
893 pub memory: Option<String>,
894}
895
896impl LocalBackendConfig {
897 pub fn validate(&self) -> Result<()> {
899 if let Some(cpu) = self.cpu {
900 if cpu == 0 {
901 bail!("local backend configuration value `cpu` cannot be zero");
902 }
903
904 let total = SYSTEM.cpus().len() as u64;
905 if cpu > total {
906 bail!(
907 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
908 available to the host ({total})"
909 );
910 }
911 }
912
913 if let Some(memory) = &self.memory {
914 let memory = convert_unit_string(memory).with_context(|| {
915 format!("local backend configuration value `memory` has invalid value `{memory}`")
916 })?;
917
918 if memory == 0 {
919 bail!("local backend configuration value `memory` cannot be zero");
920 }
921
922 let total = SYSTEM.total_memory();
923 if memory > total {
924 bail!(
925 "local backend configuration value `memory` cannot exceed the total memory of \
926 the host ({total} bytes)"
927 );
928 }
929 }
930
931 Ok(())
932 }
933}
934
935const fn cleanup_default() -> bool {
937 true
938}
939
940#[derive(Debug, Clone, Serialize, Deserialize)]
942#[serde(rename_all = "snake_case", deny_unknown_fields)]
943pub struct DockerBackendConfig {
944 #[serde(default = "cleanup_default")]
948 pub cleanup: bool,
949}
950
951impl DockerBackendConfig {
952 pub fn validate(&self) -> Result<()> {
954 Ok(())
955 }
956}
957
958impl Default for DockerBackendConfig {
959 fn default() -> Self {
960 Self { cleanup: true }
961 }
962}
963
964#[derive(Debug, Default, Clone, Serialize, Deserialize)]
966#[serde(rename_all = "snake_case", deny_unknown_fields)]
967pub struct BasicAuthConfig {
968 #[serde(default)]
970 pub username: String,
971 #[serde(default)]
973 pub password: SecretString,
974}
975
976impl BasicAuthConfig {
977 pub fn validate(&self) -> Result<()> {
979 Ok(())
980 }
981
982 pub fn redact(&mut self) {
984 self.password.redact();
985 }
986
987 pub fn unredact(&mut self) {
989 self.password.unredact();
990 }
991}
992
993#[derive(Debug, Default, Clone, Serialize, Deserialize)]
995#[serde(rename_all = "snake_case", deny_unknown_fields)]
996pub struct BearerAuthConfig {
997 #[serde(default)]
999 pub token: SecretString,
1000}
1001
1002impl BearerAuthConfig {
1003 pub fn validate(&self) -> Result<()> {
1005 Ok(())
1006 }
1007
1008 pub fn redact(&mut self) {
1010 self.token.redact();
1011 }
1012
1013 pub fn unredact(&mut self) {
1015 self.token.unredact();
1016 }
1017}
1018
1019#[derive(Debug, Clone, Serialize, Deserialize)]
1021#[serde(rename_all = "snake_case", tag = "type")]
1022pub enum TesBackendAuthConfig {
1023 Basic(BasicAuthConfig),
1025 Bearer(BearerAuthConfig),
1027}
1028
1029impl TesBackendAuthConfig {
1030 pub fn validate(&self) -> Result<()> {
1032 match self {
1033 Self::Basic(config) => config.validate(),
1034 Self::Bearer(config) => config.validate(),
1035 }
1036 }
1037
1038 pub fn redact(&mut self) {
1041 match self {
1042 Self::Basic(auth) => auth.redact(),
1043 Self::Bearer(auth) => auth.redact(),
1044 }
1045 }
1046
1047 pub fn unredact(&mut self) {
1050 match self {
1051 Self::Basic(auth) => auth.unredact(),
1052 Self::Bearer(auth) => auth.unredact(),
1053 }
1054 }
1055}
1056
1057#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1059#[serde(rename_all = "snake_case", deny_unknown_fields)]
1060pub struct TesBackendConfig {
1061 #[serde(default, skip_serializing_if = "Option::is_none")]
1063 pub url: Option<Url>,
1064
1065 #[serde(default, skip_serializing_if = "Option::is_none")]
1067 pub auth: Option<TesBackendAuthConfig>,
1068
1069 #[serde(default, skip_serializing_if = "Option::is_none")]
1071 pub inputs: Option<Url>,
1072
1073 #[serde(default, skip_serializing_if = "Option::is_none")]
1075 pub outputs: Option<Url>,
1076
1077 #[serde(default, skip_serializing_if = "Option::is_none")]
1081 pub interval: Option<u64>,
1082
1083 pub retries: Option<u32>,
1088
1089 #[serde(default, skip_serializing_if = "Option::is_none")]
1094 pub max_concurrency: Option<u32>,
1095
1096 #[serde(default)]
1099 pub insecure: bool,
1100}
1101
1102impl TesBackendConfig {
1103 pub fn validate(&self) -> Result<()> {
1105 match &self.url {
1106 Some(url) => {
1107 if !self.insecure && url.scheme() != "https" {
1108 bail!(
1109 "TES backend configuration value `url` has invalid value `{url}`: URL \
1110 must use a HTTPS scheme"
1111 );
1112 }
1113 }
1114 None => bail!("TES backend configuration value `url` is required"),
1115 }
1116
1117 if let Some(auth) = &self.auth {
1118 auth.validate()?;
1119 }
1120
1121 if let Some(max_concurrency) = self.max_concurrency
1122 && max_concurrency == 0
1123 {
1124 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1125 }
1126
1127 match &self.inputs {
1128 Some(url) => {
1129 if !is_supported_url(url.as_str()) {
1130 bail!(
1131 "TES backend storage configuration value `inputs` has invalid value \
1132 `{url}`: URL scheme is not supported"
1133 );
1134 }
1135
1136 if !url.path().ends_with('/') {
1137 bail!(
1138 "TES backend storage configuration value `inputs` has invalid value \
1139 `{url}`: URL path must end with a slash"
1140 );
1141 }
1142 }
1143 None => bail!("TES backend configuration value `inputs` is required"),
1144 }
1145
1146 match &self.outputs {
1147 Some(url) => {
1148 if !is_supported_url(url.as_str()) {
1149 bail!(
1150 "TES backend storage configuration value `outputs` has invalid value \
1151 `{url}`: URL scheme is not supported"
1152 );
1153 }
1154
1155 if !url.path().ends_with('/') {
1156 bail!(
1157 "TES backend storage configuration value `outputs` has invalid value \
1158 `{url}`: URL path must end with a slash"
1159 );
1160 }
1161 }
1162 None => bail!("TES backend storage configuration value `outputs` is required"),
1163 }
1164
1165 Ok(())
1166 }
1167
1168 pub fn redact(&mut self) {
1170 if let Some(auth) = &mut self.auth {
1171 auth.redact();
1172 }
1173 }
1174
1175 pub fn unredact(&mut self) {
1177 if let Some(auth) = &mut self.auth {
1178 auth.unredact();
1179 }
1180 }
1181}
1182
1183#[cfg(test)]
1184mod test {
1185 use pretty_assertions::assert_eq;
1186
1187 use super::*;
1188
1189 #[test]
1190 fn redacted_secret() {
1191 let mut secret: SecretString = "secret".into();
1192
1193 assert_eq!(
1194 serde_json::to_string(&secret).unwrap(),
1195 format!(r#""{REDACTED}""#)
1196 );
1197
1198 secret.unredact();
1199 assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);
1200
1201 secret.redact();
1202 assert_eq!(
1203 serde_json::to_string(&secret).unwrap(),
1204 format!(r#""{REDACTED}""#)
1205 );
1206 }
1207
1208 #[test]
1209 fn redacted_config() {
1210 let config = Config {
1211 backends: [
1212 (
1213 "first".to_string(),
1214 BackendConfig::Tes(
1215 TesBackendConfig {
1216 auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
1217 username: "foo".into(),
1218 password: "secret".into(),
1219 })),
1220 ..Default::default()
1221 }
1222 .into(),
1223 ),
1224 ),
1225 (
1226 "second".to_string(),
1227 BackendConfig::Tes(
1228 TesBackendConfig {
1229 auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
1230 token: "secret".into(),
1231 })),
1232 ..Default::default()
1233 }
1234 .into(),
1235 ),
1236 ),
1237 ]
1238 .into(),
1239 storage: StorageConfig {
1240 azure: AzureStorageConfig {
1241 auth: Some(AzureStorageAuthConfig {
1242 account_name: "foo".into(),
1243 access_key: "secret".into(),
1244 }),
1245 },
1246 s3: S3StorageConfig {
1247 auth: Some(S3StorageAuthConfig {
1248 access_key_id: "foo".into(),
1249 secret_access_key: "secret".into(),
1250 }),
1251 ..Default::default()
1252 },
1253 google: GoogleStorageConfig {
1254 auth: Some(GoogleStorageAuthConfig {
1255 access_key: "foo".into(),
1256 secret: "secret".into(),
1257 }),
1258 },
1259 },
1260 ..Default::default()
1261 };
1262
1263 let json = serde_json::to_string_pretty(&config).unwrap();
1264 assert!(json.contains("secret"), "`{json}` contains a secret");
1265 }
1266
1267 #[tokio::test]
1268 async fn test_config_validate() {
1269 let mut config = Config::default();
1271 config.task.retries = Some(1000000);
1272 assert_eq!(
1273 config.validate().await.unwrap_err().to_string(),
1274 "configuration value `task.retries` cannot exceed 100"
1275 );
1276
1277 let mut config = Config::default();
1279 config.workflow.scatter.concurrency = Some(0);
1280 assert_eq!(
1281 config.validate().await.unwrap_err().to_string(),
1282 "configuration value `workflow.scatter.concurrency` cannot be zero"
1283 );
1284
1285 let config = Config {
1287 backend: Some("foo".into()),
1288 ..Default::default()
1289 };
1290 assert_eq!(
1291 config.validate().await.unwrap_err().to_string(),
1292 "a backend named `foo` is not present in the configuration"
1293 );
1294 let config = Config {
1295 backend: Some("bar".into()),
1296 backends: [("foo".to_string(), BackendConfig::default())].into(),
1297 ..Default::default()
1298 };
1299 assert_eq!(
1300 config.validate().await.unwrap_err().to_string(),
1301 "a backend named `bar` is not present in the configuration"
1302 );
1303
1304 let config = Config {
1306 backends: [("foo".to_string(), BackendConfig::default())].into(),
1307 ..Default::default()
1308 };
1309 config.validate().await.expect("config should validate");
1310
1311 let config = Config {
1313 backends: [(
1314 "default".to_string(),
1315 BackendConfig::Local(LocalBackendConfig {
1316 cpu: Some(0),
1317 ..Default::default()
1318 }),
1319 )]
1320 .into(),
1321 ..Default::default()
1322 };
1323 assert_eq!(
1324 config.validate().await.unwrap_err().to_string(),
1325 "local backend configuration value `cpu` cannot be zero"
1326 );
1327 let config = Config {
1328 backends: [(
1329 "default".to_string(),
1330 BackendConfig::Local(LocalBackendConfig {
1331 cpu: Some(10000000),
1332 ..Default::default()
1333 }),
1334 )]
1335 .into(),
1336 ..Default::default()
1337 };
1338 assert!(
1339 config
1340 .validate()
1341 .await
1342 .unwrap_err()
1343 .to_string()
1344 .starts_with(
1345 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1346 available to the host"
1347 )
1348 );
1349
1350 let config = Config {
1352 backends: [(
1353 "default".to_string(),
1354 BackendConfig::Local(LocalBackendConfig {
1355 memory: Some("0 GiB".to_string()),
1356 ..Default::default()
1357 }),
1358 )]
1359 .into(),
1360 ..Default::default()
1361 };
1362 assert_eq!(
1363 config.validate().await.unwrap_err().to_string(),
1364 "local backend configuration value `memory` cannot be zero"
1365 );
1366 let config = Config {
1367 backends: [(
1368 "default".to_string(),
1369 BackendConfig::Local(LocalBackendConfig {
1370 memory: Some("100 meows".to_string()),
1371 ..Default::default()
1372 }),
1373 )]
1374 .into(),
1375 ..Default::default()
1376 };
1377 assert_eq!(
1378 config.validate().await.unwrap_err().to_string(),
1379 "local backend configuration value `memory` has invalid value `100 meows`"
1380 );
1381
1382 let config = Config {
1383 backends: [(
1384 "default".to_string(),
1385 BackendConfig::Local(LocalBackendConfig {
1386 memory: Some("1000 TiB".to_string()),
1387 ..Default::default()
1388 }),
1389 )]
1390 .into(),
1391 ..Default::default()
1392 };
1393 assert!(
1394 config
1395 .validate()
1396 .await
1397 .unwrap_err()
1398 .to_string()
1399 .starts_with(
1400 "local backend configuration value `memory` cannot exceed the total memory of \
1401 the host"
1402 )
1403 );
1404
1405 let config = Config {
1407 backends: [(
1408 "default".to_string(),
1409 BackendConfig::Tes(Default::default()),
1410 )]
1411 .into(),
1412 ..Default::default()
1413 };
1414 assert_eq!(
1415 config.validate().await.unwrap_err().to_string(),
1416 "TES backend configuration value `url` is required"
1417 );
1418
1419 let config = Config {
1421 backends: [(
1422 "default".to_string(),
1423 BackendConfig::Tes(
1424 TesBackendConfig {
1425 url: Some("https://example.com".parse().unwrap()),
1426 max_concurrency: Some(0),
1427 ..Default::default()
1428 }
1429 .into(),
1430 ),
1431 )]
1432 .into(),
1433 ..Default::default()
1434 };
1435 assert_eq!(
1436 config.validate().await.unwrap_err().to_string(),
1437 "TES backend configuration value `max_concurrency` cannot be zero"
1438 );
1439
1440 let config = Config {
1442 backends: [(
1443 "default".to_string(),
1444 BackendConfig::Tes(
1445 TesBackendConfig {
1446 url: Some("http://example.com".parse().unwrap()),
1447 inputs: Some("http://example.com".parse().unwrap()),
1448 outputs: Some("http://example.com".parse().unwrap()),
1449 ..Default::default()
1450 }
1451 .into(),
1452 ),
1453 )]
1454 .into(),
1455 ..Default::default()
1456 };
1457 assert_eq!(
1458 config.validate().await.unwrap_err().to_string(),
1459 "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
1460 must use a HTTPS scheme"
1461 );
1462
1463 let config = Config {
1465 backends: [(
1466 "default".to_string(),
1467 BackendConfig::Tes(
1468 TesBackendConfig {
1469 url: Some("http://example.com".parse().unwrap()),
1470 inputs: Some("http://example.com".parse().unwrap()),
1471 outputs: Some("http://example.com".parse().unwrap()),
1472 insecure: true,
1473 ..Default::default()
1474 }
1475 .into(),
1476 ),
1477 )]
1478 .into(),
1479 ..Default::default()
1480 };
1481 config
1482 .validate()
1483 .await
1484 .expect("configuration should validate");
1485
1486 let mut config = Config::default();
1487 config.http.parallelism = Some(0);
1488 assert_eq!(
1489 config.validate().await.unwrap_err().to_string(),
1490 "configuration value `http.parallelism` cannot be zero"
1491 );
1492
1493 let mut config = Config::default();
1494 config.http.parallelism = Some(5);
1495 assert!(
1496 config.validate().await.is_ok(),
1497 "should pass for valid configuration"
1498 );
1499
1500 let mut config = Config::default();
1501 config.http.parallelism = None;
1502 assert!(
1503 config.validate().await.is_ok(),
1504 "should pass for default (None)"
1505 );
1506 }
1507}