1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use anyhow::ensure;
14use bytesize::ByteSize;
15use indexmap::IndexMap;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::process::Command;
20use tracing::error;
21use tracing::warn;
22use url::Url;
23
24use crate::CancellationContext;
25use crate::Events;
26use crate::SYSTEM;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::convert_unit_string;
30use crate::path::is_supported_url;
31
/// The maximum number of retries that may be configured for a task.
///
/// `TaskConfig::validate` rejects a `task.retries` value above this limit.
pub(crate) const MAX_RETRIES: u64 = 100;

/// The name of the default shell for tasks.
///
/// NOTE(review): presumably the fallback when `task.shell` is unset; the use
/// site is not in this part of the file — confirm.
pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";

/// The backend name looked up when no backend is explicitly selected but two
/// or more backends are defined.
pub(crate) const DEFAULT_BACKEND_NAME: &str = "default";

/// The text serialized in place of a redacted secret.
const REDACTED: &str = "<REDACTED>";
43
44pub(crate) fn cache_dir() -> Result<PathBuf> {
46 const CACHE_DIR_ROOT: &str = "sprocket";
48
49 Ok(dirs::cache_dir()
50 .context("failed to determine user cache directory")?
51 .join(CACHE_DIR_ROOT))
52}
53
/// A secret string whose serialized form can be redacted.
///
/// The wrapper pairs a `secrecy::SecretString` with a flag that selects what
/// gets written when the value is serialized: either the `<REDACTED>`
/// placeholder or the secret itself.
#[derive(Debug, Clone)]
pub struct SecretString {
    /// The wrapped secret value.
    inner: secrecy::SecretString,
    /// Whether serialization writes the redaction placeholder instead of the
    /// secret.
    redacted: bool,
}

impl SecretString {
    /// Marks the secret as redacted so that serialization writes the
    /// placeholder text rather than the secret.
    pub fn redact(&mut self) {
        self.redacted = true;
    }

    /// Marks the secret as unredacted so that serialization writes the actual
    /// secret value.
    pub fn unredact(&mut self) {
        self.redacted = false;
    }

