1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::path::Path;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use anyhow::ensure;
15use bytesize::ByteSize;
16use indexmap::IndexMap;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::process::Command;
21use tracing::error;
22use tracing::warn;
23use url::Url;
24
25use crate::CancellationContext;
26use crate::Events;
27use crate::SYSTEM;
28use crate::Value;
29use crate::backend::TaskExecutionBackend;
30use crate::convert_unit_string;
31use crate::path::is_supported_url;
32
/// The maximum value permitted for `task.retries` (enforced in
/// [`TaskConfig::validate`]).
pub(crate) const MAX_RETRIES: u64 = 100;

/// The default shell used to execute task commands.
pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";

/// The backend name used when `backend` is not explicitly configured.
pub(crate) const DEFAULT_BACKEND_NAME: &str = "default";

/// The maximum length, in bytes, of a configured LSF job name prefix.
const MAX_LSF_JOB_NAME_PREFIX: usize = 100;

/// The placeholder emitted when serializing a redacted [`SecretString`].
const REDACTED: &str = "<REDACTED>";
48pub(crate) fn cache_dir() -> Result<PathBuf> {
50 const CACHE_DIR_ROOT: &str = "sprocket";
52
53 Ok(dirs::cache_dir()
54 .context("failed to determine user cache directory")?
55 .join(CACHE_DIR_ROOT))
56}
57
/// A secret string whose serialized form is redacted by default.
///
/// Serialization emits the `<REDACTED>` placeholder unless the value has been
/// explicitly unredacted via [`SecretString::unredact`].
#[derive(Debug, Clone)]
pub struct SecretString {
    /// The wrapped secret value.
    inner: secrecy::SecretString,
    /// When `true`, serialization emits the placeholder instead of the secret.
    redacted: bool,
}
75
impl SecretString {
    /// Marks the secret as redacted: serialization will emit the
    /// `<REDACTED>` placeholder instead of the secret value.
    pub fn redact(&mut self) {
        self.redacted = true;
    }

    /// Marks the secret as exposed: serialization will emit the actual
    /// secret value.
    pub fn unredact(&mut self) {
        self.redacted = false;
    }

