1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::path::Path;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use anyhow::ensure;
15use bytesize::ByteSize;
16use indexmap::IndexMap;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::process::Command;
21use tracing::error;
22use tracing::warn;
23use url::Url;
24
25use crate::CancellationContext;
26use crate::Events;
27use crate::SYSTEM;
28use crate::Value;
29use crate::backend::TaskExecutionBackend;
30use crate::convert_unit_string;
31use crate::path::is_supported_url;
32
/// The maximum number of task retries accepted by the configuration.
pub(crate) const MAX_RETRIES: u64 = 100;

/// The shell used to run task commands when none is configured.
pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";

/// The container image used for tasks that do not specify one.
pub(crate) const DEFAULT_TASK_CONTAINER: &str = "ubuntu:latest";

/// The implicit backend name used when no backends are configured.
const DEFAULT_BACKEND_NAME: &str = "default";

/// Maximum length, in bytes, of an LSF job name prefix.
const MAX_LSF_JOB_NAME_PREFIX: usize = 100;

/// Placeholder emitted when serializing a redacted [`SecretString`].
const REDACTED: &str = "<REDACTED>";

/// Sentinel cache-directory value meaning "use the system cache directory".
const CACHE_DIR_SENTINEL: &str = "system";
53
54pub(crate) fn cache_dir() -> Result<PathBuf> {
56 const CACHE_DIR_ROOT: &str = "sprocket";
58
59 Ok(dirs::cache_dir()
60 .context("failed to determine user cache directory")?
61 .join(CACHE_DIR_ROOT))
62}
63
64fn is_default_shell(shell: &str) -> bool {
66 shell == DEFAULT_TASK_SHELL
67}
68
69fn get_default_shell() -> String {
71 DEFAULT_TASK_SHELL.to_string()
72}
73
74fn get_default_container() -> String {
76 DEFAULT_TASK_CONTAINER.to_string()
77}
78
79fn get_default_backend_name() -> String {
81 DEFAULT_BACKEND_NAME.to_string()
82}
83
84fn get_sentinel_cache_dir() -> String {
86 CACHE_DIR_SENTINEL.to_string()
87}
88
/// A string wrapper that serializes as `<REDACTED>` unless explicitly
/// unredacted.
#[derive(Debug, Clone)]
pub struct SecretString {
    /// The wrapped secret value.
    inner: secrecy::SecretString,
    /// When `true` (the default), serialization emits the `<REDACTED>`
    /// placeholder instead of the secret.
    redacted: bool,
}
106
impl SecretString {
    /// Marks the secret as redacted: serialization emits `<REDACTED>`.
    pub fn redact(&mut self) {
        self.redacted = true;
    }

    /// Marks the secret as unredacted: serialization emits the secret value.
    pub fn unredact(&mut self) {
        self.redacted = false;
    }

