1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use anyhow::ensure;
14use bytesize::ByteSize;
15use indexmap::IndexMap;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::process::Command;
20use tracing::error;
21use tracing::warn;
22use url::Url;
23
24use crate::CancellationContext;
25use crate::Events;
26use crate::SYSTEM;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::convert_unit_string;
30use crate::path::is_supported_url;
31
/// Maximum value accepted for `task.retries` (see [`TaskConfig::validate`]).
pub(crate) const MAX_RETRIES: u64 = 100;

/// Shell used for task commands when `task.shell` is not configured.
pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";

/// Backend name used when `backend` is unset but multiple backends are
/// configured (see [`Config::backend`]).
pub(crate) const DEFAULT_BACKEND_NAME: &str = "default";

/// Maximum length, in bytes, of a configured LSF job name prefix
/// (enforced in [`LsfApptainerBackendConfig::validate`]).
const MAX_LSF_JOB_NAME_PREFIX: usize = 100;

/// Placeholder emitted when serializing a redacted [`SecretString`].
const REDACTED: &str = "<REDACTED>";
46
47pub(crate) fn cache_dir() -> Result<PathBuf> {
49 const CACHE_DIR_ROOT: &str = "sprocket";
51
52 Ok(dirs::cache_dir()
53 .context("failed to determine user cache directory")?
54 .join(CACHE_DIR_ROOT))
55}
56
57#[derive(Debug, Clone)]
61pub struct SecretString {
62 inner: secrecy::SecretString,
66 redacted: bool,
73}
74
75impl SecretString {
76 pub fn redact(&mut self) {
81 self.redacted = true;
82 }
83
84 pub fn unredact(&mut self) {
86 self.redacted = false;
87 }
88
89 pub fn inner(&self) -> &secrecy::SecretString {
91 &self.inner
92 }
93}
94
95impl From<String> for SecretString {
96 fn from(s: String) -> Self {
97 Self {
98 inner: s.into(),
99 redacted: true,
100 }
101 }
102}
103
104impl From<&str> for SecretString {
105 fn from(s: &str) -> Self {
106 Self {
107 inner: s.into(),
108 redacted: true,
109 }
110 }
111}
112
113impl Default for SecretString {
114 fn default() -> Self {
115 Self {
116 inner: Default::default(),
117 redacted: true,
118 }
119 }
120}
121
122impl serde::Serialize for SecretString {
123 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
124 where
125 S: serde::Serializer,
126 {
127 use secrecy::ExposeSecret;
128
129 if self.redacted {
130 serializer.serialize_str(REDACTED)
131 } else {
132 serializer.serialize_str(self.inner.expose_secret())
133 }
134 }
135}
136
137impl<'de> serde::Deserialize<'de> for SecretString {
138 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
139 where
140 D: serde::Deserializer<'de>,
141 {
142 let inner = secrecy::SecretString::deserialize(deserializer)?;
143 Ok(Self {
144 inner,
145 redacted: true,
146 })
147 }
148}
149
/// Controls how a run responds to task failures (configured via the
/// top-level `fail` key; see [`Config::failure_mode`]).
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// "Fail slow" (the default) — presumably allows in-flight work to
    /// continue after a failure; confirm against the run orchestration code.
    #[default]
    Slow,
    /// "Fail fast" — presumably aborts outstanding work on the first
    /// failure; confirm against the run orchestration code.
    Fast,
}
164
/// Top-level engine configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP transfer configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// The name of the backend (a key of `backends`) to use; when unset, the
    /// sole configured backend — or the one named `default` when several are
    /// configured — is used (see [`Config::backend`]).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// The named task execution backend configurations.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub backends: IndexMap<String, BackendConfig>,
    /// Remote storage configuration.
    #[serde(default)]
    pub storage: StorageConfig,
    /// Whether to suppress environment-specific output; requires
    /// `experimental_features_enabled` (enforced in `validate`).
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// Whether experimental features (e.g. the LSF/Slurm backends) may be
    /// used.
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// How the engine reacts to task failures (serialized as `fail`).
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
238
239impl Config {
240 pub async fn validate(&self) -> Result<()> {
242 self.http.validate()?;
243 self.workflow.validate()?;
244 self.task.validate()?;
245
246 if self.backend.is_none() && self.backends.len() < 2 {
247 } else {
250 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
252 if !self.backends.contains_key(backend) {
253 bail!("a backend named `{backend}` is not present in the configuration");
254 }
255 }
256
257 for backend in self.backends.values() {
258 backend.validate(self).await?;
259 }
260
261 self.storage.validate()?;
262
263 if self.suppress_env_specific_output && !self.experimental_features_enabled {
264 bail!("`suppress_env_specific_output` requires enabling experimental features");
265 }
266
267 Ok(())
268 }
269
270 pub fn redact(&mut self) {
274 for backend in self.backends.values_mut() {
275 backend.redact();
276 }
277
278 if let Some(auth) = &mut self.storage.azure.auth {
279 auth.redact();
280 }
281
282 if let Some(auth) = &mut self.storage.s3.auth {
283 auth.redact();
284 }
285
286 if let Some(auth) = &mut self.storage.google.auth {
287 auth.redact();
288 }
289 }
290
291 pub fn unredact(&mut self) {
295 for backend in self.backends.values_mut() {
296 backend.unredact();
297 }
298
299 if let Some(auth) = &mut self.storage.azure.auth {
300 auth.unredact();
301 }
302
303 if let Some(auth) = &mut self.storage.s3.auth {
304 auth.unredact();
305 }
306
307 if let Some(auth) = &mut self.storage.google.auth {
308 auth.unredact();
309 }
310 }
311
312 pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
317 if self.backend.is_some() || self.backends.len() >= 2 {
318 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
320 return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
321 || anyhow!("a backend named `{backend}` is not present in the configuration"),
322 )?));
323 }
324
325 if self.backends.len() == 1 {
326 Ok(Cow::Borrowed(self.backends.values().next().unwrap()))
328 } else {
329 Ok(Cow::Owned(BackendConfig::default()))
331 }
332 }
333
334 pub(crate) async fn create_backend(
336 self: &Arc<Self>,
337 run_root_dir: &Path,
338 events: Events,
339 cancellation: CancellationContext,
340 ) -> Result<Arc<dyn TaskExecutionBackend>> {
341 use crate::backend::*;
342
343 match self.backend()?.as_ref() {
344 BackendConfig::Local(_) => {
345 warn!(
346 "the engine is configured to use the local backend: tasks will not be run \
347 inside of a container"
348 );
349 Ok(Arc::new(LocalBackend::new(
350 self.clone(),
351 events,
352 cancellation,
353 )?))
354 }
355 BackendConfig::Docker(_) => Ok(Arc::new(
356 DockerBackend::new(self.clone(), events, cancellation).await?,
357 )),
358 BackendConfig::Tes(_) => Ok(Arc::new(
359 TesBackend::new(self.clone(), events, cancellation).await?,
360 )),
361 BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
362 self.clone(),
363 run_root_dir,
364 events,
365 cancellation,
366 )?)),
367 BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
368 self.clone(),
369 run_root_dir,
370 events,
371 cancellation,
372 )?)),
373 }
374 }
375}
376
377#[derive(Debug, Default, Clone, Serialize, Deserialize)]
379#[serde(rename_all = "snake_case", deny_unknown_fields)]
380pub struct HttpConfig {
381 #[serde(default, skip_serializing_if = "Option::is_none")]
385 pub cache_dir: Option<PathBuf>,
386 #[serde(default, skip_serializing_if = "Option::is_none")]
390 pub retries: Option<usize>,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
395 pub parallelism: Option<usize>,
396}
397
398impl HttpConfig {
399 pub fn validate(&self) -> Result<()> {
401 if let Some(parallelism) = self.parallelism
402 && parallelism == 0
403 {
404 bail!("configuration value `http.parallelism` cannot be zero");
405 }
406 Ok(())
407 }
408}
409
410#[derive(Debug, Default, Clone, Serialize, Deserialize)]
412#[serde(rename_all = "snake_case", deny_unknown_fields)]
413pub struct StorageConfig {
414 #[serde(default)]
416 pub azure: AzureStorageConfig,
417 #[serde(default)]
419 pub s3: S3StorageConfig,
420 #[serde(default)]
422 pub google: GoogleStorageConfig,
423}
424
425impl StorageConfig {
426 pub fn validate(&self) -> Result<()> {
428 self.azure.validate()?;
429 self.s3.validate()?;
430 self.google.validate()?;
431 Ok(())
432 }
433}
434
435#[derive(Debug, Default, Clone, Serialize, Deserialize)]
437#[serde(rename_all = "snake_case", deny_unknown_fields)]
438pub struct AzureStorageAuthConfig {
439 pub account_name: String,
441 pub access_key: SecretString,
443}
444
445impl AzureStorageAuthConfig {
446 pub fn validate(&self) -> Result<()> {
448 if self.account_name.is_empty() {
449 bail!("configuration value `storage.azure.auth.account_name` is required");
450 }
451
452 if self.access_key.inner.expose_secret().is_empty() {
453 bail!("configuration value `storage.azure.auth.access_key` is required");
454 }
455
456 Ok(())
457 }
458
459 pub fn redact(&mut self) {
462 self.access_key.redact();
463 }
464
465 pub fn unredact(&mut self) {
468 self.access_key.unredact();
469 }
470}
471
472#[derive(Debug, Default, Clone, Serialize, Deserialize)]
474#[serde(rename_all = "snake_case", deny_unknown_fields)]
475pub struct AzureStorageConfig {
476 #[serde(default, skip_serializing_if = "Option::is_none")]
478 pub auth: Option<AzureStorageAuthConfig>,
479}
480
481impl AzureStorageConfig {
482 pub fn validate(&self) -> Result<()> {
484 if let Some(auth) = &self.auth {
485 auth.validate()?;
486 }
487
488 Ok(())
489 }
490}
491
492#[derive(Debug, Default, Clone, Serialize, Deserialize)]
494#[serde(rename_all = "snake_case", deny_unknown_fields)]
495pub struct S3StorageAuthConfig {
496 pub access_key_id: String,
498 pub secret_access_key: SecretString,
500}
501
502impl S3StorageAuthConfig {
503 pub fn validate(&self) -> Result<()> {
505 if self.access_key_id.is_empty() {
506 bail!("configuration value `storage.s3.auth.access_key_id` is required");
507 }
508
509 if self.secret_access_key.inner.expose_secret().is_empty() {
510 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
511 }
512
513 Ok(())
514 }
515
516 pub fn redact(&mut self) {
519 self.secret_access_key.redact();
520 }
521
522 pub fn unredact(&mut self) {
525 self.secret_access_key.unredact();
526 }
527}
528
529#[derive(Debug, Default, Clone, Serialize, Deserialize)]
531#[serde(rename_all = "snake_case", deny_unknown_fields)]
532pub struct S3StorageConfig {
533 #[serde(default, skip_serializing_if = "Option::is_none")]
538 pub region: Option<String>,
539
540 #[serde(default, skip_serializing_if = "Option::is_none")]
542 pub auth: Option<S3StorageAuthConfig>,
543}
544
545impl S3StorageConfig {
546 pub fn validate(&self) -> Result<()> {
548 if let Some(auth) = &self.auth {
549 auth.validate()?;
550 }
551
552 Ok(())
553 }
554}
555
556#[derive(Debug, Default, Clone, Serialize, Deserialize)]
558#[serde(rename_all = "snake_case", deny_unknown_fields)]
559pub struct GoogleStorageAuthConfig {
560 pub access_key: String,
562 pub secret: SecretString,
564}
565
566impl GoogleStorageAuthConfig {
567 pub fn validate(&self) -> Result<()> {
569 if self.access_key.is_empty() {
570 bail!("configuration value `storage.google.auth.access_key` is required");
571 }
572
573 if self.secret.inner.expose_secret().is_empty() {
574 bail!("configuration value `storage.google.auth.secret` is required");
575 }
576
577 Ok(())
578 }
579
580 pub fn redact(&mut self) {
583 self.secret.redact();
584 }
585
586 pub fn unredact(&mut self) {
589 self.secret.unredact();
590 }
591}
592
593#[derive(Debug, Default, Clone, Serialize, Deserialize)]
595#[serde(rename_all = "snake_case", deny_unknown_fields)]
596pub struct GoogleStorageConfig {
597 #[serde(default, skip_serializing_if = "Option::is_none")]
599 pub auth: Option<GoogleStorageAuthConfig>,
600}
601
602impl GoogleStorageConfig {
603 pub fn validate(&self) -> Result<()> {
605 if let Some(auth) = &self.auth {
606 auth.validate()?;
607 }
608
609 Ok(())
610 }
611}
612
613#[derive(Debug, Default, Clone, Serialize, Deserialize)]
615#[serde(rename_all = "snake_case", deny_unknown_fields)]
616pub struct WorkflowConfig {
617 #[serde(default)]
619 pub scatter: ScatterConfig,
620}
621
622impl WorkflowConfig {
623 pub fn validate(&self) -> Result<()> {
625 self.scatter.validate()?;
626 Ok(())
627 }
628}
629
630#[derive(Debug, Default, Clone, Serialize, Deserialize)]
632#[serde(rename_all = "snake_case", deny_unknown_fields)]
633pub struct ScatterConfig {
634 #[serde(default, skip_serializing_if = "Option::is_none")]
687 pub concurrency: Option<u64>,
688}
689
690impl ScatterConfig {
691 pub fn validate(&self) -> Result<()> {
693 if let Some(concurrency) = self.concurrency
694 && concurrency == 0
695 {
696 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
697 }
698
699 Ok(())
700 }
701}
702
/// Call caching mode for task evaluation (configured via `task.cache`).
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// Call caching is disabled (the default).
    #[default]
    Off,
    /// Call caching is enabled.
    On,
    /// Presumably caches only where explicitly requested — confirm against
    /// the task evaluator.
    Explicit,
}