    /// Returns a reference to the wrapped secret.
    pub fn inner(&self) -> &secrecy::SecretString {
        &self.inner
    }
}
95
96impl From<String> for SecretString {
97 fn from(s: String) -> Self {
98 Self {
99 inner: s.into(),
100 redacted: true,
101 }
102 }
103}
104
105impl From<&str> for SecretString {
106 fn from(s: &str) -> Self {
107 Self {
108 inner: s.into(),
109 redacted: true,
110 }
111 }
112}
113
114impl Default for SecretString {
115 fn default() -> Self {
116 Self {
117 inner: Default::default(),
118 redacted: true,
119 }
120 }
121}
122
123impl serde::Serialize for SecretString {
124 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
125 where
126 S: serde::Serializer,
127 {
128 use secrecy::ExposeSecret;
129
130 if self.redacted {
131 serializer.serialize_str(REDACTED)
132 } else {
133 serializer.serialize_str(self.inner.expose_secret())
134 }
135 }
136}
137
138impl<'de> serde::Deserialize<'de> for SecretString {
139 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
140 where
141 D: serde::Deserializer<'de>,
142 {
143 let inner = secrecy::SecretString::deserialize(deserializer)?;
144 Ok(Self {
145 inner,
146 redacted: true,
147 })
148 }
149}
150
/// Determines how the engine reacts to failures during execution.
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// Fail slow (the default mode).
    #[default]
    Slow,
    /// Fail fast.
    Fast,
}
165
/// The top-level engine configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP transfer configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// The name of the backend to use; must name an entry in `backends`.
    /// When unset and multiple backends exist, `default` is used.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// The named task execution backends.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub backends: IndexMap<String, BackendConfig>,
    /// Cloud storage configuration (Azure, S3, Google).
    #[serde(default)]
    pub storage: StorageConfig,
    /// Suppresses environment-specific output; requires
    /// `experimental_features_enabled` (enforced in `validate`).
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// Enables experimental features (e.g. the LSF/Slurm + Apptainer backends).
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// The failure mode; serialized/deserialized under the key `fail`.
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
239
240impl Config {
241 pub async fn validate(&self) -> Result<()> {
243 self.http.validate()?;
244 self.workflow.validate()?;
245 self.task.validate()?;
246
247 if self.backend.is_none() && self.backends.len() < 2 {
248 } else {
251 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
253 if !self.backends.contains_key(backend) {
254 bail!("a backend named `{backend}` is not present in the configuration");
255 }
256 }
257
258 for backend in self.backends.values() {
259 backend.validate(self).await?;
260 }
261
262 self.storage.validate()?;
263
264 if self.suppress_env_specific_output && !self.experimental_features_enabled {
265 bail!("`suppress_env_specific_output` requires enabling experimental features");
266 }
267
268 Ok(())
269 }
270
271 pub fn redact(&mut self) {
275 for backend in self.backends.values_mut() {
276 backend.redact();
277 }
278
279 if let Some(auth) = &mut self.storage.azure.auth {
280 auth.redact();
281 }
282
283 if let Some(auth) = &mut self.storage.s3.auth {
284 auth.redact();
285 }
286
287 if let Some(auth) = &mut self.storage.google.auth {
288 auth.redact();
289 }
290 }
291
292 pub fn unredact(&mut self) {
296 for backend in self.backends.values_mut() {
297 backend.unredact();
298 }
299
300 if let Some(auth) = &mut self.storage.azure.auth {
301 auth.unredact();
302 }
303
304 if let Some(auth) = &mut self.storage.s3.auth {
305 auth.unredact();
306 }
307
308 if let Some(auth) = &mut self.storage.google.auth {
309 auth.unredact();
310 }
311 }
312
313 pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
318 if self.backend.is_some() || self.backends.len() >= 2 {
319 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
321 return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
322 || anyhow!("a backend named `{backend}` is not present in the configuration"),
323 )?));
324 }
325
326 if self.backends.len() == 1 {
327 Ok(Cow::Borrowed(self.backends.values().next().unwrap()))
329 } else {
330 Ok(Cow::Owned(BackendConfig::default()))
332 }
333 }
334
335 pub(crate) async fn create_backend(
337 self: &Arc<Self>,
338 run_root_dir: &Path,
339 events: Events,
340 cancellation: CancellationContext,
341 ) -> Result<Arc<dyn TaskExecutionBackend>> {
342 use crate::backend::*;
343
344 match self.backend()?.as_ref() {
345 BackendConfig::Local(_) => {
346 warn!(
347 "the engine is configured to use the local backend: tasks will not be run \
348 inside of a container"
349 );
350 Ok(Arc::new(LocalBackend::new(
351 self.clone(),
352 events,
353 cancellation,
354 )?))
355 }
356 BackendConfig::Docker(_) => Ok(Arc::new(
357 DockerBackend::new(self.clone(), events, cancellation).await?,
358 )),
359 BackendConfig::Tes(_) => Ok(Arc::new(
360 TesBackend::new(self.clone(), events, cancellation).await?,
361 )),
362 BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
363 self.clone(),
364 run_root_dir,
365 events,
366 cancellation,
367 )?)),
368 BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
369 self.clone(),
370 run_root_dir,
371 events,
372 cancellation,
373 )?)),
374 }
375 }
376}
377
/// Configuration for HTTP transfers.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct HttpConfig {
    /// The directory used for caching HTTP downloads.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The number of retries for failed transfers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<usize>,
    /// The number of parallel transfers; must be non-zero when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parallelism: Option<usize>,
}
398
399impl HttpConfig {
400 pub fn validate(&self) -> Result<()> {
402 if let Some(parallelism) = self.parallelism
403 && parallelism == 0
404 {
405 bail!("configuration value `http.parallelism` cannot be zero");
406 }
407 Ok(())
408 }
409}
410
/// Configuration for cloud storage providers.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct StorageConfig {
    /// Azure Blob Storage configuration.
    #[serde(default)]
    pub azure: AzureStorageConfig,
    /// AWS S3 configuration.
    #[serde(default)]
    pub s3: S3StorageConfig,
    /// Google Cloud Storage configuration.
    #[serde(default)]
    pub google: GoogleStorageConfig,
}
425
426impl StorageConfig {
427 pub fn validate(&self) -> Result<()> {
429 self.azure.validate()?;
430 self.s3.validate()?;
431 self.google.validate()?;
432 Ok(())
433 }
434}
435
/// Authentication configuration for Azure Blob Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageAuthConfig {
    /// The storage account name (required).
    pub account_name: String,
    /// The storage account access key (required; redacted when serialized).
    pub access_key: SecretString,
}
445
446impl AzureStorageAuthConfig {
447 pub fn validate(&self) -> Result<()> {
449 if self.account_name.is_empty() {
450 bail!("configuration value `storage.azure.auth.account_name` is required");
451 }
452
453 if self.access_key.inner.expose_secret().is_empty() {
454 bail!("configuration value `storage.azure.auth.access_key` is required");
455 }
456
457 Ok(())
458 }
459
460 pub fn redact(&mut self) {
463 self.access_key.redact();
464 }
465
466 pub fn unredact(&mut self) {
469 self.access_key.unredact();
470 }
471}
472
/// Configuration for Azure Blob Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageConfig {
    /// Optional authentication; anonymous access is used when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<AzureStorageAuthConfig>,
}
481
482impl AzureStorageConfig {
483 pub fn validate(&self) -> Result<()> {
485 if let Some(auth) = &self.auth {
486 auth.validate()?;
487 }
488
489 Ok(())
490 }
491}
492
/// Authentication configuration for AWS S3.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageAuthConfig {
    /// The access key identifier (required).
    pub access_key_id: String,
    /// The secret access key (required; redacted when serialized).
    pub secret_access_key: SecretString,
}
502
503impl S3StorageAuthConfig {
504 pub fn validate(&self) -> Result<()> {
506 if self.access_key_id.is_empty() {
507 bail!("configuration value `storage.s3.auth.access_key_id` is required");
508 }
509
510 if self.secret_access_key.inner.expose_secret().is_empty() {
511 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
512 }
513
514 Ok(())
515 }
516
517 pub fn redact(&mut self) {
520 self.secret_access_key.redact();
521 }
522
523 pub fn unredact(&mut self) {
526 self.secret_access_key.unredact();
527 }
528}
529
/// Configuration for AWS S3.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageConfig {
    /// The AWS region to use.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub region: Option<String>,

    /// Optional authentication; anonymous access is used when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<S3StorageAuthConfig>,
}
545
546impl S3StorageConfig {
547 pub fn validate(&self) -> Result<()> {
549 if let Some(auth) = &self.auth {
550 auth.validate()?;
551 }
552
553 Ok(())
554 }
555}
556
/// Authentication configuration for Google Cloud Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageAuthConfig {
    /// The access key (required).
    pub access_key: String,
    /// The secret (required; redacted when serialized).
    pub secret: SecretString,
}
566
567impl GoogleStorageAuthConfig {
568 pub fn validate(&self) -> Result<()> {
570 if self.access_key.is_empty() {
571 bail!("configuration value `storage.google.auth.access_key` is required");
572 }
573
574 if self.secret.inner.expose_secret().is_empty() {
575 bail!("configuration value `storage.google.auth.secret` is required");
576 }
577
578 Ok(())
579 }
580
581 pub fn redact(&mut self) {
584 self.secret.redact();
585 }
586
587 pub fn unredact(&mut self) {
590 self.secret.unredact();
591 }
592}
593
/// Configuration for Google Cloud Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageConfig {
    /// Optional authentication; anonymous access is used when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<GoogleStorageAuthConfig>,
}
602
603impl GoogleStorageConfig {
604 pub fn validate(&self) -> Result<()> {
606 if let Some(auth) = &self.auth {
607 auth.validate()?;
608 }
609
610 Ok(())
611 }
612}
613
/// Configuration for workflow evaluation.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct WorkflowConfig {
    /// Scatter statement evaluation configuration.
    #[serde(default)]
    pub scatter: ScatterConfig,
}
622
623impl WorkflowConfig {
624 pub fn validate(&self) -> Result<()> {
626 self.scatter.validate()?;
627 Ok(())
628 }
629}
630
/// Configuration for scatter statement evaluation.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ScatterConfig {
    /// The number of scatter elements evaluated concurrently; must be
    /// non-zero when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub concurrency: Option<u64>,
}
690
691impl ScatterConfig {
692 pub fn validate(&self) -> Result<()> {
694 if let Some(concurrency) = self.concurrency
695 && concurrency == 0
696 {
697 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
698 }
699
700 Ok(())
701 }
702}
703
/// Determines the call caching mode for task evaluation.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// Call caching is disabled (the default).
    #[default]
    Off,
    /// Call caching is enabled.
    On,
    /// Explicit call caching mode.
    Explicit,
}
731
/// Determines how content digests are computed for call caching.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// Strong digests.
    Strong,
    /// Weak digests (the default).
    #[default]
    Weak,
}
756
/// Configuration for task evaluation.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct TaskConfig {
    /// The default number of task retries; must not exceed `MAX_RETRIES`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<u64>,
    /// The default container to use for tasks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub container: Option<String>,
    /// The shell used to run task commands; defaults to
    /// `DEFAULT_TASK_SHELL` when unset — TODO confirm against the task
    /// evaluation code.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell: Option<String>,
    /// Behavior when a task's CPU requirement exceeds the available limit.
    #[serde(default)]
    pub cpu_limit_behavior: TaskResourceLimitBehavior,
    /// Behavior when a task's memory requirement exceeds the available limit.
    #[serde(default)]
    pub memory_limit_behavior: TaskResourceLimitBehavior,
    /// The directory used for the call cache.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The call caching mode.
    #[serde(default)]
    pub cache: CallCachingMode,
    /// The content digest mode used for call caching.
    #[serde(default)]
    pub digests: ContentDigestMode,
    /// Requirement names excluded from call cache key computation.
    #[serde(default)]
    pub excluded_cache_requirements: HashSet<String>,
    /// Hint names excluded from call cache key computation.
    #[serde(default)]
    pub excluded_cache_hints: HashSet<String>,
    /// Input names excluded from call cache key computation.
    #[serde(default)]
    pub excluded_cache_inputs: HashSet<String>,
}
840
841impl TaskConfig {
842 pub fn validate(&self) -> Result<()> {
844 if self.retries.unwrap_or(0) > MAX_RETRIES {
845 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
846 }
847
848 Ok(())
849 }
850}
851
/// Behavior when a task requests more of a resource than is available.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// Attempt the task with the maximum available amount of the resource.
    TryWithMax,
    /// Refuse to run the task (the default).
    #[default]
    Deny,
}
866
/// Configuration for a task execution backend; the variant is selected by
/// the serialized `type` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum BackendConfig {
    /// Run tasks directly on the host, without a container.
    Local(LocalBackendConfig),
    /// Run tasks in Docker containers.
    Docker(DockerBackendConfig),
    /// Run tasks via a Task Execution Service (TES).
    Tes(TesBackendConfig),
    /// Run tasks via LSF using Apptainer containers (experimental).
    LsfApptainer(LsfApptainerBackendConfig),
    /// Run tasks via Slurm using Apptainer containers (experimental).
    SlurmApptainer(SlurmApptainerBackendConfig),
}
886
887impl Default for BackendConfig {
888 fn default() -> Self {
889 Self::Docker(Default::default())
890 }
891}
892
893impl BackendConfig {
894 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
896 match self {
897 Self::Local(config) => config.validate(),
898 Self::Docker(config) => config.validate(),
899 Self::Tes(config) => config.validate(),
900 Self::LsfApptainer(config) => config.validate(engine_config).await,
901 Self::SlurmApptainer(config) => config.validate(engine_config).await,
902 }
903 }
904
905 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
909 match self {
910 Self::Local(config) => Some(config),
911 _ => None,
912 }
913 }
914
915 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
919 match self {
920 Self::Docker(config) => Some(config),
921 _ => None,
922 }
923 }
924
925 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
929 match self {
930 Self::Tes(config) => Some(config),
931 _ => None,
932 }
933 }
934
935 pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
940 match self {
941 Self::LsfApptainer(config) => Some(config),
942 _ => None,
943 }
944 }
945
946 pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
951 match self {
952 Self::SlurmApptainer(config) => Some(config),
953 _ => None,
954 }
955 }
956
957 pub fn redact(&mut self) {
959 match self {
960 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
961 Self::Tes(config) => config.redact(),
962 }
963 }
964
965 pub fn unredact(&mut self) {
967 match self {
968 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
969 Self::Tes(config) => config.unredact(),
970 }
971 }
972}
973
/// Configuration for the local task execution backend.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LocalBackendConfig {
    /// The number of CPUs available for tasks; must be non-zero and no more
    /// than the host's virtual CPUs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu: Option<u64>,

    /// The memory available for tasks, as a unit string (e.g. `2 GiB`);
    /// must be non-zero and no more than the host's total memory.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory: Option<String>,
}
1000
1001impl LocalBackendConfig {
1002 pub fn validate(&self) -> Result<()> {
1004 if let Some(cpu) = self.cpu {
1005 if cpu == 0 {
1006 bail!("local backend configuration value `cpu` cannot be zero");
1007 }
1008
1009 let total = SYSTEM.cpus().len() as u64;
1010 if cpu > total {
1011 bail!(
1012 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1013 available to the host ({total})"
1014 );
1015 }
1016 }
1017
1018 if let Some(memory) = &self.memory {
1019 let memory = convert_unit_string(memory).with_context(|| {
1020 format!("local backend configuration value `memory` has invalid value `{memory}`")
1021 })?;
1022
1023 if memory == 0 {
1024 bail!("local backend configuration value `memory` cannot be zero");
1025 }
1026
1027 let total = SYSTEM.total_memory();
1028 if memory > total {
1029 bail!(
1030 "local backend configuration value `memory` cannot exceed the total memory of \
1031 the host ({total} bytes)"
1032 );
1033 }
1034 }
1035
1036 Ok(())
1037 }
1038}
1039
/// Returns the default for [`DockerBackendConfig::cleanup`]: containers are
/// cleaned up after execution unless configured otherwise.
const fn cleanup_default() -> bool {
    true
}
1044
/// Configuration for the Docker task execution backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct DockerBackendConfig {
    /// Whether to clean up containers after task execution (defaults to
    /// `true`).
    #[serde(default = "cleanup_default")]
    pub cleanup: bool,
}
1055
1056impl DockerBackendConfig {
1057 pub fn validate(&self) -> Result<()> {
1059 Ok(())
1060 }
1061}
1062
1063impl Default for DockerBackendConfig {
1064 fn default() -> Self {
1065 Self { cleanup: true }
1066 }
1067}
1068
/// HTTP basic authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BasicAuthConfig {
    /// The user name.
    #[serde(default)]
    pub username: String,
    /// The password (redacted when serialized).
    #[serde(default)]
    pub password: SecretString,
}
1080
1081impl BasicAuthConfig {
1082 pub fn validate(&self) -> Result<()> {
1084 Ok(())
1085 }
1086
1087 pub fn redact(&mut self) {
1089 self.password.redact();
1090 }
1091
1092 pub fn unredact(&mut self) {
1094 self.password.unredact();
1095 }
1096}
1097
/// HTTP bearer token authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BearerAuthConfig {
    /// The bearer token (redacted when serialized).
    #[serde(default)]
    pub token: SecretString,
}
1106
1107impl BearerAuthConfig {
1108 pub fn validate(&self) -> Result<()> {
1110 Ok(())
1111 }
1112
1113 pub fn redact(&mut self) {
1115 self.token.redact();
1116 }
1117
1118 pub fn unredact(&mut self) {
1120 self.token.unredact();
1121 }
1122}
1123
/// Authentication configuration for the TES backend; the variant is
/// selected by the serialized `type` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum TesBackendAuthConfig {
    /// HTTP basic authentication.
    Basic(BasicAuthConfig),
    /// HTTP bearer token authentication.
    Bearer(BearerAuthConfig),
}
1133
impl TesBackendAuthConfig {
    /// Validates the authentication configuration by delegating to the
    /// active variant.
    pub fn validate(&self) -> Result<()> {
        match self {
            Self::Basic(config) => config.validate(),
            Self::Bearer(config) => config.validate(),
        }
    }