    /// Returns a reference to the wrapped [`secrecy::SecretString`].
    pub fn inner(&self) -> &secrecy::SecretString {
        &self.inner
    }
}
126
127impl From<String> for SecretString {
128 fn from(s: String) -> Self {
129 Self {
130 inner: s.into(),
131 redacted: true,
132 }
133 }
134}
135
136impl From<&str> for SecretString {
137 fn from(s: &str) -> Self {
138 Self {
139 inner: s.into(),
140 redacted: true,
141 }
142 }
143}
144
145impl Default for SecretString {
146 fn default() -> Self {
147 Self {
148 inner: Default::default(),
149 redacted: true,
150 }
151 }
152}
153
154impl serde::Serialize for SecretString {
155 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
156 where
157 S: serde::Serializer,
158 {
159 use secrecy::ExposeSecret;
160
161 if self.redacted {
162 serializer.serialize_str(REDACTED)
163 } else {
164 serializer.serialize_str(self.inner.expose_secret())
165 }
166 }
167}
168
169impl<'de> serde::Deserialize<'de> for SecretString {
170 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
171 where
172 D: serde::Deserializer<'de>,
173 {
174 let inner = secrecy::SecretString::deserialize(deserializer)?;
175 Ok(Self {
176 inner,
177 redacted: true,
178 })
179 }
180}
181
/// Declares a newtype over `Option<$inner>` for configuration values where the
/// absence of a value is expressed in serialized form by the `$sentinel`
/// string (or by null).
///
/// `$value`/`$validation` define a predicate accepted inner values must
/// satisfy; `$expected` describes that predicate in error messages;
/// `$default` is the inner value used for `Default`.
#[macro_export]
macro_rules! nullable_config_type {
    (
        $name:ident,
        $inner:ty,
        $sentinel:literal,
        $value:ident,
        $validation:expr,
        $expected:literal,
        $default:expr
    ) => {
        #[doc = concat!("Configuration for [`", stringify!($name), "`].")]
        #[derive(Clone, Debug)]
        pub struct $name(Option<$inner>);

        impl $name {
            #[doc = concat!("Get the inner [`", stringify!($inner), "`].")]
            pub fn inner(&self) -> Option<&$inner> {
                self.0.as_ref()
            }

            #[doc = concat!("Try to create a new `", stringify!($name), "` from a `", stringify!($inner), "`.")]
            pub fn try_new(val: Option<$inner>) -> std::result::Result<Self, anyhow::Error> {
                match val {
                    None => Ok(Self(None)),
                    // Accept only inner values that pass the validation predicate.
                    Some($value) if $validation => Ok(Self(Some($value))),
                    Some($value) => Err(anyhow::anyhow!(format!(
                        "expected {}, got `{}`",
                        $expected, $value
                    ))),
                }
            }
        }

        impl Default for $name {
            fn default() -> Self {
                Self($default)
            }
        }

        impl Serialize for $name {
            fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                match self {
                    // `None` serializes as the sentinel string.
                    $name(None) => $sentinel.serialize(serializer),
                    $name(Some(i)) => i.serialize(serializer),
                }
            }
        }

        impl<'de> Deserialize<'de> for $name {
            fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                // Untagged so the serialized form may be the inner type, the
                // sentinel string, or null.
                #[derive(Deserialize)]
                #[serde(untagged)]
                enum Value {
                    Inner($inner),
                    Str(String),
                    Null,
                }

                match Value::deserialize(deserializer)? {
                    Value::Inner(i) => $name::try_new(Some(i)).map_err(serde::de::Error::custom),
                    Value::Str(s) if s == $sentinel => Ok($name(None)),
                    Value::Str($value) => Err(serde::de::Error::custom(format!(
                        "expected {} or `{}`, got `{}`",
                        $expected, $sentinel, $value
                    ))),
                    Value::Null => Ok($name(None)),
                }
            }
        }
    };
}
264
/// How evaluation proceeds when a failure is encountered.
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// Fail "slow" (the default) — NOTE(review): precise semantics are
    /// determined by evaluation code outside this module; confirm there.
    #[default]
    Slow,
    /// Fail "fast".
    Fast,
}
279
/// Top-level engine configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP download configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// Name of the backend (a key in `backends`) selected for task execution.
    #[serde(default = "get_default_backend_name")]
    pub backend: String,
    /// Named backend configurations; insertion order is preserved.
    #[serde(default)]
    pub backends: IndexMap<String, BackendConfig>,
    /// Cloud storage configuration.
    #[serde(default)]
    pub storage: StorageConfig,
    /// Suppresses environment-specific output; requires
    /// `experimental_features_enabled` (enforced by [`Config::validate`]).
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// Enables experimental features.
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// The failure mode (`slow` or `fast`); serialized as `fail`.
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
347
348impl Default for Config {
349 fn default() -> Self {
350 Self {
351 http: Default::default(),
352 workflow: Default::default(),
353 task: Default::default(),
354 backend: get_default_backend_name(),
355 backends: Default::default(),
356 storage: Default::default(),
357 suppress_env_specific_output: Default::default(),
358 experimental_features_enabled: Default::default(),
359 failure_mode: Default::default(),
360 }
361 }
362}
363
364impl Config {
365 pub async fn validate(&self) -> Result<()> {
367 self.http.validate()?;
368 self.workflow.validate()?;
369 self.task.validate()?;
370
371 if self.backends.is_empty() && self.backend == DEFAULT_BACKEND_NAME {
372 } else {
374 let backend = &self.backend;
375 if !self.backends.contains_key(backend) {
376 bail!("a backend named `{backend}` is not present in the configuration");
377 }
378 }
379
380 for backend in self.backends.values() {
381 backend.validate(self).await?;
382 }
383
384 self.storage.validate()?;
385
386 if self.suppress_env_specific_output && !self.experimental_features_enabled {
387 bail!("`suppress_env_specific_output` requires enabling experimental features");
388 }
389
390 Ok(())
391 }
392
393 pub fn redact(&mut self) {
397 for backend in self.backends.values_mut() {
398 backend.redact();
399 }
400
401 if let Some(auth) = &mut self.storage.azure.auth {
402 auth.redact();
403 }
404
405 if let Some(auth) = &mut self.storage.s3.auth {
406 auth.redact();
407 }
408
409 if let Some(auth) = &mut self.storage.google.auth {
410 auth.redact();
411 }
412 }
413
414 pub fn unredact(&mut self) {
418 for backend in self.backends.values_mut() {
419 backend.unredact();
420 }
421
422 if let Some(auth) = &mut self.storage.azure.auth {
423 auth.unredact();
424 }
425
426 if let Some(auth) = &mut self.storage.s3.auth {
427 auth.unredact();
428 }
429
430 if let Some(auth) = &mut self.storage.google.auth {
431 auth.unredact();
432 }
433 }
434
435 pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
440 if !self.backends.is_empty() {
441 let backend = &self.backend;
442 return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
443 || anyhow!("a backend named `{backend}` is not present in the configuration"),
444 )?));
445 }
446 Ok(Cow::Owned(BackendConfig::default()))
448 }
449
450 pub(crate) async fn create_backend(
452 self: &Arc<Self>,
453 run_root_dir: &Path,
454 events: Events,
455 cancellation: CancellationContext,
456 ) -> Result<Arc<dyn TaskExecutionBackend>> {
457 use crate::backend::*;
458
459 match self.backend()?.as_ref() {
460 BackendConfig::Local(_) => {
461 warn!(
462 "the engine is configured to use the local backend: tasks will not be run \
463 inside of a container"
464 );
465 Ok(Arc::new(LocalBackend::new(
466 self.clone(),
467 events,
468 cancellation,
469 )?))
470 }
471 BackendConfig::Docker(_) => Ok(Arc::new(
472 DockerBackend::new(self.clone(), events, cancellation).await?,
473 )),
474 BackendConfig::Tes(_) => Ok(Arc::new(
475 TesBackend::new(self.clone(), events, cancellation).await?,
476 )),
477 BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
478 self.clone(),
479 run_root_dir,
480 events,
481 cancellation,
482 )?)),
483 BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
484 self.clone(),
485 run_root_dir,
486 events,
487 cancellation,
488 )?)),
489 }
490 }
491}
492
/// Configuration for HTTP downloads performed by the engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct HttpConfig {
    /// Directory for cached downloads; the `system` sentinel selects the
    /// platform cache directory (see [`HttpConfig::cache_dir`]).
    #[serde(default = "get_sentinel_cache_dir")]
    pub cache_dir: String,
    /// Number of retries for failed downloads.
    // NOTE(review): unlike `cache_dir`, this field has no `#[serde(default)]`,
    // so deserialization requires it even though `Default` supplies 5 —
    // confirm this is intentional.
    pub retries: usize,
    /// Maximum download parallelism; `available` (or null) means no explicit
    /// limit is configured.
    // NOTE(review): also lacks `#[serde(default)]` — confirm intentional.
    pub parallelism: Parallelism,
}
509
// `Parallelism`: optional positive download concurrency. The `available`
// sentinel (or null) deserializes to `None`, meaning no explicit limit.
nullable_config_type!(
    Parallelism,
    usize,
    "available",
    value,
    value > 0,
    "a positive number",
    None
);
519
520impl Default for HttpConfig {
521 fn default() -> Self {
522 Self {
523 cache_dir: get_sentinel_cache_dir(),
524 retries: 5, parallelism: Default::default(),
526 }
527 }
528}
529
530impl HttpConfig {
531 pub fn validate(&self) -> Result<()> {
533 if let Some(parallelism) = self.parallelism.inner()
534 && *parallelism == 0
535 {
536 bail!("configuration value `http.parallelism` cannot be zero");
537 }
538 Ok(())
539 }
540
541 pub fn cache_dir(&self) -> Result<PathBuf> {
543 const DOWNLOADS_CACHE_SUBDIR: &str = "downloads";
544
545 if self.using_system_cache_dir() {
546 cache_dir().map(|d| d.join(DOWNLOADS_CACHE_SUBDIR))
547 } else {
548 Ok(PathBuf::from(&self.cache_dir))
549 }
550 }
551
552 pub fn using_system_cache_dir(&self) -> bool {
554 self.cache_dir == CACHE_DIR_SENTINEL
555 }
556}
557
/// Cloud storage configuration for all supported providers.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct StorageConfig {
    /// Azure Blob Storage configuration.
    #[serde(default)]
    pub azure: AzureStorageConfig,
    /// AWS S3 storage configuration.
    #[serde(default)]
    pub s3: S3StorageConfig,
    /// Google Cloud Storage configuration.
    #[serde(default)]
    pub google: GoogleStorageConfig,
}
572
573impl StorageConfig {
574 pub fn validate(&self) -> Result<()> {
576 self.azure.validate()?;
577 self.s3.validate()?;
578 self.google.validate()?;
579 Ok(())
580 }
581}
582
/// Authentication settings for Azure Blob Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageAuthConfig {
    /// The storage account name; must not be empty.
    pub account_name: String,
    /// The storage account access key; must not be empty. Redacted when
    /// serialized unless explicitly unredacted.
    pub access_key: SecretString,
}
592
593impl AzureStorageAuthConfig {
594 pub fn validate(&self) -> Result<()> {
596 if self.account_name.is_empty() {
597 bail!("configuration value `storage.azure.auth.account_name` is required");
598 }
599
600 if self.access_key.inner.expose_secret().is_empty() {
601 bail!("configuration value `storage.azure.auth.access_key` is required");
602 }
603
604 Ok(())
605 }
606
607 pub fn redact(&mut self) {
610 self.access_key.redact();
611 }
612
613 pub fn unredact(&mut self) {
616 self.access_key.unredact();
617 }
618}
619
/// Azure Blob Storage configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageConfig {
    /// Optional authentication; anonymous access when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<AzureStorageAuthConfig>,
}
628
629impl AzureStorageConfig {
630 pub fn validate(&self) -> Result<()> {
632 if let Some(auth) = &self.auth {
633 auth.validate()?;
634 }
635
636 Ok(())
637 }
638}
639
/// Authentication settings for AWS S3.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageAuthConfig {
    /// The access key id; must not be empty.
    pub access_key_id: String,
    /// The secret access key; must not be empty. Redacted when serialized
    /// unless explicitly unredacted.
    pub secret_access_key: SecretString,
}
649
650impl S3StorageAuthConfig {
651 pub fn validate(&self) -> Result<()> {
653 if self.access_key_id.is_empty() {
654 bail!("configuration value `storage.s3.auth.access_key_id` is required");
655 }
656
657 if self.secret_access_key.inner.expose_secret().is_empty() {
658 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
659 }
660
661 Ok(())
662 }
663
664 pub fn redact(&mut self) {
667 self.secret_access_key.redact();
668 }
669
670 pub fn unredact(&mut self) {
673 self.secret_access_key.unredact();
674 }
675}
676
/// AWS S3 storage configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageConfig {
    /// Optional AWS region.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub region: Option<String>,

    /// Optional authentication; anonymous access when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<S3StorageAuthConfig>,
}
692
693impl S3StorageConfig {
694 pub fn validate(&self) -> Result<()> {
696 if let Some(auth) = &self.auth {
697 auth.validate()?;
698 }
699
700 Ok(())
701 }
702}
703
/// Authentication settings for Google Cloud Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageAuthConfig {
    /// The access key; must not be empty.
    pub access_key: String,
    /// The secret; must not be empty. Redacted when serialized unless
    /// explicitly unredacted.
    pub secret: SecretString,
}
713
714impl GoogleStorageAuthConfig {
715 pub fn validate(&self) -> Result<()> {
717 if self.access_key.is_empty() {
718 bail!("configuration value `storage.google.auth.access_key` is required");
719 }
720
721 if self.secret.inner.expose_secret().is_empty() {
722 bail!("configuration value `storage.google.auth.secret` is required");
723 }
724
725 Ok(())
726 }
727
728 pub fn redact(&mut self) {
731 self.secret.redact();
732 }
733
734 pub fn unredact(&mut self) {
737 self.secret.unredact();
738 }
739}
740
/// Google Cloud Storage configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageConfig {
    /// Optional authentication; anonymous access when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<GoogleStorageAuthConfig>,
}
749
750impl GoogleStorageConfig {
751 pub fn validate(&self) -> Result<()> {
753 if let Some(auth) = &self.auth {
754 auth.validate()?;
755 }
756
757 Ok(())
758 }
759}
760
/// Workflow evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct WorkflowConfig {
    /// Scatter statement evaluation configuration.
    #[serde(default)]
    pub scatter: ScatterConfig,
}
769
770impl WorkflowConfig {
771 pub fn validate(&self) -> Result<()> {
773 self.scatter.validate()?;
774 Ok(())
775 }
776}
777
/// The default value for [`ScatterConfig::concurrency`].
const DEFAULT_SCATTER_CONCURRENCY: u64 = 1000;
781
/// Configuration for scatter statement evaluation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ScatterConfig {
    /// Scatter concurrency limit; must be non-zero.
    pub concurrency: u64,
}
840
841impl Default for ScatterConfig {
842 fn default() -> Self {
843 Self {
844 concurrency: DEFAULT_SCATTER_CONCURRENCY,
845 }
846 }
847}
848
849impl ScatterConfig {
850 pub fn validate(&self) -> Result<()> {
852 if self.concurrency == 0 {
853 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
854 }
855
856 Ok(())
857 }
858}
859
/// The call caching mode for task evaluation.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// Call caching is disabled (the default).
    #[default]
    Off,
    /// Call caching is enabled.
    On,
    /// NOTE(review): presumably caching only where explicitly requested —
    /// confirm against the cache implementation.
    Explicit,
}
887
/// How content digests are computed for call caching.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// NOTE(review): presumably a full content-based digest — confirm against
    /// the digest implementation.
    Strong,
    /// The default digest mode.
    #[default]
    Weak,
}
912
/// Task evaluation configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct TaskConfig {
    /// Maximum task retry count, or the `default` sentinel.
    // NOTE(review): no `#[serde(default)]` here, so deserialization requires
    // this field even though `Default` supplies one — confirm intentional.
    pub retries: Retries,
    /// Default container image for tasks that do not specify one.
    #[serde(default = "get_default_container")]
    pub container: String,
    /// Shell used to run task commands; omitted from serialization when it is
    /// the default (`bash`).
    #[serde(
        default = "get_default_shell",
        skip_serializing_if = "is_default_shell"
    )]
    pub shell: String,
    /// Behavior when a task's CPU requirement exceeds the limit — see
    /// [`TaskResourceLimitBehavior`].
    pub cpu_limit_behavior: TaskResourceLimitBehavior,
    /// Behavior when a task's memory requirement exceeds the limit.
    pub memory_limit_behavior: TaskResourceLimitBehavior,
    /// Call cache directory; the `system` sentinel means "unset" (see
    /// [`TaskConfig::cache_dir`]).
    #[serde(default = "get_sentinel_cache_dir")]
    pub cache_dir: String,
    /// The call caching mode.
    pub cache: CallCachingMode,
    /// The content digest mode.
    pub digests: ContentDigestMode,
    /// Requirement names excluded from call caching — presumably from the
    /// cache key computation; confirm against the cache implementation.
    #[serde(default)]
    pub excluded_cache_requirements: HashSet<String>,
    /// Hint names excluded from call caching.
    #[serde(default)]
    pub excluded_cache_hints: HashSet<String>,
    /// Input names excluded from call caching.
    #[serde(default)]
    pub excluded_cache_inputs: HashSet<String>,
}
988
// `Retries`: optional maximum retry count. The `default` sentinel (or null)
// deserializes to `None`. The literal `100` in the error message below must
// be kept in sync with `MAX_RETRIES` (the macro requires a literal).
nullable_config_type!(
    Retries,
    u64,
    "default",
    value,
    value <= MAX_RETRIES,
    "a number less than or equal to 100",
    None
);
998
999impl Default for TaskConfig {
1000 fn default() -> Self {
1001 Self {
1002 retries: Default::default(),
1003 container: get_default_container(),
1004 shell: get_default_shell(),
1005 cpu_limit_behavior: Default::default(),
1006 memory_limit_behavior: Default::default(),
1007 cache_dir: get_sentinel_cache_dir(),
1008 cache: Default::default(),
1009 digests: Default::default(),
1010 excluded_cache_requirements: Default::default(),
1011 excluded_cache_hints: Default::default(),
1012 excluded_cache_inputs: Default::default(),
1013 }
1014 }
1015}
1016
1017impl TaskConfig {
1018 pub fn validate(&self) -> Result<()> {
1020 if self.retries.inner().cloned().unwrap_or(0) > MAX_RETRIES {
1021 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
1022 }
1023
1024 Ok(())
1025 }
1026
1027 pub fn cache_dir(&self) -> Option<PathBuf> {
1029 if self.cache_dir == CACHE_DIR_SENTINEL {
1030 None
1031 } else {
1032 Some(PathBuf::from(&self.cache_dir))
1033 }
1034 }
1035}
1036
/// Behavior when a task's resource request exceeds the configured limit.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// NOTE(review): presumably run the task with the maximum available
    /// resources instead of failing — confirm against backend code.
    TryWithMax,
    /// Reject the task (the default).
    #[default]
    Deny,
}
1051
/// Configuration for a task execution backend; tagged by `type` when
/// serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum BackendConfig {
    /// Runs tasks directly on the host, without a container.
    Local(LocalBackendConfig),
    /// Runs tasks in Docker containers.
    Docker(DockerBackendConfig),
    /// Runs tasks via a Task Execution Service (TES) endpoint.
    Tes(TesBackendConfig),
    /// Runs tasks via LSF with Apptainer containers (experimental).
    LsfApptainer(LsfApptainerBackendConfig),
    /// Runs tasks via Slurm with Apptainer containers.
    SlurmApptainer(SlurmApptainerBackendConfig),
}
1071
1072impl Default for BackendConfig {
1073 fn default() -> Self {
1074 Self::Docker(Default::default())
1075 }
1076}
1077
1078impl BackendConfig {
1079 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
1081 match self {
1082 Self::Local(config) => config.validate(),
1083 Self::Docker(config) => config.validate(),
1084 Self::Tes(config) => config.validate(),
1085 Self::LsfApptainer(config) => config.validate(engine_config).await,
1086 Self::SlurmApptainer(config) => config.validate(engine_config).await,
1087 }
1088 }
1089
1090 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
1094 match self {
1095 Self::Local(config) => Some(config),
1096 _ => None,
1097 }
1098 }
1099
1100 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
1104 match self {
1105 Self::Docker(config) => Some(config),
1106 _ => None,
1107 }
1108 }
1109
1110 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
1114 match self {
1115 Self::Tes(config) => Some(config),
1116 _ => None,
1117 }
1118 }
1119
1120 pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
1125 match self {
1126 Self::LsfApptainer(config) => Some(config),
1127 _ => None,
1128 }
1129 }
1130
1131 pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
1136 match self {
1137 Self::SlurmApptainer(config) => Some(config),
1138 _ => None,
1139 }
1140 }
1141
1142 pub fn redact(&mut self) {
1144 match self {
1145 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
1146 Self::Tes(config) => config.redact(),
1147 }
1148 }
1149
1150 pub fn unredact(&mut self) {
1152 match self {
1153 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
1154 Self::Tes(config) => config.unredact(),
1155 }
1156 }
1157}
1158
/// Configuration for the local (host) task execution backend.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LocalBackendConfig {
    /// Maximum virtual CPUs tasks may use; must be non-zero and no more than
    /// the host's CPU count. Unlimited (host total) when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu: Option<u64>,

    /// Maximum memory tasks may use, as a unit string (e.g. `2 GiB`); must be
    /// non-zero and no more than the host's total memory.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory: Option<String>,
}
1185
1186impl LocalBackendConfig {
1187 pub fn validate(&self) -> Result<()> {
1189 if let Some(cpu) = self.cpu {
1190 if cpu == 0 {
1191 bail!("local backend configuration value `cpu` cannot be zero");
1192 }
1193
1194 let total = SYSTEM.cpus().len() as u64;
1195 if cpu > total {
1196 bail!(
1197 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1198 available to the host ({total})"
1199 );
1200 }
1201 }
1202
1203 if let Some(memory) = &self.memory {
1204 let memory = convert_unit_string(memory).with_context(|| {
1205 format!("local backend configuration value `memory` has invalid value `{memory}`")
1206 })?;
1207
1208 if memory == 0 {
1209 bail!("local backend configuration value `memory` cannot be zero");
1210 }
1211
1212 let total = SYSTEM.total_memory();
1213 if memory > total {
1214 bail!(
1215 "local backend configuration value `memory` cannot exceed the total memory of \
1216 the host ({total} bytes)"
1217 );
1218 }
1219 }
1220
1221 Ok(())
1222 }
1223}
1224
/// Serde default for [`DockerBackendConfig::cleanup`]: clean up by default.
const fn cleanup_default() -> bool {
    true
}
1229
/// Configuration for the Docker task execution backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct DockerBackendConfig {
    /// Whether to clean up after task execution (defaults to `true`) —
    /// NOTE(review): presumably removes task containers; confirm against the
    /// Docker backend implementation.
    #[serde(default = "cleanup_default")]
    pub cleanup: bool,
}
1240
impl DockerBackendConfig {
    /// Validates the Docker backend configuration.
    ///
    /// Currently a no-op: there are no invalid values.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }
}
1247
1248impl Default for DockerBackendConfig {
1249 fn default() -> Self {
1250 Self { cleanup: true }
1251 }
1252}
1253
/// HTTP basic authentication credentials.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BasicAuthConfig {
    /// The user name.
    #[serde(default)]
    pub username: String,
    /// The password; redacted when serialized unless explicitly unredacted.
    #[serde(default)]
    pub password: SecretString,
}
1265
impl BasicAuthConfig {
    /// Validates the basic auth configuration.
    ///
    /// Currently a no-op: empty credentials are permitted here.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Marks the password as redacted for serialization.
    pub fn redact(&mut self) {
        self.password.redact();
    }