    /// Gets a reference to the wrapped `secrecy::SecretString`.
    pub fn inner(&self) -> &secrecy::SecretString {
        &self.inner
    }
}
91
92impl From<String> for SecretString {
93 fn from(s: String) -> Self {
94 Self {
95 inner: s.into(),
96 redacted: true,
97 }
98 }
99}
100
101impl From<&str> for SecretString {
102 fn from(s: &str) -> Self {
103 Self {
104 inner: s.into(),
105 redacted: true,
106 }
107 }
108}
109
110impl Default for SecretString {
111 fn default() -> Self {
112 Self {
113 inner: Default::default(),
114 redacted: true,
115 }
116 }
117}
118
119impl serde::Serialize for SecretString {
120 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
121 where
122 S: serde::Serializer,
123 {
124 use secrecy::ExposeSecret;
125
126 if self.redacted {
127 serializer.serialize_str(REDACTED)
128 } else {
129 serializer.serialize_str(self.inner.expose_secret())
130 }
131 }
132}
133
134impl<'de> serde::Deserialize<'de> for SecretString {
135 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
136 where
137 D: serde::Deserializer<'de>,
138 {
139 let inner = secrecy::SecretString::deserialize(deserializer)?;
140 Ok(Self {
141 inner,
142 redacted: true,
143 })
144 }
145}
146
/// How evaluation responds to a failure.
///
/// Serialized in snake case (`slow` / `fast`).
///
/// NOTE(review): the exact runtime semantics of each mode are implemented
/// outside this part of the file; the variant descriptions below are inferred
/// from the names — confirm.
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// The default mode; presumably lets outstanding work finish before
    /// failing.
    #[default]
    Slow,
    /// Presumably fails as quickly as possible.
    Fast,
}
161
/// The top-level engine configuration.
///
/// Unknown fields are rejected on deserialization, and every section falls
/// back to its default when omitted.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// The name of the backend to use.
    ///
    /// When unset, the single entry of `backends` is used if there is exactly
    /// one, a default backend configuration is used if there are none, and
    /// the entry named `default` is used if there are two or more.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// The backend configurations, keyed by backend name.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub backends: IndexMap<String, BackendConfig>,
    /// Storage configuration.
    #[serde(default)]
    pub storage: StorageConfig,
    /// Whether environment-specific output is suppressed.
    ///
    /// Validation rejects this option unless `experimental_features_enabled`
    /// is also set.
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// Whether experimental features are enabled.
    ///
    /// Required by `suppress_env_specific_output` and by the LSF and Slurm
    /// Apptainer backends.
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// The failure mode; (de)serialized under the key `fail`.
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
235
236impl Config {
237 pub async fn validate(&self) -> Result<()> {
239 self.http.validate()?;
240 self.workflow.validate()?;
241 self.task.validate()?;
242
243 if self.backend.is_none() && self.backends.len() < 2 {
244 } else {
247 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
249 if !self.backends.contains_key(backend) {
250 bail!("a backend named `{backend}` is not present in the configuration");
251 }
252 }
253
254 for backend in self.backends.values() {
255 backend.validate(self).await?;
256 }
257
258 self.storage.validate()?;
259
260 if self.suppress_env_specific_output && !self.experimental_features_enabled {
261 bail!("`suppress_env_specific_output` requires enabling experimental features");
262 }
263
264 Ok(())
265 }
266
267 pub fn redact(&mut self) {
271 for backend in self.backends.values_mut() {
272 backend.redact();
273 }
274
275 if let Some(auth) = &mut self.storage.azure.auth {
276 auth.redact();
277 }
278
279 if let Some(auth) = &mut self.storage.s3.auth {
280 auth.redact();
281 }
282
283 if let Some(auth) = &mut self.storage.google.auth {
284 auth.redact();
285 }
286 }
287
288 pub fn unredact(&mut self) {
292 for backend in self.backends.values_mut() {
293 backend.unredact();
294 }
295
296 if let Some(auth) = &mut self.storage.azure.auth {
297 auth.unredact();
298 }
299
300 if let Some(auth) = &mut self.storage.s3.auth {
301 auth.unredact();
302 }
303
304 if let Some(auth) = &mut self.storage.google.auth {
305 auth.unredact();
306 }
307 }
308
309 pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
314 if self.backend.is_some() || self.backends.len() >= 2 {
315 let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
317 return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
318 || anyhow!("a backend named `{backend}` is not present in the configuration"),
319 )?));
320 }
321
322 if self.backends.len() == 1 {
323 Ok(Cow::Borrowed(self.backends.values().next().unwrap()))
325 } else {
326 Ok(Cow::Owned(BackendConfig::default()))
328 }
329 }
330
331 pub(crate) async fn create_backend(
333 self: &Arc<Self>,
334 run_root_dir: &Path,
335 events: Events,
336 cancellation: CancellationContext,
337 ) -> Result<Arc<dyn TaskExecutionBackend>> {
338 use crate::backend::*;
339
340 match self.backend()?.as_ref() {
341 BackendConfig::Local(_) => {
342 warn!(
343 "the engine is configured to use the local backend: tasks will not be run \
344 inside of a container"
345 );
346 Ok(Arc::new(LocalBackend::new(
347 self.clone(),
348 events,
349 cancellation,
350 )?))
351 }
352 BackendConfig::Docker(_) => Ok(Arc::new(
353 DockerBackend::new(self.clone(), events, cancellation).await?,
354 )),
355 BackendConfig::Tes(_) => Ok(Arc::new(
356 TesBackend::new(self.clone(), events, cancellation).await?,
357 )),
358 BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
359 self.clone(),
360 run_root_dir,
361 events,
362 cancellation,
363 )?)),
364 BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
365 self.clone(),
366 run_root_dir,
367 events,
368 cancellation,
369 )?)),
370 }
371 }
372}
373
374#[derive(Debug, Default, Clone, Serialize, Deserialize)]
376#[serde(rename_all = "snake_case", deny_unknown_fields)]
377pub struct HttpConfig {
378 #[serde(default, skip_serializing_if = "Option::is_none")]
382 pub cache_dir: Option<PathBuf>,
383 #[serde(default, skip_serializing_if = "Option::is_none")]
387 pub retries: Option<usize>,
388 #[serde(default, skip_serializing_if = "Option::is_none")]
392 pub parallelism: Option<usize>,
393}
394
395impl HttpConfig {
396 pub fn validate(&self) -> Result<()> {
398 if let Some(parallelism) = self.parallelism
399 && parallelism == 0
400 {
401 bail!("configuration value `http.parallelism` cannot be zero");
402 }
403 Ok(())
404 }
405}
406
407#[derive(Debug, Default, Clone, Serialize, Deserialize)]
409#[serde(rename_all = "snake_case", deny_unknown_fields)]
410pub struct StorageConfig {
411 #[serde(default)]
413 pub azure: AzureStorageConfig,
414 #[serde(default)]
416 pub s3: S3StorageConfig,
417 #[serde(default)]
419 pub google: GoogleStorageConfig,
420}
421
422impl StorageConfig {
423 pub fn validate(&self) -> Result<()> {
425 self.azure.validate()?;
426 self.s3.validate()?;
427 self.google.validate()?;
428 Ok(())
429 }
430}
431
432#[derive(Debug, Default, Clone, Serialize, Deserialize)]
434#[serde(rename_all = "snake_case", deny_unknown_fields)]
435pub struct AzureStorageAuthConfig {
436 pub account_name: String,
438 pub access_key: SecretString,
440}
441
442impl AzureStorageAuthConfig {
443 pub fn validate(&self) -> Result<()> {
445 if self.account_name.is_empty() {
446 bail!("configuration value `storage.azure.auth.account_name` is required");
447 }
448
449 if self.access_key.inner.expose_secret().is_empty() {
450 bail!("configuration value `storage.azure.auth.access_key` is required");
451 }
452
453 Ok(())
454 }
455
456 pub fn redact(&mut self) {
459 self.access_key.redact();
460 }
461
462 pub fn unredact(&mut self) {
465 self.access_key.unredact();
466 }
467}
468
469#[derive(Debug, Default, Clone, Serialize, Deserialize)]
471#[serde(rename_all = "snake_case", deny_unknown_fields)]
472pub struct AzureStorageConfig {
473 #[serde(default, skip_serializing_if = "Option::is_none")]
475 pub auth: Option<AzureStorageAuthConfig>,
476}
477
478impl AzureStorageConfig {
479 pub fn validate(&self) -> Result<()> {
481 if let Some(auth) = &self.auth {
482 auth.validate()?;
483 }
484
485 Ok(())
486 }
487}
488
489#[derive(Debug, Default, Clone, Serialize, Deserialize)]
491#[serde(rename_all = "snake_case", deny_unknown_fields)]
492pub struct S3StorageAuthConfig {
493 pub access_key_id: String,
495 pub secret_access_key: SecretString,
497}
498
499impl S3StorageAuthConfig {
500 pub fn validate(&self) -> Result<()> {
502 if self.access_key_id.is_empty() {
503 bail!("configuration value `storage.s3.auth.access_key_id` is required");
504 }
505
506 if self.secret_access_key.inner.expose_secret().is_empty() {
507 bail!("configuration value `storage.s3.auth.secret_access_key` is required");
508 }
509
510 Ok(())
511 }
512
513 pub fn redact(&mut self) {
516 self.secret_access_key.redact();
517 }
518
519 pub fn unredact(&mut self) {
522 self.secret_access_key.unredact();
523 }
524}
525
526#[derive(Debug, Default, Clone, Serialize, Deserialize)]
528#[serde(rename_all = "snake_case", deny_unknown_fields)]
529pub struct S3StorageConfig {
530 #[serde(default, skip_serializing_if = "Option::is_none")]
535 pub region: Option<String>,
536
537 #[serde(default, skip_serializing_if = "Option::is_none")]
539 pub auth: Option<S3StorageAuthConfig>,
540}
541
542impl S3StorageConfig {
543 pub fn validate(&self) -> Result<()> {
545 if let Some(auth) = &self.auth {
546 auth.validate()?;
547 }
548
549 Ok(())
550 }
551}
552
553#[derive(Debug, Default, Clone, Serialize, Deserialize)]
555#[serde(rename_all = "snake_case", deny_unknown_fields)]
556pub struct GoogleStorageAuthConfig {
557 pub access_key: String,
559 pub secret: SecretString,
561}
562
563impl GoogleStorageAuthConfig {
564 pub fn validate(&self) -> Result<()> {
566 if self.access_key.is_empty() {
567 bail!("configuration value `storage.google.auth.access_key` is required");
568 }
569
570 if self.secret.inner.expose_secret().is_empty() {
571 bail!("configuration value `storage.google.auth.secret` is required");
572 }
573
574 Ok(())
575 }
576
577 pub fn redact(&mut self) {
580 self.secret.redact();
581 }
582
583 pub fn unredact(&mut self) {
586 self.secret.unredact();
587 }
588}
589
590#[derive(Debug, Default, Clone, Serialize, Deserialize)]
592#[serde(rename_all = "snake_case", deny_unknown_fields)]
593pub struct GoogleStorageConfig {
594 #[serde(default, skip_serializing_if = "Option::is_none")]
596 pub auth: Option<GoogleStorageAuthConfig>,
597}
598
599impl GoogleStorageConfig {
600 pub fn validate(&self) -> Result<()> {
602 if let Some(auth) = &self.auth {
603 auth.validate()?;
604 }
605
606 Ok(())
607 }
608}
609
610#[derive(Debug, Default, Clone, Serialize, Deserialize)]
612#[serde(rename_all = "snake_case", deny_unknown_fields)]
613pub struct WorkflowConfig {
614 #[serde(default)]
616 pub scatter: ScatterConfig,
617}
618
619impl WorkflowConfig {
620 pub fn validate(&self) -> Result<()> {
622 self.scatter.validate()?;
623 Ok(())
624 }
625}
626
627#[derive(Debug, Default, Clone, Serialize, Deserialize)]
629#[serde(rename_all = "snake_case", deny_unknown_fields)]
630pub struct ScatterConfig {
631 #[serde(default, skip_serializing_if = "Option::is_none")]
684 pub concurrency: Option<u64>,
685}
686
687impl ScatterConfig {
688 pub fn validate(&self) -> Result<()> {
690 if let Some(concurrency) = self.concurrency
691 && concurrency == 0
692 {
693 bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
694 }
695
696 Ok(())
697 }
698}
699
/// The call caching mode for task execution.
///
/// Serialized in snake case (`off` / `on` / `explicit`).
///
/// NOTE(review): the behavior of each mode is implemented outside this part
/// of the file; the variant descriptions are inferred from the names.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// The default mode; presumably disables call caching.
    #[default]
    Off,
    /// Presumably enables call caching.
    On,
    /// Presumably enables call caching only where explicitly requested.
    Explicit,
}
727
/// The content digest mode.
///
/// Serialized in snake case (`strong` / `weak`).
///
/// NOTE(review): how each mode computes digests is implemented outside this
/// part of the file — confirm before relying on the names alone.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// The strong digest mode.
    Strong,
    /// The weak digest mode; this is the default.
    #[default]
    Weak,
}
752
753#[derive(Debug, Default, Clone, Serialize, Deserialize)]
755#[serde(rename_all = "snake_case", deny_unknown_fields)]
756pub struct TaskConfig {
757 #[serde(default, skip_serializing_if = "Option::is_none")]
763 pub retries: Option<u64>,
764 #[serde(default, skip_serializing_if = "Option::is_none")]
769 pub container: Option<String>,
770 #[serde(default, skip_serializing_if = "Option::is_none")]
778 pub shell: Option<String>,
779 #[serde(default)]
781 pub cpu_limit_behavior: TaskResourceLimitBehavior,
782 #[serde(default)]
784 pub memory_limit_behavior: TaskResourceLimitBehavior,
785 #[serde(default, skip_serializing_if = "Option::is_none")]
789 pub cache_dir: Option<PathBuf>,
790 #[serde(default)]
792 pub cache: CallCachingMode,
793 #[serde(default)]
797 pub digests: ContentDigestMode,
798}
799
800impl TaskConfig {
801 pub fn validate(&self) -> Result<()> {
803 if self.retries.unwrap_or(0) > MAX_RETRIES {
804 bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
805 }
806
807 Ok(())
808 }
809}
810
/// The behavior applied when a task's resource requirement exceeds a limit.
///
/// Serialized in snake case (`try_with_max` / `deny`).
///
/// NOTE(review): enforcement lives outside this part of the file; the variant
/// descriptions are inferred from the names.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// Presumably runs the task with the maximum available amount of the
    /// resource.
    TryWithMax,
    /// Presumably refuses to run the task; this is the default.
    #[default]
    Deny,
}
825
/// The configuration of a task execution backend.
///
/// The variant is selected by a `type` tag whose value is the snake-case
/// variant name (for example `lsf_apptainer`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum BackendConfig {
    /// Use the local task execution backend.
    Local(LocalBackendConfig),
    /// Use the Docker task execution backend.
    Docker(DockerBackendConfig),
    /// Use the TES task execution backend.
    Tes(TesBackendConfig),
    /// Use the LSF + Apptainer task execution backend (requires experimental
    /// features).
    LsfApptainer(LsfApptainerBackendConfig),
    /// Use the Slurm + Apptainer task execution backend (requires
    /// experimental features).
    SlurmApptainer(SlurmApptainerBackendConfig),
}
845
846impl Default for BackendConfig {
847 fn default() -> Self {
848 Self::Docker(Default::default())
849 }
850}
851
852impl BackendConfig {
853 pub async fn validate(&self, engine_config: &Config) -> Result<()> {
855 match self {
856 Self::Local(config) => config.validate(),
857 Self::Docker(config) => config.validate(),
858 Self::Tes(config) => config.validate(),
859 Self::LsfApptainer(config) => config.validate(engine_config).await,
860 Self::SlurmApptainer(config) => config.validate(engine_config).await,
861 }
862 }
863
864 pub fn as_local(&self) -> Option<&LocalBackendConfig> {
868 match self {
869 Self::Local(config) => Some(config),
870 _ => None,
871 }
872 }
873
874 pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
878 match self {
879 Self::Docker(config) => Some(config),
880 _ => None,
881 }
882 }
883
884 pub fn as_tes(&self) -> Option<&TesBackendConfig> {
888 match self {
889 Self::Tes(config) => Some(config),
890 _ => None,
891 }
892 }
893
894 pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
899 match self {
900 Self::LsfApptainer(config) => Some(config),
901 _ => None,
902 }
903 }
904
905 pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
910 match self {
911 Self::SlurmApptainer(config) => Some(config),
912 _ => None,
913 }
914 }
915
916 pub fn redact(&mut self) {
918 match self {
919 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
920 Self::Tes(config) => config.redact(),
921 }
922 }
923
924 pub fn unredact(&mut self) {
926 match self {
927 Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
928 Self::Tes(config) => config.unredact(),
929 }
930 }
931}
932
933#[derive(Debug, Default, Clone, Serialize, Deserialize)]
940#[serde(rename_all = "snake_case", deny_unknown_fields)]
941pub struct LocalBackendConfig {
942 #[serde(default, skip_serializing_if = "Option::is_none")]
948 pub cpu: Option<u64>,
949
950 #[serde(default, skip_serializing_if = "Option::is_none")]
957 pub memory: Option<String>,
958}
959
960impl LocalBackendConfig {
961 pub fn validate(&self) -> Result<()> {
963 if let Some(cpu) = self.cpu {
964 if cpu == 0 {
965 bail!("local backend configuration value `cpu` cannot be zero");
966 }
967
968 let total = SYSTEM.cpus().len() as u64;
969 if cpu > total {
970 bail!(
971 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
972 available to the host ({total})"
973 );
974 }
975 }
976
977 if let Some(memory) = &self.memory {
978 let memory = convert_unit_string(memory).with_context(|| {
979 format!("local backend configuration value `memory` has invalid value `{memory}`")
980 })?;
981
982 if memory == 0 {
983 bail!("local backend configuration value `memory` cannot be zero");
984 }
985
986 let total = SYSTEM.total_memory();
987 if memory > total {
988 bail!(
989 "local backend configuration value `memory` cannot exceed the total memory of \
990 the host ({total} bytes)"
991 );
992 }
993 }
994
995 Ok(())
996 }
997}
998
/// Gets the default value of the Docker backend's `cleanup` option (`true`).
///
/// Referenced by the `#[serde(default = "cleanup_default")]` attribute on
/// `DockerBackendConfig::cleanup`.
const fn cleanup_default() -> bool {
    true
}
1003
1004#[derive(Debug, Clone, Serialize, Deserialize)]
1006#[serde(rename_all = "snake_case", deny_unknown_fields)]
1007pub struct DockerBackendConfig {
1008 #[serde(default = "cleanup_default")]
1012 pub cleanup: bool,
1013}
1014
1015impl DockerBackendConfig {
1016 pub fn validate(&self) -> Result<()> {
1018 Ok(())
1019 }
1020}
1021
1022impl Default for DockerBackendConfig {
1023 fn default() -> Self {
1024 Self { cleanup: true }
1025 }
1026}
1027
/// Configuration for HTTP basic authentication.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BasicAuthConfig {
    /// The username; defaults to an empty string when omitted.
    #[serde(default)]
    pub username: String,
    /// The password; redacted by default when serialized.
    #[serde(default)]
    pub password: SecretString,
}

