Skip to main content

wdl_engine/
config.rs

1//! Implementation of engine configuration.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use anyhow::ensure;
14use bytesize::ByteSize;
15use indexmap::IndexMap;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::process::Command;
20use tracing::error;
21use tracing::warn;
22use url::Url;
23
24use crate::CancellationContext;
25use crate::Events;
26use crate::SYSTEM;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::convert_unit_string;
30use crate::path::is_supported_url;
31
/// The inclusive maximum number of task retries the engine supports.
pub(crate) const MAX_RETRIES: u64 = 100;

/// The default task shell.
pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";

/// The default backend name.
///
/// Used to look up a backend when `backends` has multiple entries but no
/// explicit `backend` name was configured.
pub(crate) const DEFAULT_BACKEND_NAME: &str = "default";

/// The maximum size, in bytes, for an LSF job name prefix.
const MAX_LSF_JOB_NAME_PREFIX: usize = 100;

/// The string that replaces redacted serialization fields.
const REDACTED: &str = "<REDACTED>";

/// Gets the default root cache directory for the user.
///
/// Returns an error if the user's cache directory cannot be determined.
pub(crate) fn cache_dir() -> Result<PathBuf> {
    /// The subdirectory within the user's cache directory for all caches
    const CACHE_DIR_ROOT: &str = "sprocket";

    Ok(dirs::cache_dir()
        .context("failed to determine user cache directory")?
        .join(CACHE_DIR_ROOT))
}
56
/// Represents a secret string that is, by default, redacted for serialization.
///
/// This type is a wrapper around [`secrecy::SecretString`].
#[derive(Debug, Clone)]
pub struct SecretString {
    /// The inner secret string.
    ///
    /// This type is not serializable.
    inner: secrecy::SecretString,
    /// Whether or not the secret string is redacted for serialization.
    ///
    /// If `true` (the default), `<REDACTED>` is serialized for the string's
    /// value.
    ///
    /// If `false`, the inner secret string is exposed for serialization.
    ///
    /// Toggled via [`SecretString::redact`] and [`SecretString::unredact`].
    redacted: bool,
}
74
75impl SecretString {
76    /// Redacts the secret for serialization.
77    ///
78    /// By default, a [`SecretString`] is redacted; when redacted, the string is
79    /// replaced with `<REDACTED>` when serialized.
80    pub fn redact(&mut self) {
81        self.redacted = true;
82    }
83
84    /// Unredacts the secret for serialization.
85    pub fn unredact(&mut self) {
86        self.redacted = false;
87    }
88
89    /// Gets the inner [`secrecy::SecretString`].
90    pub fn inner(&self) -> &secrecy::SecretString {
91        &self.inner
92    }
93}
94
95impl From<String> for SecretString {
96    fn from(s: String) -> Self {
97        Self {
98            inner: s.into(),
99            redacted: true,
100        }
101    }
102}
103
104impl From<&str> for SecretString {
105    fn from(s: &str) -> Self {
106        Self {
107            inner: s.into(),
108            redacted: true,
109        }
110    }
111}
112
113impl Default for SecretString {
114    fn default() -> Self {
115        Self {
116            inner: Default::default(),
117            redacted: true,
118        }
119    }
120}
121
122impl serde::Serialize for SecretString {
123    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
124    where
125        S: serde::Serializer,
126    {
127        use secrecy::ExposeSecret;
128
129        if self.redacted {
130            serializer.serialize_str(REDACTED)
131        } else {
132            serializer.serialize_str(self.inner.expose_secret())
133        }
134    }
135}
136
137impl<'de> serde::Deserialize<'de> for SecretString {
138    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
139    where
140        D: serde::Deserializer<'de>,
141    {
142        let inner = secrecy::SecretString::deserialize(deserializer)?;
143        Ok(Self {
144            inner,
145            redacted: true,
146        })
147    }
148}
149
/// Represents how an evaluation error or cancellation should be handled by the
/// engine.
///
/// Configured via the `fail` key; see [`Config::failure_mode`].
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// When an error is encountered or evaluation is canceled, evaluation waits
    /// for any outstanding tasks to complete.
    ///
    /// This is the default mode.
    #[default]
    Slow,
    /// When an error is encountered or evaluation is canceled, any outstanding
    /// tasks that are executing are immediately canceled and evaluation waits
    /// for cancellation to complete.
    Fast,
}
164
/// Represents WDL evaluation configuration.
///
/// <div class="warning">
///
/// By default, serialization of [`Config`] will redact the values of secrets.
///
/// Use the [`Config::unredact`] method before serialization to prevent the
/// secrets from being redacted.
///
/// </div>
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// The name of the backend to use.
    ///
    /// If not specified and `backends` has multiple entries, it will use a name
    /// of `default`.
    ///
    /// See [`Config::backend`] for the full selection logic.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// Task execution backends configuration.
    ///
    /// If the collection is empty and `backend` is not specified, the engine
    /// default backend is used.
    ///
    /// If the collection has exactly one entry and `backend` is not specified,
    /// the singular entry will be used.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub backends: IndexMap<String, BackendConfig>,
    /// Storage configuration.
    #[serde(default)]
    pub storage: StorageConfig,
    /// (Experimental) Avoid environment-specific output; default is `false`.
    ///
    /// If this option is `true`, selected error messages and log output will
    /// avoid emitting environment-specific output such as absolute paths
    /// and system resource counts.
    ///
    /// This is largely meant to support "golden testing" where a test's success
    /// depends on matching an expected set of outputs exactly. Cues that
    /// help users overcome errors, such as the path to a temporary
    /// directory or the number of CPUs available to the system, confound this
    /// style of testing. This flag is a best-effort experimental attempt to
    /// reduce the impact of these differences in order to allow a wider
    /// range of golden tests to be written.
    ///
    /// Requires `experimental_features_enabled` to be `true`; validation fails
    /// otherwise.
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// (Experimental) Whether experimental features are enabled; default is
    /// `false`.
    ///
    /// Experimental features are provided to users with heavy caveats about
    /// their stability and rough edges. Use at your own risk, but feedback
    /// is quite welcome.
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// The failure mode for workflow or task evaluation.
    ///
    /// A value of [`FailureMode::Slow`] will result in evaluation waiting for
    /// executing tasks to complete upon error or interruption.
    ///
    /// A value of [`FailureMode::Fast`] will immediately attempt to cancel
    /// executing tasks upon error or interruption.
    ///
    /// Note: this field is named `fail` in serialized configuration.
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
238
239impl Config {
240    /// Validates the evaluation configuration.
241    pub async fn validate(&self) -> Result<()> {
242        self.http.validate()?;
243        self.workflow.validate()?;
244        self.task.validate()?;
245
246        if self.backend.is_none() && self.backends.len() < 2 {
247            // This is OK, we'll use either the singular backends entry (1) or
248            // the default (0)
249        } else {
250            // Check the backends map for the backend name (or "default")
251            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
252            if !self.backends.contains_key(backend) {
253                bail!("a backend named `{backend}` is not present in the configuration");
254            }
255        }
256
257        for backend in self.backends.values() {
258            backend.validate(self).await?;
259        }
260
261        self.storage.validate()?;
262
263        if self.suppress_env_specific_output && !self.experimental_features_enabled {
264            bail!("`suppress_env_specific_output` requires enabling experimental features");
265        }
266
267        Ok(())
268    }
269
270    /// Redacts the secrets contained in the configuration.
271    ///
272    /// By default, secrets are redacted for serialization.
273    pub fn redact(&mut self) {
274        for backend in self.backends.values_mut() {
275            backend.redact();
276        }
277
278        if let Some(auth) = &mut self.storage.azure.auth {
279            auth.redact();
280        }
281
282        if let Some(auth) = &mut self.storage.s3.auth {
283            auth.redact();
284        }
285
286        if let Some(auth) = &mut self.storage.google.auth {
287            auth.redact();
288        }
289    }
290
291    /// Unredacts the secrets contained in the configuration.
292    ///
293    /// Calling this method will expose secrets for serialization.
294    pub fn unredact(&mut self) {
295        for backend in self.backends.values_mut() {
296            backend.unredact();
297        }
298
299        if let Some(auth) = &mut self.storage.azure.auth {
300            auth.unredact();
301        }
302
303        if let Some(auth) = &mut self.storage.s3.auth {
304            auth.unredact();
305        }
306
307        if let Some(auth) = &mut self.storage.google.auth {
308            auth.unredact();
309        }
310    }
311
312    /// Gets the backend configuration.
313    ///
314    /// Returns an error if the configuration specifies a named backend that
315    /// isn't present in the configuration.
316    pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
317        if self.backend.is_some() || self.backends.len() >= 2 {
318            // Lookup the backend to use
319            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
320            return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
321                || anyhow!("a backend named `{backend}` is not present in the configuration"),
322            )?));
323        }
324
325        if self.backends.len() == 1 {
326            // Use the singular entry
327            Ok(Cow::Borrowed(self.backends.values().next().unwrap()))
328        } else {
329            // Use the default
330            Ok(Cow::Owned(BackendConfig::default()))
331        }
332    }
333
334    /// Creates a new task execution backend based on this configuration.
335    pub(crate) async fn create_backend(
336        self: &Arc<Self>,
337        run_root_dir: &Path,
338        events: Events,
339        cancellation: CancellationContext,
340    ) -> Result<Arc<dyn TaskExecutionBackend>> {
341        use crate::backend::*;
342
343        match self.backend()?.as_ref() {
344            BackendConfig::Local(_) => {
345                warn!(
346                    "the engine is configured to use the local backend: tasks will not be run \
347                     inside of a container"
348                );
349                Ok(Arc::new(LocalBackend::new(
350                    self.clone(),
351                    events,
352                    cancellation,
353                )?))
354            }
355            BackendConfig::Docker(_) => Ok(Arc::new(
356                DockerBackend::new(self.clone(), events, cancellation).await?,
357            )),
358            BackendConfig::Tes(_) => Ok(Arc::new(
359                TesBackend::new(self.clone(), events, cancellation).await?,
360            )),
361            BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
362                self.clone(),
363                run_root_dir,
364                events,
365                cancellation,
366            )?)),
367            BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
368                self.clone(),
369                run_root_dir,
370                events,
371                cancellation,
372            )?)),
373        }
374    }
375}
376
/// Represents HTTP configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct HttpConfig {
    /// The HTTP download cache location.
    ///
    /// Defaults to an operating system specific cache directory for the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The number of retries for transferring files.
    ///
    /// Defaults to `5`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<usize>,
    /// The maximum parallelism for file transfers.
    ///
    /// Defaults to the host's available parallelism.
    ///
    /// A value of `0` is invalid; see [`HttpConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parallelism: Option<usize>,
}
397
398impl HttpConfig {
399    /// Validates the HTTP configuration.
400    pub fn validate(&self) -> Result<()> {
401        if let Some(parallelism) = self.parallelism
402            && parallelism == 0
403        {
404            bail!("configuration value `http.parallelism` cannot be zero");
405        }
406        Ok(())
407    }
408}
409
/// Represents storage configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct StorageConfig {
    /// Azure Blob Storage configuration.
    #[serde(default)]
    pub azure: AzureStorageConfig,
    /// AWS S3 configuration.
    #[serde(default)]
    pub s3: S3StorageConfig,
    /// Google Cloud Storage configuration.
    #[serde(default)]
    pub google: GoogleStorageConfig,
}