/// How content digests are computed (configured via `task.digests`).
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// Strong digests — presumably based on full file contents; confirm
    /// against the digest implementation.
    Strong,
    /// Weak digests (the default) — presumably cheaper, metadata-based
    /// digests; confirm against the digest implementation.
    #[default]
    Weak,
}
755
756#[derive(Debug, Default, Clone, Serialize, Deserialize)]
758#[serde(rename_all = "snake_case", deny_unknown_fields)]
759pub struct TaskConfig {
760 #[serde(default, skip_serializing_if = "Option::is_none")]
766 pub retries: Option<u64>,
767 #[serde(default, skip_serializing_if = "Option::is_none")]
772 pub container: Option<String>,
773 #[serde(default, skip_serializing_if = "Option::is_none")]
790 pub shell: Option<String>,
791 #[serde(default)]
793 pub cpu_limit_behavior: TaskResourceLimitBehavior,
794 #[serde(default)]
796 pub memory_limit_behavior: TaskResourceLimitBehavior,
797 #[serde(default, skip_serializing_if = "Option::is_none")]
801 pub cache_dir: Option<PathBuf>,
802 #[serde(default)]
804 pub cache: CallCachingMode,
805 #[serde(default)]
809 pub digests: ContentDigestMode,
810}
811
812impl TaskConfig {
813 pub fn validate(&self) -> Result<()> {
815 if self.retries.unwrap_or(0) > MAX_RETRIES {
816 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
817 }
818
819 Ok(())
820 }
821}
822
/// Behavior when a task requests more of a resource (CPU or memory) than is
/// available (see [`TaskConfig::cpu_limit_behavior`] and
/// [`TaskConfig::memory_limit_behavior`]).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// Presumably run the task with the maximum available amount of the
    /// resource instead of failing — confirm against backend implementations.
    TryWithMax,
    /// Refuse to run the task (the default).
    #[default]
    Deny,
}
837
838#[derive(Debug, Clone, Serialize, Deserialize)]
840#[serde(rename_all = "snake_case", tag = "type")]
841pub enum BackendConfig {
842 Local(LocalBackendConfig),
844 Docker(DockerBackendConfig),
846 Tes(TesBackendConfig),
848 LsfApptainer(LsfApptainerBackendConfig),
852 SlurmApptainer(SlurmApptainerBackendConfig),
856}
857
858impl Default for BackendConfig {
859 fn default() -> Self {
860 Self::Docker(Default::default())
861 }
862}
863
864impl BackendConfig {
865 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
867 match self {
868 Self::Local(config) => config.validate(),
869 Self::Docker(config) => config.validate(),
870 Self::Tes(config) => config.validate(),
871 Self::LsfApptainer(config) => config.validate(engine_config).await,
872 Self::SlurmApptainer(config) => config.validate(engine_config).await,
873 }
874 }
875
876 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
880 match self {
881 Self::Local(config) => Some(config),
882 _ => None,
883 }
884 }
885
886 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
890 match self {
891 Self::Docker(config) => Some(config),
892 _ => None,
893 }
894 }
895
896 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
900 match self {
901 Self::Tes(config) => Some(config),
902 _ => None,
903 }
904 }
905
906 pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
911 match self {
912 Self::LsfApptainer(config) => Some(config),
913 _ => None,
914 }
915 }
916
917 pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
922 match self {
923 Self::SlurmApptainer(config) => Some(config),
924 _ => None,
925 }
926 }
927
928 pub fn redact(&mut self) {
930 match self {
931 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
932 Self::Tes(config) => config.redact(),
933 }
934 }
935
936 pub fn unredact(&mut self) {
938 match self {
939 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
940 Self::Tes(config) => config.unredact(),
941 }
942 }
943}
944
945#[derive(Debug, Default, Clone, Serialize, Deserialize)]
952#[serde(rename_all = "snake_case", deny_unknown_fields)]
953pub struct LocalBackendConfig {
954 #[serde(default, skip_serializing_if = "Option::is_none")]
960 pub cpu: Option<u64>,
961
962 #[serde(default, skip_serializing_if = "Option::is_none")]
969 pub memory: Option<String>,
970}
971
972impl LocalBackendConfig {
973 pub fn validate(&self) -> Result<()> {
975 if let Some(cpu) = self.cpu {
976 if cpu == 0 {
977 bail!("local backend configuration value `cpu` cannot be zero");
978 }
979
980 let total = SYSTEM.cpus().len() as u64;
981 if cpu > total {
982 bail!(
983 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
984 available to the host ({total})"
985 );
986 }
987 }
988
989 if let Some(memory) = &self.memory {
990 let memory = convert_unit_string(memory).with_context(|| {
991 format!("local backend configuration value `memory` has invalid value `{memory}`")
992 })?;
993
994 if memory == 0 {
995 bail!("local backend configuration value `memory` cannot be zero");
996 }
997
998 let total = SYSTEM.total_memory();
999 if memory > total {
1000 bail!(
1001 "local backend configuration value `memory` cannot exceed the total memory of \
1002 the host ({total} bytes)"
1003 );
1004 }
1005 }
1006
1007 Ok(())
1008 }
1009}
1010
/// Serde default for [`DockerBackendConfig::cleanup`]: containers are
/// cleaned up by default.
const fn cleanup_default() -> bool {
    true
}
1015
1016#[derive(Debug, Clone, Serialize, Deserialize)]
1018#[serde(rename_all = "snake_case", deny_unknown_fields)]
1019pub struct DockerBackendConfig {
1020 #[serde(default = "cleanup_default")]
1024 pub cleanup: bool,
1025}
1026
1027impl DockerBackendConfig {
1028 pub fn validate(&self) -> Result<()> {
1030 Ok(())
1031 }
1032}
1033
1034impl Default for DockerBackendConfig {
1035 fn default() -> Self {
1036 Self { cleanup: true }
1037 }
1038}
1039
1040#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1042#[serde(rename_all = "snake_case", deny_unknown_fields)]
1043pub struct BasicAuthConfig {
1044 #[serde(default)]
1046 pub username: String,
1047 #[serde(default)]
1049 pub password: SecretString,
1050}
1051
1052impl BasicAuthConfig {
1053 pub fn validate(&self) -> Result<()> {
1055 Ok(())
1056 }
1057
1058 pub fn redact(&mut self) {
1060 self.password.redact();
1061 }
1062
1063 pub fn unredact(&mut self) {
1065 self.password.unredact();
1066 }
1067}
1068
1069#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1071#[serde(rename_all = "snake_case", deny_unknown_fields)]
1072pub struct BearerAuthConfig {
1073 #[serde(default)]
1075 pub token: SecretString,
1076}
1077
1078impl BearerAuthConfig {
1079 pub fn validate(&self) -> Result<()> {
1081 Ok(())
1082 }
1083
1084 pub fn redact(&mut self) {
1086 self.token.redact();
1087 }
1088
1089 pub fn unredact(&mut self) {
1091 self.token.unredact();
1092 }
1093}
1094
1095#[derive(Debug, Clone, Serialize, Deserialize)]
1097#[serde(rename_all = "snake_case", tag = "type")]
1098pub enum TesBackendAuthConfig {
1099 Basic(BasicAuthConfig),
1101 Bearer(BearerAuthConfig),
1103}
1104
1105impl TesBackendAuthConfig {
1106 pub fn validate(&self) -> Result<()> {
1108 match self {
1109 Self::Basic(config) => config.validate(),
1110 Self::Bearer(config) => config.validate(),
1111 }
1112 }
1113
1114 pub fn redact(&mut self) {
1117 match self {
1118 Self::Basic(auth) => auth.redact(),
1119 Self::Bearer(auth) => auth.redact(),
1120 }
1121 }
1122
1123 pub fn unredact(&mut self) {
1126 match self {
1127 Self::Basic(auth) => auth.unredact(),
1128 Self::Bearer(auth) => auth.unredact(),
1129 }
1130 }
1131}
1132
1133#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1135#[serde(rename_all = "snake_case", deny_unknown_fields)]
1136pub struct TesBackendConfig {
1137 #[serde(default, skip_serializing_if = "Option::is_none")]
1139 pub url: Option<Url>,
1140
1141 #[serde(default, skip_serializing_if = "Option::is_none")]
1143 pub auth: Option<TesBackendAuthConfig>,
1144
1145 #[serde(default, skip_serializing_if = "Option::is_none")]
1147 pub inputs: Option<Url>,
1148
1149 #[serde(default, skip_serializing_if = "Option::is_none")]
1151 pub outputs: Option<Url>,
1152
1153 #[serde(default, skip_serializing_if = "Option::is_none")]
1157 pub interval: Option<u64>,
1158
1159 pub retries: Option<u32>,
1164
1165 #[serde(default, skip_serializing_if = "Option::is_none")]
1170 pub max_concurrency: Option<u32>,
1171
1172 #[serde(default)]
1175 pub insecure: bool,
1176}
1177
1178impl TesBackendConfig {
1179 pub fn validate(&self) -> Result<()> {
1181 match &self.url {
1182 Some(url) => {
1183 if !self.insecure && url.scheme() != "https" {
1184 bail!(
1185 "TES backend configuration value `url` has invalid value `{url}`: URL \
1186 must use a HTTPS scheme"
1187 );
1188 }
1189 }
1190 None => bail!("TES backend configuration value `url` is required"),
1191 }
1192
1193 if let Some(auth) = &self.auth {
1194 auth.validate()?;
1195 }
1196
1197 if let Some(max_concurrency) = self.max_concurrency
1198 && max_concurrency == 0
1199 {
1200 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1201 }
1202
1203 match &self.inputs {
1204 Some(url) => {
1205 if !is_supported_url(url.as_str()) {
1206 bail!(
1207 "TES backend storage configuration value `inputs` has invalid value \
1208 `{url}`: URL scheme is not supported"
1209 );
1210 }
1211
1212 if !url.path().ends_with('/') {
1213 bail!(
1214 "TES backend storage configuration value `inputs` has invalid value \
1215 `{url}`: URL path must end with a slash"
1216 );
1217 }
1218 }
1219 None => bail!("TES backend configuration value `inputs` is required"),
1220 }
1221
1222 match &self.outputs {
1223 Some(url) => {
1224 if !is_supported_url(url.as_str()) {
1225 bail!(
1226 "TES backend storage configuration value `outputs` has invalid value \
1227 `{url}`: URL scheme is not supported"
1228 );
1229 }
1230
1231 if !url.path().ends_with('/') {
1232 bail!(
1233 "TES backend storage configuration value `outputs` has invalid value \
1234 `{url}`: URL path must end with a slash"
1235 );
1236 }
1237 }
1238 None => bail!("TES backend storage configuration value `outputs` is required"),
1239 }
1240
1241 Ok(())
1242 }
1243
1244 pub fn redact(&mut self) {
1246 if let Some(auth) = &mut self.auth {
1247 auth.redact();
1248 }
1249 }
1250
1251 pub fn unredact(&mut self) {
1253 if let Some(auth) = &mut self.auth {
1254 auth.unredact();
1255 }
1256 }
1257}
1258
/// Apptainer settings shared by the LSF and Slurm backends (flattened into
/// their configuration tables).
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ApptainerConfig {
    /// Extra arguments for `apptainer exec` — presumably appended to the
    /// invocation; confirm against the Apptainer backend implementations.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}

impl ApptainerConfig {
    /// Validates the Apptainer configuration; there is currently nothing to
    /// check.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1274
/// Configuration for a single LSF queue.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfQueueConfig {
    /// The LSF queue name; must be non-empty and is checked against
    /// `bqueues` during validation.
    pub name: String,
    /// Maximum CPUs per task for this queue; must be nonzero when set —
    /// enforcement site is outside this file; TODO confirm.
    pub max_cpu_per_task: Option<u64>,
    /// Maximum memory per task for this queue; must be nonzero when set —
    /// enforcement site is outside this file; TODO confirm.
    pub max_memory_per_task: Option<ByteSize>,
}
1294
impl LsfQueueConfig {
    /// Validates this LSF queue configuration.
    ///
    /// `name` is the configuration-key prefix (`default`, `short_task`,
    /// `gpu`, or `fpga`) used in error messages of the form
    /// `{name}_lsf_queue`.
    ///
    /// Besides local checks, this runs `bqueues <queue>` (with a 10 second
    /// timeout) to confirm the queue exists on the cluster.
    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let queue = &self.name;
        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
            );
        }
        // Probe the queue via `bqueues`; a hung scheduler is treated as a
        // validation failure once the timeout elapses.
        match tokio::time::timeout(
            std::time::Duration::from_secs(10),
            Command::new("bqueues").arg(queue).output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating LSF queue")?;
                if !output.status.success() {
                    // Log the scheduler's output for diagnosis; the returned
                    // error carries only the queue identity.
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
                } else {
                    Ok(())
                }
            }
            // The `Err` arm is the elapsed timeout from `tokio::time::timeout`.
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_lsf_queue `{queue}`"
            )),
        }
    }
}
1337
/// Configuration for the experimental LSF + Apptainer backend.
///
/// NOTE(review): this struct combines `deny_unknown_fields` with a
/// `#[serde(flatten)]` field; serde documents that these two attributes are
/// not supported together, so unknown-field rejection may not behave as
/// expected here — confirm.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfApptainerBackendConfig {
    /// Polling interval for job state — units and use are outside this file;
    /// TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// Maximum number of concurrently running tasks — not validated here;
    /// TODO confirm semantics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// Queue used when no more specific queue applies
    /// (see `lsf_queue_for_task`).
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks hinted as short-running.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks requiring a GPU.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks requiring an FPGA.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Extra arguments for `bsub` submissions — presumably appended to the
    /// invocation; confirm against the LSF backend implementation.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Optional prefix for LSF job names; limited to
    /// `MAX_LSF_JOB_NAME_PREFIX` bytes (see `validate`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// Shared Apptainer settings, flattened into this table.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1400