    /// Redacts the credentials for serialization.
    pub fn redact(&mut self) {
        match self {
            Self::Basic(auth) => auth.redact(),
            Self::Bearer(auth) => auth.redact(),
        }
    }

    /// Exposes the credentials for serialization.
    pub fn unredact(&mut self) {
        match self {
            Self::Basic(auth) => auth.unredact(),
            Self::Bearer(auth) => auth.unredact(),
        }
    }
}
1161
1162#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1164#[serde(rename_all = "snake_case", deny_unknown_fields)]
1165pub struct TesBackendConfig {
1166 #[serde(default, skip_serializing_if = "Option::is_none")]
1168 pub url: Option<Url>,
1169
1170 #[serde(default, skip_serializing_if = "Option::is_none")]
1172 pub auth: Option<TesBackendAuthConfig>,
1173
1174 #[serde(default, skip_serializing_if = "Option::is_none")]
1176 pub inputs: Option<Url>,
1177
1178 #[serde(default, skip_serializing_if = "Option::is_none")]
1180 pub outputs: Option<Url>,
1181
1182 #[serde(default, skip_serializing_if = "Option::is_none")]
1186 pub interval: Option<u64>,
1187
1188 pub retries: Option<u32>,
1193
1194 #[serde(default, skip_serializing_if = "Option::is_none")]
1199 pub max_concurrency: Option<u32>,
1200
1201 #[serde(default)]
1204 pub insecure: bool,
1205}
1206
1207impl TesBackendConfig {
1208 pub fn validate(&self) -> Result<()> {
1210 match &self.url {
1211 Some(url) => {
1212 if !self.insecure && url.scheme() != "https" {
1213 bail!(
1214 "TES backend configuration value `url` has invalid value `{url}`: URL \
1215 must use a HTTPS scheme"
1216 );
1217 }
1218 }
1219 None => bail!("TES backend configuration value `url` is required"),
1220 }
1221
1222 if let Some(auth) = &self.auth {
1223 auth.validate()?;
1224 }
1225
1226 if let Some(max_concurrency) = self.max_concurrency
1227 && max_concurrency == 0
1228 {
1229 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1230 }
1231
1232 match &self.inputs {
1233 Some(url) => {
1234 if !is_supported_url(url.as_str()) {
1235 bail!(
1236 "TES backend storage configuration value `inputs` has invalid value \
1237 `{url}`: URL scheme is not supported"
1238 );
1239 }
1240
1241 if !url.path().ends_with('/') {
1242 bail!(
1243 "TES backend storage configuration value `inputs` has invalid value \
1244 `{url}`: URL path must end with a slash"
1245 );
1246 }
1247 }
1248 None => bail!("TES backend configuration value `inputs` is required"),
1249 }
1250
1251 match &self.outputs {
1252 Some(url) => {
1253 if !is_supported_url(url.as_str()) {
1254 bail!(
1255 "TES backend storage configuration value `outputs` has invalid value \
1256 `{url}`: URL scheme is not supported"
1257 );
1258 }
1259
1260 if !url.path().ends_with('/') {
1261 bail!(
1262 "TES backend storage configuration value `outputs` has invalid value \
1263 `{url}`: URL path must end with a slash"
1264 );
1265 }
1266 }
1267 None => bail!("TES backend storage configuration value `outputs` is required"),
1268 }
1269
1270 Ok(())
1271 }
1272
1273 pub fn redact(&mut self) {
1275 if let Some(auth) = &mut self.auth {
1276 auth.redact();
1277 }
1278 }
1279
1280 pub fn unredact(&mut self) {
1282 if let Some(auth) = &mut self.auth {
1283 auth.unredact();
1284 }
1285 }
1286}
1287
/// Configuration shared by the Apptainer-based backends (LSF and Slurm).
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ApptainerConfig {
    /// The Apptainer executable to invoke (defaults to `apptainer`).
    #[serde(default = "default_apptainer_executable")]
    pub executable: String,

    /// The directory used for caching Apptainer images.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub image_cache_dir: Option<PathBuf>,

    /// Extra arguments passed to `apptainer exec` — presumably appended
    /// verbatim; TODO confirm against the backend implementation.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}
1312
/// The executable name used when `executable` is not configured.
const DEFAULT_APPTAINER_EXECUTABLE: &str = "apptainer";

/// Returns the default value for [`ApptainerConfig::executable`].
fn default_apptainer_executable() -> String {
    DEFAULT_APPTAINER_EXECUTABLE.to_owned()
}
1320
1321impl Default for ApptainerConfig {
1322 fn default() -> Self {
1323 Self {
1324 executable: default_apptainer_executable(),
1325 image_cache_dir: None,
1326 extra_apptainer_exec_args: None,
1327 }
1328 }
1329}
1330
impl ApptainerConfig {
    /// Validates the Apptainer configuration; there is currently nothing
    /// to check.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1337
/// Configuration for an LSF queue used by the LSF + Apptainer backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfQueueConfig {
    /// The LSF queue name; must not be empty and must exist in the cluster.
    pub name: String,
    /// The maximum CPUs a single task may be provisioned on this queue.
    pub max_cpu_per_task: Option<u64>,
    /// The maximum memory a single task may be provisioned on this queue.
    pub max_memory_per_task: Option<ByteSize>,
}
1357
impl LsfQueueConfig {
    /// Validates the queue configuration.
    ///
    /// `name` is the queue's role (`default`, `short_task`, `gpu`, or
    /// `fpga` — see the callers in `LsfApptainerBackendConfig::validate`)
    /// and is used only to build error messages.
    ///
    /// Beyond the local sanity checks, this shells out to `bqueues` with a
    /// 10-second timeout to confirm the queue exists in the LSF cluster.
    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let queue = &self.name;
        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
            );
        }
        // Query LSF for the queue; a timeout is treated as a validation
        // failure rather than hanging engine startup.
        match tokio::time::timeout(
            std::time::Duration::from_secs(10),
            Command::new("bqueues").arg(queue).output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating LSF queue")?;
                if !output.status.success() {
                    // A non-zero exit means the queue is unknown or LSF is
                    // misconfigured; log full command output for diagnosis.
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
                } else {
                    Ok(())
                }
            }
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_lsf_queue `{queue}`"
            )),
        }
    }
}
1400
/// Configuration for the experimental LSF + Apptainer backend.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfApptainerBackendConfig {
    /// Polling interval for job status — units not evident here; TODO
    /// confirm against the backend implementation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// Maximum number of concurrently executing tasks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// The queue used when no more specific queue applies.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks hinted as short-running.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks requiring a GPU.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks requiring an FPGA.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Extra arguments passed to `bsub` — presumably appended verbatim;
    /// TODO confirm against the backend implementation.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Prefix applied to LSF job names; limited to
    /// `MAX_LSF_JOB_NAME_PREFIX` bytes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// Apptainer settings, flattened into this configuration.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1463