impl StorageConfig {
    /// Validates the storage configuration.
    ///
    /// Validates each cloud provider's configuration in turn.
    pub fn validate(&self) -> Result<()> {
        self.azure.validate()?;
        self.s3.validate()?;
        self.google.validate()?;
        Ok(())
    }
}
434
/// Represents authentication information for Azure Blob Storage.
///
/// Both fields are required when this configuration is present.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageAuthConfig {
    /// The Azure Storage account name to use.
    pub account_name: String,
    /// The Azure Storage access key to use.
    pub access_key: SecretString,
}
444
445impl AzureStorageAuthConfig {
446    /// Validates the Azure Blob Storage authentication configuration.
447    pub fn validate(&self) -> Result<()> {
448        if self.account_name.is_empty() {
449            bail!("configuration value `storage.azure.auth.account_name` is required");
450        }
451
452        if self.access_key.inner.expose_secret().is_empty() {
453            bail!("configuration value `storage.azure.auth.access_key` is required");
454        }
455
456        Ok(())
457    }
458
459    /// Redacts the secrets contained in the Azure Blob Storage storage
460    /// authentication configuration.
461    pub fn redact(&mut self) {
462        self.access_key.redact();
463    }
464
465    /// Unredacts the secrets contained in the Azure Blob Storage authentication
466    /// configuration.
467    pub fn unredact(&mut self) {
468        self.access_key.unredact();
469    }
470}
471
/// Represents configuration for Azure Blob Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct AzureStorageConfig {
    /// The Azure Blob Storage authentication configuration.
    ///
    /// When `None`, no Azure-specific authentication is configured.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<AzureStorageAuthConfig>,
}
480
481impl AzureStorageConfig {
482    /// Validates the Azure Blob Storage configuration.
483    pub fn validate(&self) -> Result<()> {
484        if let Some(auth) = &self.auth {
485            auth.validate()?;
486        }
487
488        Ok(())
489    }
490}
491
/// Represents authentication information for AWS S3 storage.
///
/// Both fields are required when this configuration is present.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageAuthConfig {
    /// The AWS Access Key ID to use.
    pub access_key_id: String,
    /// The AWS Secret Access Key to use.
    pub secret_access_key: SecretString,
}
501
502impl S3StorageAuthConfig {
503    /// Validates the AWS S3 storage authentication configuration.
504    pub fn validate(&self) -> Result<()> {
505        if self.access_key_id.is_empty() {
506            bail!("configuration value `storage.s3.auth.access_key_id` is required");
507        }
508
509        if self.secret_access_key.inner.expose_secret().is_empty() {
510            bail!("configuration value `storage.s3.auth.secret_access_key` is required");
511        }
512
513        Ok(())
514    }
515
516    /// Redacts the secrets contained in the AWS S3 storage authentication
517    /// configuration.
518    pub fn redact(&mut self) {
519        self.secret_access_key.redact();
520    }
521
522    /// Unredacts the secrets contained in the AWS S3 storage authentication
523    /// configuration.
524    pub fn unredact(&mut self) {
525        self.secret_access_key.unredact();
526    }
527}
528
/// Represents configuration for AWS S3 storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct S3StorageConfig {
    /// The default region to use for S3-schemed URLs (e.g.
    /// `s3://<bucket>/<blob>`).
    ///
    /// Defaults to `us-east-1`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub region: Option<String>,

    /// The AWS S3 storage authentication configuration.
    ///
    /// When `None`, no S3-specific authentication is configured.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<S3StorageAuthConfig>,
}
544
545impl S3StorageConfig {
546    /// Validates the AWS S3 storage configuration.
547    pub fn validate(&self) -> Result<()> {
548        if let Some(auth) = &self.auth {
549            auth.validate()?;
550        }
551
552        Ok(())
553    }
554}
555
/// Represents authentication information for Google Cloud Storage.
///
/// Both fields are required when this configuration is present.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageAuthConfig {
    /// The HMAC Access Key to use.
    pub access_key: String,
    /// The HMAC Secret to use.
    pub secret: SecretString,
}
565
566impl GoogleStorageAuthConfig {
567    /// Validates the Google Cloud Storage authentication configuration.
568    pub fn validate(&self) -> Result<()> {
569        if self.access_key.is_empty() {
570            bail!("configuration value `storage.google.auth.access_key` is required");
571        }
572
573        if self.secret.inner.expose_secret().is_empty() {
574            bail!("configuration value `storage.google.auth.secret` is required");
575        }
576
577        Ok(())
578    }
579
580    /// Redacts the secrets contained in the Google Cloud Storage authentication
581    /// configuration.
582    pub fn redact(&mut self) {
583        self.secret.redact();
584    }
585
586    /// Unredacts the secrets contained in the Google Cloud Storage
587    /// authentication configuration.
588    pub fn unredact(&mut self) {
589        self.secret.unredact();
590    }
591}
592
/// Represents configuration for Google Cloud Storage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct GoogleStorageConfig {
    /// The Google Cloud Storage authentication configuration.
    ///
    /// When `None`, no Google-specific authentication is configured.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<GoogleStorageAuthConfig>,
}
601
602impl GoogleStorageConfig {
603    /// Validates the Google Cloud Storage configuration.
604    pub fn validate(&self) -> Result<()> {
605        if let Some(auth) = &self.auth {
606            auth.validate()?;
607        }
608
609        Ok(())
610    }
611}
612
/// Represents workflow evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct WorkflowConfig {
    /// Scatter statement evaluation configuration.
    ///
    /// See [`ScatterConfig`] for details on scatter concurrency.
    #[serde(default)]
    pub scatter: ScatterConfig,
}
621
622impl WorkflowConfig {
623    /// Validates the workflow configuration.
624    pub fn validate(&self) -> Result<()> {
625        self.scatter.validate()?;
626        Ok(())
627    }
628}
629
/// Represents scatter statement evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ScatterConfig {
    /// The number of scatter array elements to process concurrently.
    ///
    /// Defaults to `1000`.
    ///
    /// A value of `0` is invalid; see [`ScatterConfig::validate`].
    ///
    /// Lower values use less memory for evaluation and higher values may better
    /// saturate the task execution backend with tasks to execute for large
    /// scatters.
    ///
    /// This setting does not change how many tasks an execution backend can run
    /// concurrently, but may affect how many tasks are sent to the backend to
    /// run at a time.
    ///
    /// For example, if `concurrency` was set to 10 and we evaluate the
    /// following scatters:
    ///
    /// ```wdl
    /// scatter (i in range(100)) {
    ///     call my_task
    /// }
    ///
    /// scatter (j in range(100)) {
    ///     call my_task as my_task2
    /// }
    /// ```
    ///
    /// Here each scatter is independent and therefore there will be 20 calls
    /// (10 for each scatter) made concurrently. If the task execution
    /// backend can only execute 5 tasks concurrently, 5 tasks will execute
    /// and 15 will be "ready" to execute and waiting for an executing task
    /// to complete.
    ///
    /// If instead we evaluate the following scatters:
    ///
    /// ```wdl
    /// scatter (i in range(100)) {
    ///     scatter (j in range(100)) {
    ///         call my_task
    ///     }
    /// }
    /// ```
    ///
    /// Then there will be 100 calls (10*10 as 10 are made for each outer
    /// element) made concurrently. If the task execution backend can only
    /// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
    /// "ready" to execute and waiting for an executing task to complete.
    ///
    /// <div class="warning">
    /// Warning: nested scatter statements cause exponential memory usage based
    /// on this value, as each scatter statement evaluation requires allocating
    /// new scopes for scatter array elements being processed. </div>
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub concurrency: Option<u64>,
}
689
690impl ScatterConfig {
691    /// Validates the scatter configuration.
692    pub fn validate(&self) -> Result<()> {
693        if let Some(concurrency) = self.concurrency
694            && concurrency == 0
695        {
696            bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
697        }
698
699        Ok(())
700    }
701}
702
/// Represents the supported call caching modes.
///
/// See [`TaskConfig::cache`].
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// Call caching is disabled.
    ///
    /// The call cache is not checked and new entries are not added to the
    /// cache.
    ///
    /// This is the default value.
    #[default]
    Off,
    /// Call caching is enabled.
    ///
    /// The call cache is checked and new entries are added to the cache.
    ///
    /// Defaults the `cacheable` task hint to `true`.
    On,
    /// Call caching is enabled only for tasks that explicitly have a
    /// `cacheable` hint set to `true`.
    ///
    /// The call cache is checked and new entries are added to the cache *only*
    /// for tasks that have the `cacheable` hint set to `true`.
    ///
    /// Defaults the `cacheable` task hint to `false`.
    Explicit,
}
730
/// Represents the supported modes for calculating content digests.
///
/// See [`TaskConfig::digests`]; digests are used as part of call caching.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// Use a strong digest for file content.
    ///
    /// Strong digests require hashing all of the contents of a file; this may
    /// noticeably impact performance for very large files.
    ///
    /// This setting guarantees that a modified file will be detected.
    Strong,
    /// Use a weak digest for file content.
    ///
    /// A weak digest is based solely off of file metadata, such as size and
    /// last modified time.
    ///
    /// This setting cannot guarantee the detection of modified files and may
    /// result in a modified file not causing a call cache entry to be
    /// invalidated.
    ///
    /// However, it is substantially faster than using a strong digest.
    ///
    /// This is the default value.
    #[default]
    Weak,
}
755
/// Represents task evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct TaskConfig {
    /// The default maximum number of retries to attempt if a task fails.
    ///
    /// A task's `max_retries` requirement will override this value.
    ///
    /// Defaults to 0 (no retries). May not exceed `MAX_RETRIES` (100); see
    /// [`TaskConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<u64>,
    /// The default container to use if a container is not specified in a task's
    /// requirements.
    ///
    /// Defaults to `ubuntu:latest`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub container: Option<String>,
    /// The default shell to use for tasks.
    ///
    /// Defaults to `bash`.
    ///
    /// <div class="warning">
    /// Warning: the use of a shell other than `bash` may lead to tasks that may
    /// not be portable to other execution engines.
    ///
    /// The shell must support a `-c` option to run a specific script file (i.e.
    /// an evaluated task command).
    ///
    /// Note that this option affects all task commands, so every container that
    /// is used must contain the specified shell.
    ///
    /// If using this setting causes your tasks to fail, please do not file an
    /// issue. </div>
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell: Option<String>,
    /// The behavior when a task's `cpu` requirement cannot be met.
    #[serde(default)]
    pub cpu_limit_behavior: TaskResourceLimitBehavior,
    /// The behavior when a task's `memory` requirement cannot be met.
    #[serde(default)]
    pub memory_limit_behavior: TaskResourceLimitBehavior,
    /// The call cache directory to use for caching task execution results.
    ///
    /// Defaults to an operating system specific cache directory for the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The call caching mode to use for tasks.
    #[serde(default)]
    pub cache: CallCachingMode,
    /// The content digest mode to use.
    ///
    /// Used as part of call caching.
    #[serde(default)]
    pub digests: ContentDigestMode,
}
811
812impl TaskConfig {
813    /// Validates the task evaluation configuration.
814    pub fn validate(&self) -> Result<()> {
815        if self.retries.unwrap_or(0) > MAX_RETRIES {
816            bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
817        }
818
819        Ok(())
820    }
821}
822
/// The behavior when a task resource requirement, such as `cpu` or `memory`,
/// cannot be met.
///
/// See [`TaskConfig::cpu_limit_behavior`] and
/// [`TaskConfig::memory_limit_behavior`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// Try executing a task with the maximum amount of the resource available
    /// when the task's corresponding requirement cannot be met.
    TryWithMax,
    /// Do not execute a task if its corresponding requirement cannot be met.
    ///
    /// This is the default behavior.
    #[default]
    Deny,
}
837
/// Represents supported task execution backends.
///
/// The variant is selected by the `type` tag in serialized configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum BackendConfig {
    /// Use the local task execution backend.
    Local(LocalBackendConfig),
    /// Use the Docker task execution backend.
    Docker(DockerBackendConfig),
    /// Use the TES task execution backend.
    Tes(TesBackendConfig),
    /// Use the experimental LSF + Apptainer task execution backend.
    ///
    /// Requires enabling experimental features.
    LsfApptainer(LsfApptainerBackendConfig),
    /// Use the experimental Slurm + Apptainer task execution backend.
    ///
    /// Requires enabling experimental features.
    SlurmApptainer(SlurmApptainerBackendConfig),
}
857
858impl Default for BackendConfig {
859    fn default() -> Self {
860        Self::Docker(Default::default())
861    }
862}
863
864impl BackendConfig {
865    /// Validates the backend configuration.
866    pub async fn validate(&self, engine_config: &Config) -> Result<()> {
867        match self {
868            Self::Local(config) => config.validate(),
869            Self::Docker(config) => config.validate(),
870            Self::Tes(config) => config.validate(),
871            Self::LsfApptainer(config) => config.validate(engine_config).await,
872            Self::SlurmApptainer(config) => config.validate(engine_config).await,
873        }
874    }
875
876    /// Converts the backend configuration into a local backend configuration
877    ///
878    /// Returns `None` if the backend configuration is not local.
879    pub fn as_local(&self) -> Option<&LocalBackendConfig> {
880        match self {
881            Self::Local(config) => Some(config),
882            _ => None,
883        }
884    }
885
886    /// Converts the backend configuration into a Docker backend configuration
887    ///
888    /// Returns `None` if the backend configuration is not Docker.
889    pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
890        match self {
891            Self::Docker(config) => Some(config),
892            _ => None,
893        }
894    }
895
896    /// Converts the backend configuration into a TES backend configuration
897    ///
898    /// Returns `None` if the backend configuration is not TES.
899    pub fn as_tes(&self) -> Option<&TesBackendConfig> {
900        match self {
901            Self::Tes(config) => Some(config),
902            _ => None,
903        }
904    }
905
906    /// Converts the backend configuration into a LSF Apptainer backend
907    /// configuration
908    ///
909    /// Returns `None` if the backend configuration is not LSF Apptainer.
910    pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
911        match self {
912            Self::LsfApptainer(config) => Some(config),
913            _ => None,
914        }
915    }
916
917    /// Converts the backend configuration into a Slurm Apptainer backend
918    /// configuration
919    ///
920    /// Returns `None` if the backend configuration is not Slurm Apptainer.
921    pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
922        match self {
923            Self::SlurmApptainer(config) => Some(config),
924            _ => None,
925        }
926    }
927
928    /// Redacts the secrets contained in the backend configuration.
929    pub fn redact(&mut self) {
930        match self {
931            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
932            Self::Tes(config) => config.redact(),
933        }
934    }
935
936    /// Unredacts the secrets contained in the backend configuration.
937    pub fn unredact(&mut self) {
938        match self {
939            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
940            Self::Tes(config) => config.unredact(),
941        }
942    }
943}
944
/// Represents configuration for the local task execution backend.
///
/// <div class="warning">
/// Warning: the local task execution backend spawns processes on the host
/// directly without the use of a container; only use this backend on trusted
/// WDL. </div>
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LocalBackendConfig {
    /// Set the number of CPUs available for task execution.
    ///
    /// Defaults to the number of logical CPUs for the host.
    ///
    /// The value cannot be zero or exceed the host's number of CPUs; this is
    /// enforced by [`LocalBackendConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu: Option<u64>,

    /// Set the total amount of memory for task execution as a unit string (e.g.
    /// `2 GiB`).
    ///
    /// Defaults to the total amount of memory for the host.
    ///
    /// The value cannot be zero or exceed the host's total amount of memory;
    /// this is enforced by [`LocalBackendConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory: Option<String>,
}
971
972impl LocalBackendConfig {
973    /// Validates the local task execution backend configuration.
974    pub fn validate(&self) -> Result<()> {
975        if let Some(cpu) = self.cpu {
976            if cpu == 0 {
977                bail!("local backend configuration value `cpu` cannot be zero");
978            }
979
980            let total = SYSTEM.cpus().len() as u64;
981            if cpu > total {
982                bail!(
983                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
984                     available to the host ({total})"
985                );
986            }
987        }
988
989        if let Some(memory) = &self.memory {
990            let memory = convert_unit_string(memory).with_context(|| {
991                format!("local backend configuration value `memory` has invalid value `{memory}`")
992            })?;
993
994            if memory == 0 {
995                bail!("local backend configuration value `memory` cannot be zero");
996            }
997
998            let total = SYSTEM.total_memory();
999            if memory > total {
1000                bail!(
1001                    "local backend configuration value `memory` cannot exceed the total memory of \
1002                     the host ({total} bytes)"
1003                );
1004            }
1005        }
1006
1007        Ok(())
1008    }
1009}
1010
/// Gets the default value for the docker `cleanup` field.
///
/// Exists as a named function so it can be referenced from the
/// `#[serde(default = "cleanup_default")]` attribute on
/// [`DockerBackendConfig::cleanup`].
const fn cleanup_default() -> bool {
    true
}
1015
/// Represents configuration for the Docker backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct DockerBackendConfig {
    /// Whether or not to remove a task's container after the task completes.
    ///
    /// Defaults to `true` (via `cleanup_default`).
    #[serde(default = "cleanup_default")]
    pub cleanup: bool,
}
1026
impl DockerBackendConfig {
    /// Validates the Docker backend configuration.
    ///
    /// No constraints are currently enforced, so this always succeeds; it is
    /// provided for parity with the other backend configurations, which are
    /// all validated through [`BackendConfig::validate`].
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }
}
1033
1034impl Default for DockerBackendConfig {
1035    fn default() -> Self {
1036        Self { cleanup: true }
1037    }
1038}
1039
/// Represents HTTP basic authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BasicAuthConfig {
    /// The HTTP basic authentication username.
    #[serde(default)]
    pub username: String,
    /// The HTTP basic authentication password.
    ///
    /// Stored as a [`SecretString`], which is redacted for serialization by
    /// default.
    #[serde(default)]
    pub password: SecretString,
}
1051
1052impl BasicAuthConfig {
1053    /// Validates the HTTP basic auth configuration.
1054    pub fn validate(&self) -> Result<()> {
1055        Ok(())
1056    }
1057
1058    /// Redacts the secrets contained in the HTTP basic auth configuration.
1059    pub fn redact(&mut self) {
1060        self.password.redact();
1061    }
1062
1063    /// Unredacts the secrets contained in the HTTP basic auth configuration.
1064    pub fn unredact(&mut self) {
1065        self.password.unredact();
1066    }
1067}
1068
/// Represents HTTP bearer token authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BearerAuthConfig {
    /// The HTTP bearer authentication token.
    ///
    /// Stored as a [`SecretString`], which is redacted for serialization by
    /// default.
    #[serde(default)]
    pub token: SecretString,
}
1077
1078impl BearerAuthConfig {
1079    /// Validates the HTTP bearer auth configuration.
1080    pub fn validate(&self) -> Result<()> {
1081        Ok(())
1082    }
1083
1084    /// Redacts the secrets contained in the HTTP bearer auth configuration.
1085    pub fn redact(&mut self) {
1086        self.token.redact();
1087    }
1088
1089    /// Unredacts the secrets contained in the HTTP bearer auth configuration.
1090    pub fn unredact(&mut self) {
1091        self.token.unredact();
1092    }
1093}
1094
/// Represents the kind of authentication for a TES backend.
///
/// Serialized forms are internally tagged by a `type` field whose value is the
/// snake-cased variant name (`basic` or `bearer`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum TesBackendAuthConfig {
    /// Use basic authentication for the TES backend.
    Basic(BasicAuthConfig),
    /// Use bearer token authentication for the TES backend.
    Bearer(BearerAuthConfig),
}
1104
1105impl TesBackendAuthConfig {
1106    /// Validates the TES backend authentication configuration.
1107    pub fn validate(&self) -> Result<()> {
1108        match self {
1109            Self::Basic(config) => config.validate(),
1110            Self::Bearer(config) => config.validate(),
1111        }
1112    }
1113
1114    /// Redacts the secrets contained in the TES backend authentication
1115    /// configuration.
1116    pub fn redact(&mut self) {
1117        match self {
1118            Self::Basic(auth) => auth.redact(),
1119            Self::Bearer(auth) => auth.redact(),
1120        }
1121    }
1122
1123    /// Unredacts the secrets contained in the TES backend authentication
1124    /// configuration.
1125    pub fn unredact(&mut self) {
1126        match self {
1127            Self::Basic(auth) => auth.unredact(),
1128            Self::Bearer(auth) => auth.unredact(),
1129        }
1130    }
1131}
1132
1133/// Represents configuration for the Task Execution Service (TES) backend.
1134#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1135#[serde(rename_all = "snake_case", deny_unknown_fields)]
1136pub struct TesBackendConfig {
1137    /// The URL of the Task Execution Service.
1138    #[serde(default, skip_serializing_if = "Option::is_none")]
1139    pub url: Option<Url>,
1140
1141    /// The authentication configuration for the TES backend.
1142    #[serde(default, skip_serializing_if = "Option::is_none")]
1143    pub auth: Option<TesBackendAuthConfig>,
1144
1145    /// The root cloud storage URL for storing inputs.
1146    #[serde(default, skip_serializing_if = "Option::is_none")]
1147    pub inputs: Option<Url>,
1148
1149    /// The root cloud storage URL for storing outputs.
1150    #[serde(default, skip_serializing_if = "Option::is_none")]
1151    pub outputs: Option<Url>,
1152
1153    /// The polling interval, in seconds, for checking task status.
1154    ///
1155    /// Defaults to 1 second.
1156    #[serde(default, skip_serializing_if = "Option::is_none")]
1157    pub interval: Option<u64>,
1158
1159    /// The number of retries after encountering an error communicating with the
1160    /// TES server.
1161    ///
1162    /// Defaults to no retries.
1163    pub retries: Option<u32>,
1164
1165    /// The maximum number of concurrent requests the backend will send to the
1166    /// TES server.
1167    ///
1168    /// Defaults to 10 concurrent requests.
1169    #[serde(default, skip_serializing_if = "Option::is_none")]
1170    pub max_concurrency: Option<u32>,
1171
1172    /// Whether or not the TES server URL may use an insecure protocol like
1173    /// HTTP.
1174    #[serde(default)]
1175    pub insecure: bool,
1176}
1177
1178impl TesBackendConfig {
1179    /// Validates the TES backend configuration.
1180    pub fn validate(&self) -> Result<()> {
1181        match &self.url {
1182            Some(url) => {
1183                if !self.insecure && url.scheme() != "https" {
1184                    bail!(
1185                        "TES backend configuration value `url` has invalid value `{url}`: URL \
1186                         must use a HTTPS scheme"
1187                    );
1188                }
1189            }
1190            None => bail!("TES backend configuration value `url` is required"),
1191        }
1192
1193        if let Some(auth) = &self.auth {
1194            auth.validate()?;
1195        }
1196
1197        if let Some(max_concurrency) = self.max_concurrency
1198            && max_concurrency == 0
1199        {
1200            bail!("TES backend configuration value `max_concurrency` cannot be zero");
1201        }
1202
1203        match &self.inputs {
1204            Some(url) => {
1205                if !is_supported_url(url.as_str()) {
1206                    bail!(
1207                        "TES backend storage configuration value `inputs` has invalid value \
1208                         `{url}`: URL scheme is not supported"
1209                    );
1210                }
1211
1212                if !url.path().ends_with('/') {
1213                    bail!(
1214                        "TES backend storage configuration value `inputs` has invalid value \
1215                         `{url}`: URL path must end with a slash"
1216                    );
1217                }
1218            }
1219            None => bail!("TES backend configuration value `inputs` is required"),
1220        }
1221
1222        match &self.outputs {
1223            Some(url) => {
1224                if !is_supported_url(url.as_str()) {
1225                    bail!(
1226                        "TES backend storage configuration value `outputs` has invalid value \
1227                         `{url}`: URL scheme is not supported"
1228                    );
1229                }
1230
1231                if !url.path().ends_with('/') {
1232                    bail!(
1233                        "TES backend storage configuration value `outputs` has invalid value \
1234                         `{url}`: URL path must end with a slash"
1235                    );
1236                }
1237            }
1238            None => bail!("TES backend storage configuration value `outputs` is required"),
1239        }
1240
1241        Ok(())
1242    }
1243
1244    /// Redacts the secrets contained in the TES backend configuration.
1245    pub fn redact(&mut self) {
1246        if let Some(auth) = &mut self.auth {
1247            auth.redact();
1248        }
1249    }
1250
1251    /// Unredacts the secrets contained in the TES backend configuration.
1252    pub fn unredact(&mut self) {
1253        if let Some(auth) = &mut self.auth {
1254            auth.unredact();
1255        }
1256    }
1257}
1258
/// Configuration for the Apptainer container runtime.
///
/// This configuration is flattened into the LSF and Slurm backend
/// configurations, which use Apptainer as the container runtime on compute
/// nodes.
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ApptainerConfig {
    /// Additional command-line arguments to pass to `apptainer exec` when
    /// executing tasks.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}
1267
impl ApptainerConfig {
    /// Validate that Apptainer is appropriately configured.
    ///
    /// No checks are currently performed; this always succeeds and exists so
    /// callers can validate uniformly alongside the other configurations.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1274
/// Configuration for an LSF queue.
///
/// Each queue can optionally have per-task CPU and memory limits set so that
/// tasks which are too large to be scheduled on that queue will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be manually based on the user's understanding of the
/// cluster configuration.
///
/// The queue's existence is checked by [`LsfQueueConfig::validate`] using the
/// local `bqueues` command.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfQueueConfig {
    /// The name of the queue; this is the string passed to `bsub -q
    /// <queue_name>`.
    pub name: String,
    /// The maximum number of CPUs this queue can provision for a single task.
    pub max_cpu_per_task: Option<u64>,
    /// The maximum memory this queue can provision for a single task.
    pub max_memory_per_task: Option<ByteSize>,
}
1294
1295impl LsfQueueConfig {
1296    /// Validate that this LSF queue exists according to the local `bqueues`.
1297    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1298        let queue = &self.name;
1299        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
1300        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
1301            ensure!(
1302                max_cpu_per_task > 0,
1303                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
1304            );
1305        }
1306        if let Some(max_memory_per_task) = self.max_memory_per_task {
1307            ensure!(
1308                max_memory_per_task.as_u64() > 0,
1309                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
1310            );
1311        }
1312        match tokio::time::timeout(
1313            // 10 seconds is rather arbitrary; `bqueues` ordinarily returns extremely quickly, but
1314            // we don't want things to run away on a misconfigured system
1315            std::time::Duration::from_secs(10),
1316            Command::new("bqueues").arg(queue).output(),
1317        )
1318        .await
1319        {
1320            Ok(output) => {
1321                let output = output.context("validating LSF queue")?;
1322                if !output.status.success() {
1323                    let stdout = String::from_utf8_lossy(&output.stdout);
1324                    let stderr = String::from_utf8_lossy(&output.stderr);
1325                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
1326                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
1327                } else {
1328                    Ok(())
1329                }
1330            }
1331            Err(_) => Err(anyhow!(
1332                "timed out trying to validate {name}_lsf_queue `{queue}`"
1333            )),
1334        }
1335    }
1336}
1337
/// Configuration for the LSF + Apptainer backend.
///
/// Experimental: validation fails unless experimental features are enabled in
/// the engine configuration.
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LsfApptainerBackendConfig {
    /// The task monitor polling interval, in seconds.
    ///
    /// Defaults to 30 seconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval: Option<u64>,
    /// The maximum number of concurrent LSF operations the backend will
    /// perform.
    ///
    /// This controls the maximum concurrent number of `bsub` processes the
    /// backend will spawn to queue tasks.
    ///
    /// Defaults to 10 concurrent operations.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_concurrency: Option<u32>,
    /// Which queue, if any, to specify when submitting normal jobs to LSF.
    ///
    /// This may be superseded by
    /// [`short_task_lsf_queue`][Self::short_task_lsf_queue],
    /// [`gpu_lsf_queue`][Self::gpu_lsf_queue], or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for corresponding tasks.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to LSF.
    ///
    /// This may be superseded by [`gpu_lsf_queue`][Self::gpu_lsf_queue] or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for tasks which require
    /// specialized hardware.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require a
    /// GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require an
    /// FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Additional command-line arguments to pass to `bsub` when submitting jobs
    /// to LSF.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Prefix to add to every LSF job name before the task identifier. This is
    /// truncated as needed to satisfy the byte-oriented LSF job name limit.
    ///
    /// Validation rejects prefixes longer than `MAX_LSF_JOB_NAME_PREFIX`
    /// bytes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_name_prefix: Option<String>,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where LSF dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1400
1401impl LsfApptainerBackendConfig {
1402    /// Validate that the backend is appropriately configured.
1403    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1404        if cfg!(not(unix)) {
1405            bail!("LSF + Apptainer backend is not supported on non-unix platforms");
1406        }
1407
1408        if !engine_config.experimental_features_enabled {
1409            bail!("LSF + Apptainer backend requires enabling experimental features");
1410        }
1411
1412        // Do what we can to validate options that are dependent on the dynamic
1413        // environment. These are a bit fraught, particularly if the behavior of
1414        // the external tools changes based on where a job gets dispatched, but
1415        // querying from the perspective of the current node allows
1416        // us to get better error messages in circumstances typical to a cluster.
1417        if let Some(queue) = &self.default_lsf_queue {
1418            queue.validate("default").await?;
1419        }
1420
1421        if let Some(queue) = &self.short_task_lsf_queue {
1422            queue.validate("short_task").await?;
1423        }
1424
1425        if let Some(queue) = &self.gpu_lsf_queue {
1426            queue.validate("gpu").await?;
1427        }
1428
1429        if let Some(queue) = &self.fpga_lsf_queue {
1430            queue.validate("fpga").await?;
1431        }
1432
1433        if let Some(prefix) = &self.job_name_prefix
1434            && prefix.len() > MAX_LSF_JOB_NAME_PREFIX
1435        {
1436            bail!(
1437                "LSF job name prefix `{prefix}` exceeds the maximum {MAX_LSF_JOB_NAME_PREFIX} \
1438                 bytes"
1439            );
1440        }
1441
1442        self.apptainer_config.validate().await?;
1443
1444        Ok(())
1445    }
1446
1447    /// Get the appropriate LSF queue for a task under this configuration.
1448    ///
1449    /// Specialized hardware requirements are prioritized over other
1450    /// characteristics, with FPGA taking precedence over GPU.
1451    pub(crate) fn lsf_queue_for_task(
1452        &self,
1453        requirements: &HashMap<String, Value>,
1454        hints: &HashMap<String, Value>,
1455    ) -> Option<&LsfQueueConfig> {
1456        // Specialized hardware gets priority.
1457        if let Some(queue) = self.fpga_lsf_queue.as_ref()
1458            && let Some(true) = requirements
1459                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1460                .and_then(Value::as_boolean)
1461        {
1462            return Some(queue);
1463        }
1464
1465        if let Some(queue) = self.gpu_lsf_queue.as_ref()
1466            && let Some(true) = requirements
1467                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1468                .and_then(Value::as_boolean)
1469        {
1470            return Some(queue);
1471        }
1472
1473        // Then short tasks.
1474        if let Some(queue) = self.short_task_lsf_queue.as_ref()
1475            && let Some(true) = hints
1476                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1477                .and_then(Value::as_boolean)
1478        {
1479            return Some(queue);
1480        }
1481
1482        // Finally the default queue. If this is `None`, `bsub` gets run without a queue
1483        // argument and the cluster's default is used.
1484        self.default_lsf_queue.as_ref()
1485    }
1486}
1487
/// Configuration for a Slurm partition.
///
/// Each partition can optionally have per-task CPU and memory limits set so
/// that tasks which are too large to be scheduled on that partition will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be manually based on the user's understanding of the
/// cluster configuration.
///
/// The partition's existence is checked by [`SlurmPartitionConfig::validate`]
/// using the local `scontrol show partition` command.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmPartitionConfig {
    /// The name of the partition; this is the string passed to `sbatch
    /// --partition=<partition_name>`.
    pub name: String,
    /// The maximum number of CPUs this partition can provision for a single
    /// task.
    pub max_cpu_per_task: Option<u64>,
    /// The maximum memory this partition can provision for a single task.
    pub max_memory_per_task: Option<ByteSize>,
}
1508
1509impl SlurmPartitionConfig {
1510    /// Validate that this Slurm partition exists according to the local
1511    /// `sinfo`.
1512    pub async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1513        let partition = &self.name;
1514        ensure!(
1515            !partition.is_empty(),
1516            "{name}_slurm_partition name cannot be empty"
1517        );
1518        if let Some(max_cpu_per_task) = self.max_cpu_per_task {
1519            ensure!(
1520                max_cpu_per_task > 0,
1521                "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
1522            );
1523        }
1524        if let Some(max_memory_per_task) = self.max_memory_per_task {
1525            ensure!(
1526                max_memory_per_task.as_u64() > 0,
1527                "{name}_slurm_partition `{partition}` must allow at least some memory to be \
1528                 provisioned"
1529            );
1530        }
1531        match tokio::time::timeout(
1532            // 10 seconds is rather arbitrary; `scontrol` ordinarily returns extremely quickly, but
1533            // we don't want things to run away on a misconfigured system
1534            std::time::Duration::from_secs(10),
1535            Command::new("scontrol")
1536                .arg("show")
1537                .arg("partition")
1538                .arg(partition)
1539                .output(),
1540        )
1541        .await
1542        {
1543            Ok(output) => {
1544                let output = output.context("validating Slurm partition")?;
1545                if !output.status.success() {
1546                    let stdout = String::from_utf8_lossy(&output.stdout);
1547                    let stderr = String::from_utf8_lossy(&output.stderr);
1548                    error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
1549                    Err(anyhow!(
1550                        "failed to validate {name}_slurm_partition `{partition}`"
1551                    ))
1552                } else {
1553                    Ok(())
1554                }
1555            }
1556            Err(_) => Err(anyhow!(
1557                "timed out trying to validate {name}_slurm_partition `{partition}`"
1558            )),
1559        }
1560    }
1561}
1562
/// Configuration for the Slurm + Apptainer backend.
///
/// Experimental: validation fails unless experimental features are enabled in
/// the engine configuration.
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SlurmApptainerBackendConfig {
    /// Which partition, if any, to specify when submitting normal jobs to
    /// Slurm.
    ///
    /// This may be superseded by
    /// [`short_task_slurm_partition`][Self::short_task_slurm_partition],
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition], or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for corresponding
    /// tasks.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to Slurm.
    ///
    /// This may be superseded by
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition] or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for tasks which
    /// require specialized hardware.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// a GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// an FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional command-line arguments to pass to `sbatch` when submitting
    /// jobs to Slurm.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where Slurm dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1610