1401impl LsfApptainerBackendConfig {
1402 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1404 if cfg!(not(unix)) {
1405 bail!("LSF + Apptainer backend is not supported on non-unix platforms");
1406 }
1407
1408 if !engine_config.experimental_features_enabled {
1409 bail!("LSF + Apptainer backend requires enabling experimental features");
1410 }
1411
1412 if let Some(queue) = &self.default_lsf_queue {
1418 queue.validate("default").await?;
1419 }
1420
1421 if let Some(queue) = &self.short_task_lsf_queue {
1422 queue.validate("short_task").await?;
1423 }
1424
1425 if let Some(queue) = &self.gpu_lsf_queue {
1426 queue.validate("gpu").await?;
1427 }
1428
1429 if let Some(queue) = &self.fpga_lsf_queue {
1430 queue.validate("fpga").await?;
1431 }
1432
1433 if let Some(prefix) = &self.job_name_prefix
1434 && prefix.len() > MAX_LSF_JOB_NAME_PREFIX
1435 {
1436 bail!(
1437 "LSF job name prefix `{prefix}` exceeds the maximum {MAX_LSF_JOB_NAME_PREFIX} \
1438 bytes"
1439 );
1440 }
1441
1442 self.apptainer_config.validate().await?;
1443
1444 Ok(())
1445 }
1446
1447 pub(crate) fn lsf_queue_for_task(
1452 &self,
1453 requirements: &HashMap<String, Value>,
1454 hints: &HashMap<String, Value>,
1455 ) -> Option<&LsfQueueConfig> {
1456 if let Some(queue) = self.fpga_lsf_queue.as_ref()
1458 && let Some(true) = requirements
1459 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1460 .and_then(Value::as_boolean)
1461 {
1462 return Some(queue);
1463 }
1464
1465 if let Some(queue) = self.gpu_lsf_queue.as_ref()
1466 && let Some(true) = requirements
1467 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1468 .and_then(Value::as_boolean)
1469 {
1470 return Some(queue);
1471 }
1472
1473 if let Some(queue) = self.short_task_lsf_queue.as_ref()
1475 && let Some(true) = hints
1476 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1477 .and_then(Value::as_boolean)
1478 {
1479 return Some(queue);
1480 }
1481
1482 self.default_lsf_queue.as_ref()
1485 }
1486}
1487
/// Configuration for a single Slurm partition.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmPartitionConfig {
    /// The Slurm partition name; must be non-empty and is checked against
    /// `scontrol show partition` during validation.
    pub name: String,
    /// Maximum CPUs per task for this partition; must be nonzero when set —
    /// enforcement site is outside this file; TODO confirm.
    pub max_cpu_per_task: Option<u64>,
    /// Maximum memory per task for this partition; must be nonzero when set
    /// — enforcement site is outside this file; TODO confirm.
    pub max_memory_per_task: Option<ByteSize>,
}
1508
impl SlurmPartitionConfig {
    /// Validates this Slurm partition configuration.
    ///
    /// `name` is the configuration-key prefix (`default`, `short_task`,
    /// `gpu`, or `fpga`) used in error messages of the form
    /// `{name}_slurm_partition`.
    ///
    /// Besides local checks, this runs `scontrol show partition <partition>`
    /// (with a 10 second timeout) to confirm the partition exists.
    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let partition = &self.name;
        ensure!(
            !partition.is_empty(),
            "{name}_slurm_partition name cannot be empty"
        );
        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_slurm_partition `{partition}` must allow at least some memory to be \
                 provisioned"
            );
        }
        // Probe the partition via `scontrol`; a hung scheduler is treated as
        // a validation failure once the timeout elapses.
        match tokio::time::timeout(
            std::time::Duration::from_secs(10),
            Command::new("scontrol")
                .arg("show")
                .arg("partition")
                .arg(partition)
                .output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating Slurm partition")?;
                if !output.status.success() {
                    // Log the scheduler's output for diagnosis; the returned
                    // error carries only the partition identity.
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
                    Err(anyhow!(
                        "failed to validate {name}_slurm_partition `{partition}`"
                    ))
                } else {
                    Ok(())
                }
            }
            // The `Err` arm is the elapsed timeout from `tokio::time::timeout`.
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_slurm_partition `{partition}`"
            )),
        }
    }
}
1562
/// Configuration for the experimental Slurm + Apptainer backend.
///
/// NOTE(review): this struct combines `deny_unknown_fields` with a
/// `#[serde(flatten)]` field; serde documents that these two attributes are
/// not supported together, so unknown-field rejection may not behave as
/// expected here — confirm.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmApptainerBackendConfig {
    /// Partition used when no more specific partition applies
    /// (see `slurm_partition_for_task`).
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks hinted as short-running.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks requiring a GPU.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Partition used for tasks requiring an FPGA.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Extra arguments for `sbatch` submissions — presumably appended to the
    /// invocation; confirm against the Slurm backend implementation.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// Shared Apptainer settings, flattened into this table.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1610