impl LsfApptainerBackendConfig {
    /// Validates the LSF + Apptainer backend configuration.
    ///
    /// Requires a unix platform and experimental features. Each configured
    /// queue is validated (which shells out to `bqueues`), the job name
    /// prefix length is checked, and the nested Apptainer configuration is
    /// validated.
    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
        if cfg!(not(unix)) {
            bail!("LSF + Apptainer backend is not supported on non-unix platforms");
        }

        if !engine_config.experimental_features_enabled {
            bail!("LSF + Apptainer backend requires enabling experimental features");
        }

        if let Some(queue) = &self.default_lsf_queue {
            queue.validate("default").await?;
        }

        if let Some(queue) = &self.short_task_lsf_queue {
            queue.validate("short_task").await?;
        }

        if let Some(queue) = &self.gpu_lsf_queue {
            queue.validate("gpu").await?;
        }

        if let Some(queue) = &self.fpga_lsf_queue {
            queue.validate("fpga").await?;
        }

        // The limit is expressed in bytes (`len()`), not characters.
        if let Some(prefix) = &self.job_name_prefix
            && prefix.len() > MAX_LSF_JOB_NAME_PREFIX
        {
            bail!(
                "LSF job name prefix `{prefix}` exceeds the maximum {MAX_LSF_JOB_NAME_PREFIX} \
                 bytes"
            );
        }