    /// Marks the password as serializable in the clear.
    pub fn unredact(&mut self) {
        self.password.unredact();
    }
}
1282
/// HTTP bearer-token authentication credentials.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BearerAuthConfig {
    /// The bearer token; redacted when serialized unless explicitly
    /// unredacted.
    #[serde(default)]
    pub token: SecretString,
}
1291
impl BearerAuthConfig {
    /// Validates the bearer auth configuration.
    ///
    /// Currently a no-op: an empty token is permitted here.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Marks the token as redacted for serialization.
    pub fn redact(&mut self) {
        self.token.redact();
    }

    /// Marks the token as serializable in the clear.
    pub fn unredact(&mut self) {
        self.token.unredact();
    }
}
1308
/// Authentication used by the TES backend; tagged by `type` when serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum TesBackendAuthConfig {
    /// HTTP basic authentication.
    Basic(BasicAuthConfig),
    /// HTTP bearer-token authentication.
    Bearer(BearerAuthConfig),
}
1318
1319impl TesBackendAuthConfig {
1320 pub fn validate(&self) -> Result<()> {
1322 match self {
1323 Self::Basic(config) => config.validate(),
1324 Self::Bearer(config) => config.validate(),
1325 }
1326 }
1327
1328 pub fn redact(&mut self) {
1331 match self {
1332 Self::Basic(auth) => auth.redact(),
1333 Self::Bearer(auth) => auth.redact(),
1334 }
1335 }
1336
1337 pub fn unredact(&mut self) {
1340 match self {
1341 Self::Basic(auth) => auth.unredact(),
1342 Self::Bearer(auth) => auth.unredact(),
1343 }
1344 }
1345}
1346
/// Configuration for the TES (Task Execution Service) backend.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct TesBackendConfig {
    /// Base URL of the TES service; must use HTTPS unless `insecure` is set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub url: Option<Url>,

    /// Authentication used when talking to the TES service.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<TesBackendAuthConfig>,

    /// Storage URL prefix for task inputs; must use a supported scheme and
    /// end with a slash.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub inputs: Option<Url>,

    /// Storage URL prefix for task outputs; must use a supported scheme and
    /// end with a slash.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub outputs: Option<Url>,

    /// Polling interval — NOTE(review): units and exact usage are determined
    /// by the TES backend implementation; confirm there.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,

    /// Number of request retries.
    // NOTE(review): unlike the neighboring fields, this one has no
    // `#[serde(default, skip_serializing_if = ...)]` — confirm intentional.
    pub retries: Option<u32>,

    /// Maximum number of concurrent tasks; must be non-zero when present.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,

    /// Permits a non-HTTPS TES URL when set.
    #[serde(default)]
    pub insecure: bool,
}
1391
1392impl TesBackendConfig {
1393 pub fn validate(&self) -> Result<()> {
1395 match &self.url {
1396 Some(url) => {
1397 if !self.insecure && url.scheme() != "https" {
1398 bail!(
1399 "TES backend configuration value `url` has invalid value `{url}`: URL \
1400 must use a HTTPS scheme"
1401 );
1402 }
1403 }
1404 None => bail!("TES backend configuration value `url` is required"),
1405 }
1406
1407 if let Some(auth) = &self.auth {
1408 auth.validate()?;
1409 }
1410
1411 if let Some(max_concurrency) = self.max_concurrency
1412 && max_concurrency == 0
1413 {
1414 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1415 }
1416
1417 match &self.inputs {
1418 Some(url) => {
1419 if !is_supported_url(url.as_str()) {
1420 bail!(
1421 "TES backend storage configuration value `inputs` has invalid value \
1422 `{url}`: URL scheme is not supported"
1423 );
1424 }
1425
1426 if !url.path().ends_with('/') {
1427 bail!(
1428 "TES backend storage configuration value `inputs` has invalid value \
1429 `{url}`: URL path must end with a slash"
1430 );
1431 }
1432 }
1433 None => bail!("TES backend configuration value `inputs` is required"),
1434 }
1435
1436 match &self.outputs {
1437 Some(url) => {
1438 if !is_supported_url(url.as_str()) {
1439 bail!(
1440 "TES backend storage configuration value `outputs` has invalid value \
1441 `{url}`: URL scheme is not supported"
1442 );
1443 }
1444
1445 if !url.path().ends_with('/') {
1446 bail!(
1447 "TES backend storage configuration value `outputs` has invalid value \
1448 `{url}`: URL path must end with a slash"
1449 );
1450 }
1451 }
1452 None => bail!("TES backend storage configuration value `outputs` is required"),
1453 }
1454
1455 Ok(())
1456 }
1457
1458 pub fn redact(&mut self) {
1460 if let Some(auth) = &mut self.auth {
1461 auth.redact();
1462 }
1463 }
1464
1465 pub fn unredact(&mut self) {
1467 if let Some(auth) = &mut self.auth {
1468 auth.unredact();
1469 }
1470 }
1471}
1472
/// Apptainer-related settings used by the Apptainer-based backends.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ApptainerConfig {
    /// Name or path of the Apptainer executable (defaults to `apptainer`).
    #[serde(default = "default_apptainer_executable")]
    pub executable: String,

    /// Directory used to cache Apptainer images.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub image_cache_dir: Option<PathBuf>,

    /// Extra arguments for `apptainer exec` invocations.
    // NOTE(review): this field lacks `#[serde(default)]`, unlike its
    // neighbors — confirm it is intentionally required when deserializing.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}