1611impl SlurmApptainerBackendConfig {
1612    /// Validate that the backend is appropriately configured.
1613    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1614        if cfg!(not(unix)) {
1615            bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
1616        }
1617        if !engine_config.experimental_features_enabled {
1618            bail!("Slurm + Apptainer backend requires enabling experimental features");
1619        }
1620
1621        // Do what we can to validate options that are dependent on the dynamic
1622        // environment. These are a bit fraught, particularly if the behavior of
1623        // the external tools changes based on where a job gets dispatched, but
1624        // querying from the perspective of the current node allows
1625        // us to get better error messages in circumstances typical to a cluster.
1626        if let Some(partition) = &self.default_slurm_partition {
1627            partition.validate("default").await?;
1628        }
1629        if let Some(partition) = &self.short_task_slurm_partition {
1630            partition.validate("short_task").await?;
1631        }
1632        if let Some(partition) = &self.gpu_slurm_partition {
1633            partition.validate("gpu").await?;
1634        }
1635        if let Some(partition) = &self.fpga_slurm_partition {
1636            partition.validate("fpga").await?;
1637        }
1638
1639        self.apptainer_config.validate().await?;
1640
1641        Ok(())
1642    }
1643
1644    /// Get the appropriate Slurm partition for a task under this configuration.
1645    ///
1646    /// Specialized hardware requirements are prioritized over other
1647    /// characteristics, with FPGA taking precedence over GPU.
1648    pub(crate) fn slurm_partition_for_task(
1649        &self,
1650        requirements: &HashMap<String, Value>,
1651        hints: &HashMap<String, Value>,
1652    ) -> Option<&SlurmPartitionConfig> {
1653        // TODO ACF 2025-09-26: what's the relationship between this code and
1654        // `TaskExecutionConstraints`? Should this be there instead, or be pulling
1655        // values from that instead of directly from `requirements` and `hints`?
1656
1657        // Specialized hardware gets priority.
1658        if let Some(partition) = self.fpga_slurm_partition.as_ref()
1659            && let Some(true) = requirements
1660                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1661                .and_then(Value::as_boolean)
1662        {
1663            return Some(partition);
1664        }
1665
1666        if let Some(partition) = self.gpu_slurm_partition.as_ref()
1667            && let Some(true) = requirements
1668                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1669                .and_then(Value::as_boolean)
1670        {
1671            return Some(partition);
1672        }
1673
1674        // Then short tasks.
1675        if let Some(partition) = self.short_task_slurm_partition.as_ref()
1676            && let Some(true) = hints
1677                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1678                .and_then(Value::as_boolean)
1679        {
1680            return Some(partition);
1681        }
1682
1683        // Finally the default partition. If this is `None`, `sbatch` gets run without a
1684        // partition argument and the cluster's default is used.
1685        self.default_slurm_partition.as_ref()
1686    }
1687}
1688
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    /// A `SecretString` serializes as the `REDACTED` placeholder by default,
    /// and `unredact`/`redact` toggle whether the plaintext is exposed.
    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    /// Serializing a full configuration must never leak a secret value:
    /// every secret-bearing field (TES basic/bearer auth, Azure, S3, and
    /// Google storage auth) is populated with the plaintext `"secret"` and
    /// the serialized output is checked to not contain it.
    #[test]
    fn redacted_config() {
        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                            username: "foo".into(),
                            password: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(TesBackendConfig {
                        auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                            token: "secret".into(),
                        })),
                        ..Default::default()
                    }),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: Some(AzureStorageAuthConfig {
                        account_name: "foo".into(),
                        access_key: "secret".into(),
                    }),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: "secret".into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: "secret".into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        // Secrets are redacted by default, so the plaintext value must not
        // appear anywhere in the serialized form. (The assertion is negated:
        // finding "secret" in the output is the failure case.)
        assert!(!json.contains("secret"), "`{json}` contains a secret");
    }

    /// Exercises `Config::validate` across the documented failure modes and
    /// a handful of configurations that should pass.
    #[tokio::test]
    async fn test_config_validate() {
        // Test invalid task config
        let mut config = Config::default();
        config.task.retries = Some(1000000);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Test invalid scatter concurrency config
        let mut config = Config::default();
        config.workflow.scatter.concurrency = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // Test invalid backend name
        let config = Config {
            backend: Some("foo".into()),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: Some("bar".into()),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // Test a singular backend
        let config = Config {
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().await.expect("config should validate");

        // Test invalid local backend cpu config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
                     available to the host"
                )
        );

        // Test invalid local backend memory config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `memory` cannot exceed the total memory of \
                     the host"
                )
        );

        // Test missing TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // Test TES invalid max concurrency
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("https://example.com".parse().unwrap()),
                    max_concurrency: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `max_concurrency` cannot be zero"
        );

        // Insecure TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        // Allow insecure URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(TesBackendConfig {
                    url: Some("http://example.com".parse().unwrap()),
                    inputs: Some("http://example.com".parse().unwrap()),
                    outputs: Some("http://example.com".parse().unwrap()),
                    insecure: true,
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        config
            .validate()
            .await
            .expect("configuration should validate");

        let mut config = Config::default();
        config.http.parallelism = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Some(5);
        assert!(
            config.validate().await.is_ok(),
            "should pass for valid configuration"
        );

        let mut config = Config::default();
        config.http.parallelism = None;
        assert!(
            config.validate().await.is_ok(),
            "should pass for default (None)"
        );

        // Test invalid LSF job name prefix
        #[cfg(unix)]
        {
            let job_name_prefix = "A".repeat(MAX_LSF_JOB_NAME_PREFIX * 2);
            let mut config = Config {
                experimental_features_enabled: true,
                ..Default::default()
            };
            config.backends.insert(
                "default".to_string(),
                BackendConfig::LsfApptainer(LsfApptainerBackendConfig {
                    job_name_prefix: Some(job_name_prefix.clone()),
                    ..Default::default()
                }),
            );
            assert_eq!(
                config.validate().await.unwrap_err().to_string(),
                format!("LSF job name prefix `{job_name_prefix}` exceeds the maximum 100 bytes")
            );
        }
    }
}