        self.apptainer_config.validate().await?;

        Ok(())
    }

    /// Selects the LSF queue for a task based on its requirements and hints.
    ///
    /// Priority order: FPGA requirement, then GPU requirement, then the
    /// short-task hint, then the default queue (if any). A specialized
    /// queue is only chosen when it is both configured and requested.
    pub(crate) fn lsf_queue_for_task(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Option<&LsfQueueConfig> {
        if let Some(queue) = self.fpga_lsf_queue.as_ref()
            && let Some(true) = requirements
                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
                .and_then(Value::as_boolean)
        {
            return Some(queue);
        }

        if let Some(queue) = self.gpu_lsf_queue.as_ref()
            && let Some(true) = requirements
                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
                .and_then(Value::as_boolean)
        {
            return Some(queue);
        }

        if let Some(queue) = self.short_task_lsf_queue.as_ref()
            && let Some(true) = hints
                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
                .and_then(Value::as_boolean)
        {
            return Some(queue);
        }

        self.default_lsf_queue.as_ref()
    }
}
1550
/// Configuration for a Slurm partition used by the Slurm + Apptainer backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmPartitionConfig {
    /// The partition name; must not be empty and must exist in the cluster.
    pub name: String,
    /// The maximum CPUs a single task may be provisioned on this partition.
    pub max_cpu_per_task: Option<u64>,
    /// The maximum memory a single task may be provisioned on this partition.
    pub max_memory_per_task: Option<ByteSize>,
}
1571
impl SlurmPartitionConfig {
    /// Validates the partition configuration.
    ///
    /// `name` is the partition's role (`default`, `short_task`, `gpu`, or
    /// `fpga` — see the callers in `SlurmApptainerBackendConfig::validate`)
    /// and is used only to build error messages.
    ///
    /// Beyond the local sanity checks, this shells out to
    /// `scontrol show partition` with a 10-second timeout to confirm the
    /// partition exists in the Slurm cluster.
    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let partition = &self.name;
        ensure!(
            !partition.is_empty(),
            "{name}_slurm_partition name cannot be empty"
        );
        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_slurm_partition `{partition}` must allow at least some memory to be \
                 provisioned"
            );
        }
        // Query Slurm for the partition; a timeout is treated as a
        // validation failure rather than hanging engine startup.
        match tokio::time::timeout(
            std::time::Duration::from_secs(10),
            Command::new("scontrol")
                .arg("show")
                .arg("partition")
                .arg(partition)
                .output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating Slurm partition")?;
                if !output.status.success() {
                    // A non-zero exit means the partition is unknown or
                    // Slurm is misconfigured; log full output for diagnosis.
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
                    Err(anyhow!(
                        "failed to validate {name}_slurm_partition `{partition}`"
                    ))
                } else {
                    Ok(())
                }
            }
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_slurm_partition `{partition}`"
            )),
        }
    }
}
1625
/// Configuration for the experimental Slurm + Apptainer backend.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmApptainerBackendConfig {
    /// Polling interval for job status — units not evident here; TODO
    /// confirm against the backend implementation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// Maximum number of concurrently executing tasks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// The partition used when no more specific partition applies.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks hinted as short-running.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks requiring a GPU.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks requiring an FPGA.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Extra arguments passed to `sbatch` — presumably appended verbatim;
    /// TODO confirm against the backend implementation.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// Prefix applied to Slurm job names.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// Apptainer settings, flattened into this configuration.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1690