1611impl SlurmApptainerBackendConfig {
1612 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1614 if cfg!(not(unix)) {
1615 bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
1616 }
1617 if !engine_config.experimental_features_enabled {
1618 bail!("Slurm + Apptainer backend requires enabling experimental features");
1619 }
1620
1621 if let Some(partition) = &self.default_slurm_partition {
1627 partition.validate("default").await?;
1628 }
1629 if let Some(partition) = &self.short_task_slurm_partition {
1630 partition.validate("short_task").await?;
1631 }
1632 if let Some(partition) = &self.gpu_slurm_partition {
1633 partition.validate("gpu").await?;
1634 }
1635 if let Some(partition) = &self.fpga_slurm_partition {
1636 partition.validate("fpga").await?;
1637 }
1638
1639 self.apptainer_config.validate().await?;
1640
1641 Ok(())
1642 }
1643
1644 pub(crate) fn slurm_partition_for_task(
1649 &self,
1650 requirements: &HashMap<String, Value>,
1651 hints: &HashMap<String, Value>,
1652 ) -> Option<&SlurmPartitionConfig> {
1653 if let Some(partition) = self.fpga_slurm_partition.as_ref()
1659 && let Some(true) = requirements
1660 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1661 .and_then(Value::as_boolean)
1662 {
1663 return Some(partition);
1664 }
1665
1666 if let Some(partition) = self.gpu_slurm_partition.as_ref()
1667 && let Some(true) = requirements
1668 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1669 .and_then(Value::as_boolean)
1670 {
1671 return Some(partition);
1672 }
1673
1674 if let Some(partition) = self.short_task_slurm_partition.as_ref()
1676 && let Some(true) = hints
1677 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1678 .and_then(Value::as_boolean)
1679 {
1680 return Some(partition);
1681 }
1682
1683 self.default_slurm_partition.as_ref()
1686 }
1687}
1688
1689#[cfg(test)]
1690mod test {
1691 use pretty_assertions::assert_eq;
1692
1693 use super::*;
1694
    /// Serializing a `SecretString` must emit the redaction placeholder by
    /// default, the underlying value after `unredact`, and the placeholder
    /// again after `redact`.
    #[test]
    fn redacted_secret() {
        // `From<&str>` constructs the secret in the redacted state.
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }
1713
1714 #[test]
1715 fn redacted_config() {
1716 let config = Config {
1717 backends: [
1718 (
1719 "first".to_string(),
1720 BackendConfig::Tes(TesBackendConfig {
1721 auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
1722 username: "foo".into(),
1723 password: "secret".into(),
1724 })),
1725 ..Default::default()
1726 }),
1727 ),
1728 (
1729 "second".to_string(),
1730 BackendConfig::Tes(TesBackendConfig {
1731 auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
1732 token: "secret".into(),
1733 })),
1734 ..Default::default()
1735 }),
1736 ),
1737 ]
1738 .into(),
1739 storage: StorageConfig {
1740 azure: AzureStorageConfig {
1741 auth: Some(AzureStorageAuthConfig {
1742 account_name: "foo".into(),
1743 access_key: "secret".into(),
1744 }),
1745 },
1746 s3: S3StorageConfig {
1747 auth: Some(S3StorageAuthConfig {
1748 access_key_id: "foo".into(),
1749 secret_access_key: "secret".into(),
1750 }),
1751 ..Default::default()
1752 },
1753 google: GoogleStorageConfig {
1754 auth: Some(GoogleStorageAuthConfig {
1755 access_key: "foo".into(),
1756 secret: "secret".into(),
1757 }),
1758 },
1759 },
1760 ..Default::default()
1761 };
1762
1763 let json = serde_json::to_string_pretty(&config).unwrap();
1764 assert!(json.contains("secret"), "`{json}` contains a secret");
1765 }
1766
1767 #[tokio::test]
1768 async fn test_config_validate() {
1769 let mut config = Config::default();
1771 config.task.retries = Some(1000000);
1772 assert_eq!(
1773 config.validate().await.unwrap_err().to_string(),
1774 "configuration value `task.retries` cannot exceed 100"
1775 );
1776
1777 let mut config = Config::default();
1779 config.workflow.scatter.concurrency = Some(0);
1780 assert_eq!(
1781 config.validate().await.unwrap_err().to_string(),
1782 "configuration value `workflow.scatter.concurrency` cannot be zero"
1783 );
1784
1785 let config = Config {
1787 backend: Some("foo".into()),
1788 ..Default::default()
1789 };
1790 assert_eq!(
1791 config.validate().await.unwrap_err().to_string(),
1792 "a backend named `foo` is not present in the configuration"
1793 );
1794 let config = Config {
1795 backend: Some("bar".into()),
1796 backends: [("foo".to_string(), BackendConfig::default())].into(),
1797 ..Default::default()
1798 };
1799 assert_eq!(
1800 config.validate().await.unwrap_err().to_string(),
1801 "a backend named `bar` is not present in the configuration"
1802 );
1803
1804 let config = Config {
1806 backends: [("foo".to_string(), BackendConfig::default())].into(),
1807 ..Default::default()
1808 };
1809 config.validate().await.expect("config should validate");
1810
1811 let config = Config {
1813 backends: [(
1814 "default".to_string(),
1815 BackendConfig::Local(LocalBackendConfig {
1816 cpu: Some(0),
1817 ..Default::default()
1818 }),
1819 )]
1820 .into(),
1821 ..Default::default()
1822 };
1823 assert_eq!(
1824 config.validate().await.unwrap_err().to_string(),
1825 "local backend configuration value `cpu` cannot be zero"
1826 );
1827 let config = Config {
1828 backends: [(
1829 "default".to_string(),
1830 BackendConfig::Local(LocalBackendConfig {
1831 cpu: Some(10000000),
1832 ..Default::default()
1833 }),
1834 )]
1835 .into(),
1836 ..Default::default()
1837 };
1838 assert!(
1839 config
1840 .validate()
1841 .await
1842 .unwrap_err()
1843 .to_string()
1844 .starts_with(
1845 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1846 available to the host"
1847 )
1848 );
1849
1850 let config = Config {
1852 backends: [(
1853 "default".to_string(),
1854 BackendConfig::Local(LocalBackendConfig {
1855 memory: Some("0 GiB".to_string()),
1856 ..Default::default()
1857 }),
1858 )]
1859 .into(),
1860 ..Default::default()
1861 };
1862 assert_eq!(
1863 config.validate().await.unwrap_err().to_string(),
1864 "local backend configuration value `memory` cannot be zero"
1865 );
1866 let config = Config {
1867 backends: [(
1868 "default".to_string(),
1869 BackendConfig::Local(LocalBackendConfig {
1870 memory: Some("100 meows".to_string()),
1871 ..Default::default()
1872 }),
1873 )]
1874 .into(),
1875 ..Default::default()
1876 };
1877 assert_eq!(
1878 config.validate().await.unwrap_err().to_string(),
1879 "local backend configuration value `memory` has invalid value `100 meows`"
1880 );
1881
1882 let config = Config {
1883 backends: [(
1884 "default".to_string(),
1885 BackendConfig::Local(LocalBackendConfig {
1886 memory: Some("1000 TiB".to_string()),
1887 ..Default::default()
1888 }),
1889 )]
1890 .into(),
1891 ..Default::default()
1892 };
1893 assert!(
1894 config
1895 .validate()
1896 .await
1897 .unwrap_err()
1898 .to_string()
1899 .starts_with(
1900 "local backend configuration value `memory` cannot exceed the total memory of \
1901 the host"
1902 )
1903 );
1904
1905 let config = Config {
1907 backends: [(
1908 "default".to_string(),
1909 BackendConfig::Tes(Default::default()),
1910 )]
1911 .into(),
1912 ..Default::default()
1913 };
1914 assert_eq!(
1915 config.validate().await.unwrap_err().to_string(),
1916 "TES backend configuration value `url` is required"
1917 );
1918
1919 let config = Config {
1921 backends: [(
1922 "default".to_string(),
1923 BackendConfig::Tes(TesBackendConfig {
1924 url: Some("https://example.com".parse().unwrap()),
1925 max_concurrency: Some(0),
1926 ..Default::default()
1927 }),
1928 )]
1929 .into(),
1930 ..Default::default()
1931 };
1932 assert_eq!(
1933 config.validate().await.unwrap_err().to_string(),
1934 "TES backend configuration value `max_concurrency` cannot be zero"
1935 );
1936
1937 let config = Config {
1939 backends: [(
1940 "default".to_string(),
1941 BackendConfig::Tes(TesBackendConfig {
1942 url: Some("http://example.com".parse().unwrap()),
1943 inputs: Some("http://example.com".parse().unwrap()),
1944 outputs: Some("http://example.com".parse().unwrap()),
1945 ..Default::default()
1946 }),
1947 )]
1948 .into(),
1949 ..Default::default()
1950 };
1951 assert_eq!(
1952 config.validate().await.unwrap_err().to_string(),
1953 "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
1954 must use a HTTPS scheme"
1955 );
1956
1957 let config = Config {
1959 backends: [(
1960 "default".to_string(),
1961 BackendConfig::Tes(TesBackendConfig {
1962 url: Some("http://example.com".parse().unwrap()),
1963 inputs: Some("http://example.com".parse().unwrap()),
1964 outputs: Some("http://example.com".parse().unwrap()),
1965 insecure: true,
1966 ..Default::default()
1967 }),
1968 )]
1969 .into(),
1970 ..Default::default()
1971 };
1972 config
1973 .validate()
1974 .await
1975 .expect("configuration should validate");
1976
1977 let mut config = Config::default();
1978 config.http.parallelism = Some(0);
1979 assert_eq!(
1980 config.validate().await.unwrap_err().to_string(),
1981 "configuration value `http.parallelism` cannot be zero"
1982 );
1983
1984 let mut config = Config::default();
1985 config.http.parallelism = Some(5);
1986 assert!(
1987 config.validate().await.is_ok(),
1988 "should pass for valid configuration"
1989 );
1990
1991 let mut config = Config::default();
1992 config.http.parallelism = None;
1993 assert!(
1994 config.validate().await.is_ok(),
1995 "should pass for default (None)"
1996 );
1997
1998 #[cfg(unix)]
2000 {
2001 let job_name_prefix = "A".repeat(MAX_LSF_JOB_NAME_PREFIX * 2);
2002 let mut config = Config {
2003 experimental_features_enabled: true,
2004 ..Default::default()
2005 };
2006 config.backends.insert(
2007 "default".to_string(),
2008 BackendConfig::LsfApptainer(LsfApptainerBackendConfig {
2009 job_name_prefix: Some(job_name_prefix.clone()),
2010 ..Default::default()
2011 }),
2012 );
2013 assert_eq!(
2014 config.validate().await.unwrap_err().to_string(),
2015 format!("LSF job name prefix `{job_name_prefix}` exceeds the maximum 100 bytes")
2016 );
2017 }
2018 }
2019}