impl BasicAuthConfig {
    /// Validates the basic authentication configuration.
    ///
    /// There is currently nothing to check, so this always succeeds.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Redacts the password so it is not written out when serialized.
    pub fn redact(&mut self) {
        self.password.redact();
    }

    /// Unredacts the password so its value is written out when serialized.
    pub fn unredact(&mut self) {
        self.password.unredact();
    }
}
1056
/// Configuration for bearer token authentication.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BearerAuthConfig {
    /// The bearer token; redacted by default when serialized.
    #[serde(default)]
    pub token: SecretString,
}

impl BearerAuthConfig {
    /// Validates the bearer authentication configuration.
    ///
    /// There is currently nothing to check, so this always succeeds.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Redacts the token so it is not written out when serialized.
    pub fn redact(&mut self) {
        self.token.redact();
    }

    /// Unredacts the token so its value is written out when serialized.
    pub fn unredact(&mut self) {
        self.token.unredact();
    }
}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1085#[serde(rename_all = "snake_case", tag = "type")]
1086pub enum TesBackendAuthConfig {
1087 Basic(BasicAuthConfig),
1089 Bearer(BearerAuthConfig),
1091}
1092
1093impl TesBackendAuthConfig {
1094 pub fn validate(&self) -> Result<()> {
1096 match self {
1097 Self::Basic(config) => config.validate(),
1098 Self::Bearer(config) => config.validate(),
1099 }
1100 }
1101
1102 pub fn redact(&mut self) {
1105 match self {
1106 Self::Basic(auth) => auth.redact(),
1107 Self::Bearer(auth) => auth.redact(),
1108 }
1109 }
1110
1111 pub fn unredact(&mut self) {
1114 match self {
1115 Self::Basic(auth) => auth.unredact(),
1116 Self::Bearer(auth) => auth.unredact(),
1117 }
1118 }
1119}
1120
1121#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1123#[serde(rename_all = "snake_case", deny_unknown_fields)]
1124pub struct TesBackendConfig {
1125 #[serde(default, skip_serializing_if = "Option::is_none")]
1127 pub url: Option<Url>,
1128
1129 #[serde(default, skip_serializing_if = "Option::is_none")]
1131 pub auth: Option<TesBackendAuthConfig>,
1132
1133 #[serde(default, skip_serializing_if = "Option::is_none")]
1135 pub inputs: Option<Url>,
1136
1137 #[serde(default, skip_serializing_if = "Option::is_none")]
1139 pub outputs: Option<Url>,
1140
1141 #[serde(default, skip_serializing_if = "Option::is_none")]
1145 pub interval: Option<u64>,
1146
1147 pub retries: Option<u32>,
1152
1153 #[serde(default, skip_serializing_if = "Option::is_none")]
1158 pub max_concurrency: Option<u32>,
1159
1160 #[serde(default)]
1163 pub insecure: bool,
1164}
1165
1166impl TesBackendConfig {
1167 pub fn validate(&self) -> Result<()> {
1169 match &self.url {
1170 Some(url) => {
1171 if !self.insecure && url.scheme() != "https" {
1172 bail!(
1173 "TES backend configuration value `url` has invalid value `{url}`: URL \
1174 must use a HTTPS scheme"
1175 );
1176 }
1177 }
1178 None => bail!("TES backend configuration value `url` is required"),
1179 }
1180
1181 if let Some(auth) = &self.auth {
1182 auth.validate()?;
1183 }
1184
1185 if let Some(max_concurrency) = self.max_concurrency
1186 && max_concurrency == 0
1187 {
1188 bail!("TES backend configuration value `max_concurrency` cannot be zero");
1189 }
1190
1191 match &self.inputs {
1192 Some(url) => {
1193 if !is_supported_url(url.as_str()) {
1194 bail!(
1195 "TES backend storage configuration value `inputs` has invalid value \
1196 `{url}`: URL scheme is not supported"
1197 );
1198 }
1199
1200 if !url.path().ends_with('/') {
1201 bail!(
1202 "TES backend storage configuration value `inputs` has invalid value \
1203 `{url}`: URL path must end with a slash"
1204 );
1205 }
1206 }
1207 None => bail!("TES backend configuration value `inputs` is required"),
1208 }
1209
1210 match &self.outputs {
1211 Some(url) => {
1212 if !is_supported_url(url.as_str()) {
1213 bail!(
1214 "TES backend storage configuration value `outputs` has invalid value \
1215 `{url}`: URL scheme is not supported"
1216 );
1217 }
1218
1219 if !url.path().ends_with('/') {
1220 bail!(
1221 "TES backend storage configuration value `outputs` has invalid value \
1222 `{url}`: URL path must end with a slash"
1223 );
1224 }
1225 }
1226 None => bail!("TES backend storage configuration value `outputs` is required"),
1227 }
1228
1229 Ok(())
1230 }
1231
1232 pub fn redact(&mut self) {
1234 if let Some(auth) = &mut self.auth {
1235 auth.redact();
1236 }
1237 }
1238
1239 pub fn unredact(&mut self) {
1241 if let Some(auth) = &mut self.auth {
1242 auth.unredact();
1243 }
1244 }
1245}
1246
/// Apptainer configuration shared by the LSF and Slurm Apptainer backends.
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
pub struct ApptainerConfig {
    /// Additional command-line arguments, if configured.
    ///
    /// NOTE(review): presumably appended to `apptainer exec` invocations —
    /// confirm in the backend.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}

impl ApptainerConfig {
    /// Validates the Apptainer configuration.
    ///
    /// There is currently nothing to check, so this always succeeds.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1261
1262#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1271pub struct LsfQueueConfig {
1272 name: String,
1275 max_cpu_per_task: Option<u64>,
1277 max_memory_per_task: Option<ByteSize>,
1279}
1280
1281impl LsfQueueConfig {
1282 pub fn new(
1284 name: String,
1285 max_cpu_per_task: Option<u64>,
1286 max_memory_per_task: Option<ByteSize>,
1287 ) -> Self {
1288 Self {
1289 name,
1290 max_cpu_per_task,
1291 max_memory_per_task,
1292 }
1293 }
1294
1295 pub fn name(&self) -> &str {
1298 &self.name
1299 }
1300
1301 pub fn max_cpu_per_task(&self) -> Option<u64> {
1303 self.max_cpu_per_task
1304 }
1305
1306 pub fn max_memory_per_task(&self) -> Option<ByteSize> {
1308 self.max_memory_per_task
1309 }
1310
1311 async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1313 let queue = self.name();
1314 ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
1315 if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
1316 ensure!(
1317 max_cpu_per_task > 0,
1318 "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
1319 );
1320 }
1321 if let Some(max_memory_per_task) = self.max_memory_per_task() {
1322 ensure!(
1323 max_memory_per_task.as_u64() > 0,
1324 "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
1325 );
1326 }
1327 match tokio::time::timeout(
1328 std::time::Duration::from_secs(10),
1331 Command::new("bqueues").arg(queue).output(),
1332 )
1333 .await
1334 {
1335 Ok(output) => {
1336 let output = output.context("validating LSF queue")?;
1337 if !output.status.success() {
1338 let stdout = String::from_utf8_lossy(&output.stdout);
1339 let stderr = String::from_utf8_lossy(&output.stderr);
1340 error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
1341 Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
1342 } else {
1343 Ok(())
1344 }
1345 }
1346 Err(_) => Err(anyhow!(
1347 "timed out trying to validate {name}_lsf_queue `{queue}`"
1348 )),
1349 }
1350 }
1351}
1352
/// Configuration for the LSF + Apptainer backend.
///
/// Queue selection for a task prefers, in order, the FPGA queue, the GPU
/// queue and the short-task queue when they are configured and the task asks
/// for them, falling back to the default queue.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// The queue used when no more specific queue applies, if configured.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks whose short-task hint is `true`, if
    /// configured.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks whose GPU requirement is `true`, if
    /// configured.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// The queue used for tasks whose FPGA requirement is `true`, if
    /// configured.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Additional command-line arguments, if configured.
    ///
    /// NOTE(review): presumably passed to `bsub` when submitting jobs —
    /// confirm in the backend.
    pub extra_bsub_args: Option<Vec<String>>,
    /// The Apptainer configuration; its fields are flattened into this
    /// structure when (de)serialized.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1396