1497
/// The default name of the Apptainer executable (resolved via `PATH`).
const DEFAULT_APPTAINER_EXECUTABLE: &str = "apptainer";

/// Serde default for [`ApptainerConfig::executable`].
fn default_apptainer_executable() -> String {
    DEFAULT_APPTAINER_EXECUTABLE.to_string()
}
1505
1506impl Default for ApptainerConfig {
1507 fn default() -> Self {
1508 Self {
1509 executable: default_apptainer_executable(),
1510 image_cache_dir: None,
1511 extra_apptainer_exec_args: None,
1512 }
1513 }
1514}
1515
impl ApptainerConfig {
    /// Validates the Apptainer configuration.
    ///
    /// Currently a no-op: no checks are performed.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1522
/// Configuration for an LSF queue used by the LSF + Apptainer backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfQueueConfig {
    /// The queue name, as known to LSF; must not be empty.
    pub name: String,
    /// Maximum CPUs a single task may request from this queue, if limited.
    pub max_cpu_per_task: Option<u64>,
    /// Maximum memory a single task may request from this queue, if limited.
    pub max_memory_per_task: Option<ByteSize>,
}
1542
impl LsfQueueConfig {
    /// Validates the queue configuration and checks, via `bqueues`, that the
    /// queue exists on the cluster.
    ///
    /// `name` is a label (e.g. `default`, `gpu`) used in error messages.
    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let queue = &self.name;
        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
            );
        }
        // Query LSF for the queue; bound the call so an unresponsive cluster
        // cannot stall validation indefinitely.
        match tokio::time::timeout(
            std::time::Duration::from_secs(10),
            Command::new("bqueues").arg(queue).output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating LSF queue")?;
                if !output.status.success() {
                    // A non-zero exit means LSF does not recognize the queue;
                    // log the command output for diagnosis.
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
                } else {
                    Ok(())
                }
            }
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_lsf_queue `{queue}`"
            )),
        }
    }
}
1585
/// Configuration for the LSF + Apptainer backend (experimental; see
/// [`LsfApptainerBackendConfig::validate`]).
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfApptainerBackendConfig {
    /// Polling interval — NOTE(review): units and usage are determined by the
    /// backend implementation; confirm there.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// Maximum number of concurrent tasks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// Queue used when no more specific queue matches a task.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks hinted as short-running.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks requiring a GPU.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks requiring an FPGA.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Extra arguments for `bsub` — NOTE(review): exact placement is
    /// determined by the backend implementation; confirm there.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Prefix for LSF job names; at most `MAX_LSF_JOB_NAME_PREFIX` bytes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// Apptainer settings, flattened into this table when serialized.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1648
1649impl LsfApptainerBackendConfig {
1650 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1652 if cfg!(not(unix)) {
1653 bail!("LSF + Apptainer backend is not supported on non-unix platforms");
1654 }
1655
1656 if !engine_config.experimental_features_enabled {
1657 bail!("LSF + Apptainer backend requires enabling experimental features");
1658 }
1659
1660 if let Some(queue) = &self.default_lsf_queue {
1666 queue.validate("default").await?;
1667 }
1668
1669 if let Some(queue) = &self.short_task_lsf_queue {
1670 queue.validate("short_task").await?;
1671 }
1672
1673 if let Some(queue) = &self.gpu_lsf_queue {
1674 queue.validate("gpu").await?;
1675 }
1676
1677 if let Some(queue) = &self.fpga_lsf_queue {
1678 queue.validate("fpga").await?;
1679 }
1680
1681 if let Some(prefix) = &self.job_name_prefix
1682 && prefix.len() > MAX_LSF_JOB_NAME_PREFIX
1683 {
1684 bail!(
1685 "LSF job name prefix `{prefix}` exceeds the maximum {MAX_LSF_JOB_NAME_PREFIX} \
1686 bytes"
1687 );
1688 }
1689
1690 self.apptainer_config.validate().await?;
1691
1692 Ok(())
1693 }
1694
1695 pub(crate) fn lsf_queue_for_task(
1700 &self,
1701 requirements: &HashMap<String, Value>,
1702 hints: &HashMap<String, Value>,
1703 ) -> Option<&LsfQueueConfig> {
1704 if let Some(queue) = self.fpga_lsf_queue.as_ref()
1706 && let Some(true) = requirements
1707 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1708 .and_then(Value::as_boolean)
1709 {
1710 return Some(queue);
1711 }
1712
1713 if let Some(queue) = self.gpu_lsf_queue.as_ref()
1714 && let Some(true) = requirements
1715 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1716 .and_then(Value::as_boolean)
1717 {
1718 return Some(queue);
1719 }
1720
1721 if let Some(queue) = self.short_task_lsf_queue.as_ref()
1723 && let Some(true) = hints
1724 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1725 .and_then(Value::as_boolean)
1726 {
1727 return Some(queue);
1728 }
1729
1730 self.default_lsf_queue.as_ref()
1733 }
1734}
1735
/// Configuration for a single Slurm partition that tasks may be scheduled on.
///
/// Checked against the live scheduler by [`SlurmPartitionConfig::validate`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmPartitionConfig {
    /// The name of the Slurm partition, as known to `scontrol`; must not be
    /// empty.
    pub name: String,
    /// Maximum number of CPUs a single task may be provisioned on this
    /// partition; must be at least 1 when set.
    // NOTE(review): how this cap is enforced at scheduling time is not
    // visible here — confirm against the Slurm backend implementation.
    pub max_cpu_per_task: Option<u64>,
    /// Maximum amount of memory a single task may be provisioned on this
    /// partition; must be non-zero when set.
    pub max_memory_per_task: Option<ByteSize>,
}
1756
1757impl SlurmPartitionConfig {
1758 pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1761 let partition = &self.name;
1762 ensure!(
1763 !partition.is_empty(),
1764 "{name}_slurm_partition name cannot be empty"
1765 );
1766 if let Some(max_cpu_per_task) = self.max_cpu_per_task {
1767 ensure!(
1768 max_cpu_per_task > 0,
1769 "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
1770 );
1771 }
1772 if let Some(max_memory_per_task) = self.max_memory_per_task {
1773 ensure!(
1774 max_memory_per_task.as_u64() > 0,
1775 "{name}_slurm_partition `{partition}` must allow at least some memory to be \
1776 provisioned"
1777 );
1778 }
1779 match tokio::time::timeout(
1780 std::time::Duration::from_secs(10),
1783 Command::new("scontrol")
1784 .arg("show")
1785 .arg("partition")
1786 .arg(partition)
1787 .output(),
1788 )
1789 .await
1790 {
1791 Ok(output) => {
1792 let output = output.context("validating Slurm partition")?;
1793 if !output.status.success() {
1794 let stdout = String::from_utf8_lossy(&output.stdout);
1795 let stderr = String::from_utf8_lossy(&output.stderr);
1796 error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
1797 Err(anyhow!(
1798 "failed to validate {name}_slurm_partition `{partition}`"
1799 ))
1800 } else {
1801 Ok(())
1802 }
1803 }
1804 Err(_) => Err(anyhow!(
1805 "timed out trying to validate {name}_slurm_partition `{partition}`"
1806 )),
1807 }
1808 }
1809}
1810
/// Configuration for the Slurm + Apptainer task execution backend.
///
/// NOTE(review): this struct combines `deny_unknown_fields` with a
/// `#[serde(flatten)]` field; serde's documentation states these two
/// attributes are not supported together — confirm that deserialization of
/// unknown keys behaves as intended (the sibling LSF config shares this
/// pattern, so any fix should cover both).
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmApptainerBackendConfig {
    /// Polling interval for monitoring submitted jobs.
    // NOTE(review): units are not visible here (presumably seconds) — confirm
    // against the backend implementation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// Maximum number of tasks to run concurrently, when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// Partition used when no more specific partition below applies.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks hinted as short-running.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks that require a GPU.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks that require an FPGA.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional arguments for job submission (presumably appended to
    /// `sbatch` invocations — confirm against the backend implementation).
    pub extra_sbatch_args: Option<Vec<String>>,
    /// Optional prefix applied to generated job names.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// Nested Apptainer configuration, flattened into this struct's keys.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1875
1876impl SlurmApptainerBackendConfig {
1877 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1879 if cfg!(not(unix)) {
1880 bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
1881 }
1882 if !engine_config.experimental_features_enabled {
1883 bail!("Slurm + Apptainer backend requires enabling experimental features");
1884 }
1885
1886 if let Some(partition) = &self.default_slurm_partition {
1892 partition.validate("default").await?;
1893 }
1894 if let Some(partition) = &self.short_task_slurm_partition {
1895 partition.validate("short_task").await?;
1896 }
1897 if let Some(partition) = &self.gpu_slurm_partition {
1898 partition.validate("gpu").await?;
1899 }
1900 if let Some(partition) = &self.fpga_slurm_partition {
1901 partition.validate("fpga").await?;
1902 }
1903
1904 self.apptainer_config.validate().await?;
1905
1906 Ok(())
1907 }
1908
1909 pub(crate) fn slurm_partition_for_task(
1914 &self,
1915 requirements: &HashMap<String, Value>,
1916 hints: &HashMap<String, Value>,
1917 ) -> Option<&SlurmPartitionConfig> {
1918 if let Some(partition) = self.fpga_slurm_partition.as_ref()
1924 && let Some(true) = requirements
1925 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1926 .and_then(Value::as_boolean)
1927 {
1928 return Some(partition);
1929 }
1930
1931 if let Some(partition) = self.gpu_slurm_partition.as_ref()
1932 && let Some(true) = requirements
1933 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1934 .and_then(Value::as_boolean)
1935 {
1936 return Some(partition);
1937 }
1938
1939 if let Some(partition) = self.short_task_slurm_partition.as_ref()
1941 && let Some(true) = hints
1942 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1943 .and_then(Value::as_boolean)
1944 {
1945 return Some(partition);
1946 }
1947
1948 self.default_slurm_partition.as_ref()
1951 }
1952}
1953
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    /// Secrets serialize as the redaction marker by default and can be
    /// toggled with `unredact`/`redact`.
    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        // Redacted by default when constructed from a string.
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    /// Serializing a configuration containing secrets must not leak any
    /// secret value; every secret must be replaced by the redaction marker.
    #[test]
    fn redacted_config() {
        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                            username: "foo".into(),
                            password: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                            token: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: Some(AzureStorageAuthConfig {
                        account_name: "foo".into(),
                        access_key: "secret".into(),
                    }),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: "secret".into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: "secret".into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();

        // The redaction marker must actually appear for the configured
        // secrets.
        assert!(
            json.contains(REDACTED),
            "`{json}` does not contain any redacted secrets"
        );

        // Fixed: the previous assertion checked that the JSON *does* contain
        // "secret", which passed trivially because field *names* such as
        // `secret_access_key` and `secret` contain that substring, so
        // redaction was never actually exercised. Instead, verify that no
        // secret *value* leaked: in pretty-printed JSON a leaked value would
        // appear as `: "secret"`, whereas field names appear as `"...":`.
        assert!(
            !json.contains(r#": "secret""#),
            "`{json}` contains a secret"
        );
    }

    #[tokio::test]
    async fn test_config_validate() {
        // `task.retries` may not exceed the maximum retry count.
        let mut config = Config::default();
        config.task.retries = Retries(Some(255));
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Scatter concurrency must be non-zero.
        let mut config = Config::default();
        config.workflow.scatter.concurrency = 0;
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // The selected backend must exist in the `backends` map.
        let config = Config {
            backend: "foo".into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: "bar".into(),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // A configured and present backend validates.
        let config = Config {
            backend: "foo".to_string(),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().await.expect("config should validate");

        // Local backend: `cpu` must be non-zero and within the host's CPUs.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
                     available to the host"
                )
        );

        // Local backend: `memory` must be non-zero, parseable, and within
        // the host's total memory.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `memory` cannot exceed the total memory of \
                     the host"
                )
        );

        // TES backend: a URL is required.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // TES backend: `max_concurrency` must be non-zero.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("https://example.com".parse().unwrap()),
                    max_concurrency: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `max_concurrency` cannot be zero"
        );

        // TES backend: HTTP URLs are rejected unless `insecure` is set.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    insecure: true,
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        config
            .validate()
            .await
            .expect("configuration should validate");

        // HTTP parallelism must be non-zero when set; `None` and positive
        // values are accepted.
        let mut config = Config::default();
        config.http.parallelism = Parallelism(Some(0));
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Parallelism(Some(5));
        assert!(
            config.validate().await.is_ok(),
            "should pass for valid configuration"
        );
        let mut config = Config::default();
        config.http.parallelism = Parallelism(None);
        assert!(
            config.validate().await.is_ok(),
            "should pass for default (None)"
        );

        // LSF job name prefixes over the byte limit are rejected (the LSF
        // backend only validates on unix platforms).
        #[cfg(unix)]
        {
            let job_name_prefix = "A".repeat(MAX_LSF_JOB_NAME_PREFIX * 2);
            let mut config = Config {
                experimental_features_enabled: true,
                ..Default::default()
            };
            config.backends.insert(
                "default".to_string(),
                BackendConfig::LsfApptainer(LsfApptainerBackendConfig {
                    job_name_prefix: Some(job_name_prefix.clone()),
                    ..Default::default()
                }),
            );
            assert_eq!(
                config.validate().await.unwrap_err().to_string(),
                format!("LSF job name prefix `{job_name_prefix}` exceeds the maximum 100 bytes")
            );
        }
    }
}