impl SlurmApptainerBackendConfig {
    /// Validates the Slurm + Apptainer backend configuration.
    ///
    /// Requires a unix platform and experimental features. Each configured
    /// partition is validated (which shells out to `scontrol`) and the
    /// nested Apptainer configuration is validated.
    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
        if cfg!(not(unix)) {
            bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
        }
        if !engine_config.experimental_features_enabled {
            bail!("Slurm + Apptainer backend requires enabling experimental features");
        }

        if let Some(partition) = &self.default_slurm_partition {
            partition.validate("default").await?;
        }
        if let Some(partition) = &self.short_task_slurm_partition {
            partition.validate("short_task").await?;
        }
        if let Some(partition) = &self.gpu_slurm_partition {
            partition.validate("gpu").await?;
        }
        if let Some(partition) = &self.fpga_slurm_partition {
            partition.validate("fpga").await?;
        }

        self.apptainer_config.validate().await?;

        Ok(())
    }

    /// Selects the Slurm partition for a task based on its requirements and
    /// hints.
    ///
    /// Priority order: FPGA requirement, then GPU requirement, then the
    /// short-task hint, then the default partition (if any). A specialized
    /// partition is only chosen when it is both configured and requested.
    pub(crate) fn slurm_partition_for_task(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Option<&SlurmPartitionConfig> {
        if let Some(partition) = self.fpga_slurm_partition.as_ref()
            && let Some(true) = requirements
                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
                .and_then(Value::as_boolean)
        {
            return Some(partition);
        }

        if let Some(partition) = self.gpu_slurm_partition.as_ref()
            && let Some(true) = requirements
                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
                .and_then(Value::as_boolean)
        {
            return Some(partition);
        }

        if let Some(partition) = self.short_task_slurm_partition.as_ref()
            && let Some(true) = hints
                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
                .and_then(Value::as_boolean)
        {
            return Some(partition);
        }

        self.default_slurm_partition.as_ref()
    }
}
1768
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    /// A `SecretString` serializes as the `<REDACTED>` placeholder by
    /// default, reveals its value only after `unredact`, and hides it again
    /// after `redact`.
    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    /// Serializing a full `Config` must not leak any of the secret values
    /// embedded in backend and storage auth configurations.
    #[test]
    fn redacted_config() {
        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                            username: "foo".into(),
                            password: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                            token: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: Some(AzureStorageAuthConfig {
                        account_name: "foo".into(),
                        access_key: "secret".into(),
                    }),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: "secret".into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: "secret".into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        // Look for the secret *value* pattern (`: "secret"`) rather than the
        // bare substring `secret`: field names such as `secret_access_key`
        // and `secret` contain that substring, which made the previous
        // `json.contains("secret")` assertion pass vacuously without ever
        // verifying that redaction happened.
        assert!(
            !json.contains(r#": "secret""#),
            "`{json}` contains a secret"
        );
    }

    /// Exercises `Config::validate` across the error cases it reports.
    #[tokio::test]
    async fn test_config_validate() {
        // `task.retries` is capped at `MAX_RETRIES`.
        let mut config = Config::default();
        config.task.retries = Some(1000000);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Scatter concurrency must be non-zero.
        let mut config = Config::default();
        config.workflow.scatter.concurrency = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // The selected backend name must exist in `backends`.
        let config = Config {
            backend: Some("foo".into()),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: Some("bar".into()),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // A single configured backend needs no explicit `backend` selection.
        let config = Config {
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().await.expect("config should validate");

        // Local backend `cpu` must be non-zero and within the host's CPUs.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
                     available to the host"
                )
        );

        // Local backend `memory` must be non-zero, parseable, and within the
        // host's total memory.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `memory` cannot exceed the total memory of \
                     the host"
                )
        );

        // TES backend requires a `url`.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // TES backend `max_concurrency` must be non-zero.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("https://example.com".parse().unwrap()),
                    max_concurrency: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `max_concurrency` cannot be zero"
        );

        // TES URLs must use HTTPS unless `insecure` is set.
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    insecure: true,
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        config
            .validate()
            .await
            .expect("configuration should validate");

        // `http.parallelism` must be non-zero when set; `None` and positive
        // values are accepted.
        let mut config = Config::default();
        config.http.parallelism = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Some(5);
        assert!(
            config.validate().await.is_ok(),
            "should pass for valid configuration"
        );

        let mut config = Config::default();
        config.http.parallelism = None;
        assert!(
            config.validate().await.is_ok(),
            "should pass for default (None)"
        );

        // LSF job name prefixes are limited to `MAX_LSF_JOB_NAME_PREFIX`
        // bytes (the LSF backend is unix-only, so gate the check).
        #[cfg(unix)]
        {
            let job_name_prefix = "A".repeat(MAX_LSF_JOB_NAME_PREFIX * 2);
            let mut config = Config {
                experimental_features_enabled: true,
                ..Default::default()
            };
            config.backends.insert(
                "default".to_string(),
                BackendConfig::LsfApptainer(LsfApptainerBackendConfig {
                    job_name_prefix: Some(job_name_prefix.clone()),
                    ..Default::default()
                }),
            );
            assert_eq!(
                config.validate().await.unwrap_err().to_string(),
                format!("LSF job name prefix `{job_name_prefix}` exceeds the maximum 100 bytes")
            );
        }
    }
}