1397impl LsfApptainerBackendConfig {
1398 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1400 if cfg!(not(unix)) {
1401 bail!("LSF + Apptainer backend is not supported on non-unix platforms");
1402 }
1403 if !engine_config.experimental_features_enabled {
1404 bail!("LSF + Apptainer backend requires enabling experimental features");
1405 }
1406
1407 if let Some(queue) = self.default_lsf_queue.as_ref() {
1413 queue.validate("default").await?;
1414 }
1415 if let Some(queue) = self.short_task_lsf_queue.as_ref() {
1416 queue.validate("short_task").await?;
1417 }
1418 if let Some(queue) = self.gpu_lsf_queue.as_ref() {
1419 queue.validate("gpu").await?;
1420 }
1421 if let Some(queue) = self.fpga_lsf_queue.as_ref() {
1422 queue.validate("fpga").await?;
1423 }
1424
1425 self.apptainer_config.validate().await?;
1426
1427 Ok(())
1428 }
1429
1430 pub(crate) fn lsf_queue_for_task(
1435 &self,
1436 requirements: &HashMap<String, Value>,
1437 hints: &HashMap<String, Value>,
1438 ) -> Option<&LsfQueueConfig> {
1439 if let Some(queue) = self.fpga_lsf_queue.as_ref()
1441 && let Some(true) = requirements
1442 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1443 .and_then(Value::as_boolean)
1444 {
1445 return Some(queue);
1446 }
1447
1448 if let Some(queue) = self.gpu_lsf_queue.as_ref()
1449 && let Some(true) = requirements
1450 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1451 .and_then(Value::as_boolean)
1452 {
1453 return Some(queue);
1454 }
1455
1456 if let Some(queue) = self.short_task_lsf_queue.as_ref()
1458 && let Some(true) = hints
1459 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1460 .and_then(Value::as_boolean)
1461 {
1462 return Some(queue);
1463 }
1464
1465 self.default_lsf_queue.as_ref()
1468 }
1469}
1470
1471#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
1480pub struct SlurmPartitionConfig {
1481 name: String,
1484 max_cpu_per_task: Option<u64>,
1487 max_memory_per_task: Option<ByteSize>,
1489}
1490
1491impl SlurmPartitionConfig {
1492 pub fn new(
1494 name: String,
1495 max_cpu_per_task: Option<u64>,
1496 max_memory_per_task: Option<ByteSize>,
1497 ) -> Self {
1498 Self {
1499 name,
1500 max_cpu_per_task,
1501 max_memory_per_task,
1502 }
1503 }
1504
1505 pub fn name(&self) -> &str {
1508 &self.name
1509 }
1510
1511 pub fn max_cpu_per_task(&self) -> Option<u64> {
1514 self.max_cpu_per_task
1515 }
1516
1517 pub fn max_memory_per_task(&self) -> Option<ByteSize> {
1519 self.max_memory_per_task
1520 }
1521
1522 async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1525 let partition = self.name();
1526 ensure!(
1527 !partition.is_empty(),
1528 "{name}_slurm_partition name cannot be empty"
1529 );
1530 if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
1531 ensure!(
1532 max_cpu_per_task > 0,
1533 "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
1534 );
1535 }
1536 if let Some(max_memory_per_task) = self.max_memory_per_task() {
1537 ensure!(
1538 max_memory_per_task.as_u64() > 0,
1539 "{name}_slurm_partition `{partition}` must allow at least some memory to be \
1540 provisioned"
1541 );
1542 }
1543 match tokio::time::timeout(
1544 std::time::Duration::from_secs(10),
1547 Command::new("scontrol")
1548 .arg("show")
1549 .arg("partition")
1550 .arg(partition)
1551 .output(),
1552 )
1553 .await
1554 {
1555 Ok(output) => {
1556 let output = output.context("validating Slurm partition")?;
1557 if !output.status.success() {
1558 let stdout = String::from_utf8_lossy(&output.stdout);
1559 let stderr = String::from_utf8_lossy(&output.stderr);
1560 error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
1561 Err(anyhow!(
1562 "failed to validate {name}_slurm_partition `{partition}`"
1563 ))
1564 } else {
1565 Ok(())
1566 }
1567 }
1568 Err(_) => Err(anyhow!(
1569 "timed out trying to validate {name}_slurm_partition `{partition}`"
1570 )),
1571 }
1572 }
1573}
1574
/// Configuration for the Slurm + Apptainer backend.
///
/// Partition selection for a task prefers, in order, the FPGA partition, the
/// GPU partition and the short-task partition when they are configured and
/// the task asks for them, falling back to the default partition.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmApptainerBackendConfig {
    /// The partition used when no more specific partition applies, if
    /// configured.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks whose short-task hint is `true`, if
    /// configured.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks whose GPU requirement is `true`, if
    /// configured.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// The partition used for tasks whose FPGA requirement is `true`, if
    /// configured.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional command-line arguments, if configured.
    ///
    /// NOTE(review): presumably passed to `sbatch` when submitting jobs —
    /// confirm in the backend.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// The Apptainer configuration; its fields are flattened into this
    /// structure when (de)serialized.
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1621
1622impl SlurmApptainerBackendConfig {
1623 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1625 if cfg!(not(unix)) {
1626 bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
1627 }
1628 if !engine_config.experimental_features_enabled {
1629 bail!("Slurm + Apptainer backend requires enabling experimental features");
1630 }
1631
1632 if let Some(partition) = &self.default_slurm_partition {
1638 partition.validate("default").await?;
1639 }
1640 if let Some(partition) = &self.short_task_slurm_partition {
1641 partition.validate("short_task").await?;
1642 }
1643 if let Some(partition) = &self.gpu_slurm_partition {
1644 partition.validate("gpu").await?;
1645 }
1646 if let Some(partition) = &self.fpga_slurm_partition {
1647 partition.validate("fpga").await?;
1648 }
1649
1650 self.apptainer_config.validate().await?;
1651
1652 Ok(())
1653 }
1654
1655 pub(crate) fn slurm_partition_for_task(
1660 &self,
1661 requirements: &HashMap<String, Value>,
1662 hints: &HashMap<String, Value>,
1663 ) -> Option<&SlurmPartitionConfig> {
1664 if let Some(partition) = self.fpga_slurm_partition.as_ref()
1670 && let Some(true) = requirements
1671 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1672 .and_then(Value::as_boolean)
1673 {
1674 return Some(partition);
1675 }
1676
1677 if let Some(partition) = self.gpu_slurm_partition.as_ref()
1678 && let Some(true) = requirements
1679 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1680 .and_then(Value::as_boolean)
1681 {
1682 return Some(partition);
1683 }
1684
1685 if let Some(partition) = self.short_task_slurm_partition.as_ref()
1687 && let Some(true) = hints
1688 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1689 .and_then(Value::as_boolean)
1690 {
1691 return Some(partition);
1692 }
1693
1694 self.default_slurm_partition.as_ref()
1697 }
1698}
1699
1700#[cfg(test)]
1701mod test {
1702 use pretty_assertions::assert_eq;
1703
1704 use super::*;
1705
/// Checks that a `SecretString` serializes as the redaction placeholder by
/// default, exposes its value once unredacted, and hides it again after being
/// re-redacted.
#[test]
fn redacted_secret() {
    let placeholder_json = format!(r#""{REDACTED}""#);
    let to_json = |value: &SecretString| serde_json::to_string(value).unwrap();

    // A freshly converted secret starts out redacted.
    let mut secret = SecretString::from("secret");
    assert_eq!(to_json(&secret), placeholder_json);

    secret.unredact();
    assert_eq!(to_json(&secret), r#""secret""#);

    secret.redact();
    assert_eq!(to_json(&secret), placeholder_json);
}
1724
/// Ensures that serializing a configuration populated with credentials never
/// emits a secret value.
///
/// Every secret-bearing field is set to the same sentinel. The serialized JSON
/// must not contain that sentinel and must contain the redaction placeholder
/// instead.
#[test]
fn redacted_config() {
    // The sentinel must not collide with any configuration key. Fields such as
    // `secret_access_key` and `secret` are presumably serialized under their
    // own names, so searching for the bare word "secret" would match even when
    // every value is properly redacted.
    const SENTINEL: &str = "hunter2-must-not-leak";

    let config = Config {
        backends: [
            (
                "first".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                        username: "foo".into(),
                        password: SENTINEL.into(),
                    })),
                    ..Default::default()
                }),
            ),
            (
                "second".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                        token: SENTINEL.into(),
                    })),
                    ..Default::default()
                }),
            ),
        ]
        .into(),
        storage: StorageConfig {
            azure: AzureStorageConfig {
                auth: Some(AzureStorageAuthConfig {
                    account_name: "foo".into(),
                    access_key: SENTINEL.into(),
                }),
            },
            s3: S3StorageConfig {
                auth: Some(S3StorageAuthConfig {
                    access_key_id: "foo".into(),
                    secret_access_key: SENTINEL.into(),
                }),
                ..Default::default()
            },
            google: GoogleStorageConfig {
                auth: Some(GoogleStorageAuthConfig {
                    access_key: "foo".into(),
                    secret: SENTINEL.into(),
                }),
            },
        },
        ..Default::default()
    };

    let json = serde_json::to_string_pretty(&config).unwrap();

    // No secret value may survive serialization.
    assert!(!json.contains(SENTINEL), "`{json}` contains a secret");

    // The secrets must have been replaced by the placeholder rather than
    // silently dropped from the output.
    assert!(
        json.contains(REDACTED),
        "`{json}` does not contain the redaction placeholder"
    );
}
1777
1778 #[tokio::test]
1779 async fn test_config_validate() {
1780 let mut config = Config::default();
1782 config.task.retries = Some(1000000);
1783 assert_eq!(
1784 config.validate().await.unwrap_err().to_string(),
1785 "configuration value `task.retries` cannot exceed 100"
1786 );
1787
1788 let mut config = Config::default();
1790 config.workflow.scatter.concurrency = Some(0);
1791 assert_eq!(
1792 config.validate().await.unwrap_err().to_string(),
1793 "configuration value `workflow.scatter.concurrency` cannot be zero"
1794 );
1795
1796 let config = Config {
1798 backend: Some("foo".into()),
1799 ..Default::default()
1800 };
1801 assert_eq!(
1802 config.validate().await.unwrap_err().to_string(),
1803 "a backend named `foo` is not present in the configuration"
1804 );
1805 let config = Config {
1806 backend: Some("bar".into()),
1807 backends: [("foo".to_string(), BackendConfig::default())].into(),
1808 ..Default::default()
1809 };
1810 assert_eq!(
1811 config.validate().await.unwrap_err().to_string(),
1812 "a backend named `bar` is not present in the configuration"
1813 );
1814
1815 let config = Config {
1817 backends: [("foo".to_string(), BackendConfig::default())].into(),
1818 ..Default::default()
1819 };
1820 config.validate().await.expect("config should validate");
1821
1822 let config = Config {
1824 backends: [(
1825 "default".to_string(),
1826 BackendConfig::Local(LocalBackendConfig {
1827 cpu: Some(0),
1828 ..Default::default()
1829 }),
1830 )]
1831 .into(),
1832 ..Default::default()
1833 };
1834 assert_eq!(
1835 config.validate().await.unwrap_err().to_string(),
1836 "local backend configuration value `cpu` cannot be zero"
1837 );
1838 let config = Config {
1839 backends: [(
1840 "default".to_string(),
1841 BackendConfig::Local(LocalBackendConfig {
1842 cpu: Some(10000000),
1843 ..Default::default()
1844 }),
1845 )]
1846 .into(),
1847 ..Default::default()
1848 };
1849 assert!(
1850 config
1851 .validate()
1852 .await
1853 .unwrap_err()
1854 .to_string()
1855 .starts_with(
1856 "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1857 available to the host"
1858 )
1859 );
1860
1861 let config = Config {
1863 backends: [(
1864 "default".to_string(),
1865 BackendConfig::Local(LocalBackendConfig {
1866 memory: Some("0 GiB".to_string()),
1867 ..Default::default()
1868 }),
1869 )]
1870 .into(),
1871 ..Default::default()
1872 };
1873 assert_eq!(
1874 config.validate().await.unwrap_err().to_string(),
1875 "local backend configuration value `memory` cannot be zero"
1876 );
1877 let config = Config {
1878 backends: [(
1879 "default".to_string(),
1880 BackendConfig::Local(LocalBackendConfig {
1881 memory: Some("100 meows".to_string()),
1882 ..Default::default()
1883 }),
1884 )]
1885 .into(),
1886 ..Default::default()
1887 };
1888 assert_eq!(
1889 config.validate().await.unwrap_err().to_string(),
1890 "local backend configuration value `memory` has invalid value `100 meows`"
1891 );
1892
1893 let config = Config {
1894 backends: [(
1895 "default".to_string(),
1896 BackendConfig::Local(LocalBackendConfig {
1897 memory: Some("1000 TiB".to_string()),
1898 ..Default::default()
1899 }),
1900 )]
1901 .into(),
1902 ..Default::default()
1903 };
1904 assert!(
1905 config
1906 .validate()
1907 .await
1908 .unwrap_err()
1909 .to_string()
1910 .starts_with(
1911 "local backend configuration value `memory` cannot exceed the total memory of \
1912 the host"
1913 )
1914 );
1915
1916 let config = Config {
1918 backends: [(
1919 "default".to_string(),
1920 BackendConfig::Tes(Default::default()),
1921 )]
1922 .into(),
1923 ..Default::default()
1924 };
1925 assert_eq!(
1926 config.validate().await.unwrap_err().to_string(),
1927 "TES backend configuration value `url` is required"
1928 );
1929
1930 let config = Config {
1932 backends: [(
1933 "default".to_string(),
1934 BackendConfig::Tes(TesBackendConfig {
1935 url: Some("https://example.com".parse().unwrap()),
1936 max_concurrency: Some(0),
1937 ..Default::default()
1938 }),
1939 )]
1940 .into(),
1941 ..Default::default()
1942 };
1943 assert_eq!(
1944 config.validate().await.unwrap_err().to_string(),
1945 "TES backend configuration value `max_concurrency` cannot be zero"
1946 );
1947
1948 let config = Config {
1950 backends: [(
1951 "default".to_string(),
1952 BackendConfig::Tes(TesBackendConfig {
1953 url: Some("http://example.com".parse().unwrap()),
1954 inputs: Some("http://example.com".parse().unwrap()),
1955 outputs: Some("http://example.com".parse().unwrap()),
1956 ..Default::default()
1957 }),
1958 )]
1959 .into(),
1960 ..Default::default()
1961 };
1962 assert_eq!(
1963 config.validate().await.unwrap_err().to_string(),
1964 "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
1965 must use a HTTPS scheme"
1966 );
1967
1968 let config = Config {
1970 backends: [(
1971 "default".to_string(),
1972 BackendConfig::Tes(TesBackendConfig {
1973 url: Some("http://example.com".parse().unwrap()),
1974 inputs: Some("http://example.com".parse().unwrap()),
1975 outputs: Some("http://example.com".parse().unwrap()),
1976 insecure: true,
1977 ..Default::default()
1978 }),
1979 )]
1980 .into(),
1981 ..Default::default()
1982 };
1983 config
1984 .validate()
1985 .await
1986 .expect("configuration should validate");
1987
1988 let mut config = Config::default();
1989 config.http.parallelism = Some(0);
1990 assert_eq!(
1991 config.validate().await.unwrap_err().to_string(),
1992 "configuration value `http.parallelism` cannot be zero"
1993 );
1994
1995 let mut config = Config::default();
1996 config.http.parallelism = Some(5);
1997 assert!(
1998 config.validate().await.is_ok(),
1999 "should pass for valid configuration"
2000 );
2001
2002 let mut config = Config::default();
2003 config.http.parallelism = None;
2004 assert!(
2005 config.validate().await.is_ok(),
2006 "should pass for default (None)"
2007 );
2008 }
2009}