wdl_engine/
config.rs

1//! Implementation of engine configuration.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use anyhow::ensure;
14use bytesize::ByteSize;
15use indexmap::IndexMap;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::process::Command;
20use tracing::error;
21use tracing::warn;
22use url::Url;
23
24use crate::CancellationContext;
25use crate::Events;
26use crate::SYSTEM;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::convert_unit_string;
30use crate::path::is_supported_url;
31
32/// The inclusive maximum number of task retries the engine supports.
33pub(crate) const MAX_RETRIES: u64 = 100;
34
35/// The default task shell.
36pub(crate) const DEFAULT_TASK_SHELL: &str = "bash";
37
38/// The default backend name.
39pub(crate) const DEFAULT_BACKEND_NAME: &str = "default";
40
41/// The string that replaces redacted serialization fields.
42const REDACTED: &str = "<REDACTED>";
43
/// Gets the default root cache directory for the user.
///
/// Returns an error if the user's cache directory cannot be determined.
pub(crate) fn cache_dir() -> Result<PathBuf> {
    /// The subdirectory within the user's cache directory for all caches
    const CACHE_DIR_ROOT: &str = "sprocket";

    Ok(dirs::cache_dir()
        .context("failed to determine user cache directory")?
        .join(CACHE_DIR_ROOT))
}
53
/// Represents a secret string that is, by default, redacted for serialization.
///
/// This type is a wrapper around [`secrecy::SecretString`].
///
/// The redaction flag only affects serialization; it does not otherwise
/// protect the value.
#[derive(Debug, Clone)]
pub struct SecretString {
    /// The inner secret string.
    ///
    /// This type is not serializable.
    inner: secrecy::SecretString,
    /// Whether or not the secret string is redacted for serialization.
    ///
    /// If `true` (the default), `<REDACTED>` is serialized for the string's
    /// value.
    ///
    /// If `false`, the inner secret string is exposed for serialization.
    redacted: bool,
}
71
72impl SecretString {
73    /// Redacts the secret for serialization.
74    ///
75    /// By default, a [`SecretString`] is redacted; when redacted, the string is
76    /// replaced with `<REDACTED>` when serialized.
77    pub fn redact(&mut self) {
78        self.redacted = true;
79    }
80
81    /// Unredacts the secret for serialization.
82    pub fn unredact(&mut self) {
83        self.redacted = false;
84    }
85
86    /// Gets the inner [`secrecy::SecretString`].
87    pub fn inner(&self) -> &secrecy::SecretString {
88        &self.inner
89    }
90}
91
92impl From<String> for SecretString {
93    fn from(s: String) -> Self {
94        Self {
95            inner: s.into(),
96            redacted: true,
97        }
98    }
99}
100
101impl From<&str> for SecretString {
102    fn from(s: &str) -> Self {
103        Self {
104            inner: s.into(),
105            redacted: true,
106        }
107    }
108}
109
110impl Default for SecretString {
111    fn default() -> Self {
112        Self {
113            inner: Default::default(),
114            redacted: true,
115        }
116    }
117}
118
119impl serde::Serialize for SecretString {
120    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
121    where
122        S: serde::Serializer,
123    {
124        use secrecy::ExposeSecret;
125
126        if self.redacted {
127            serializer.serialize_str(REDACTED)
128        } else {
129            serializer.serialize_str(self.inner.expose_secret())
130        }
131    }
132}
133
134impl<'de> serde::Deserialize<'de> for SecretString {
135    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
136    where
137        D: serde::Deserializer<'de>,
138    {
139        let inner = secrecy::SecretString::deserialize(deserializer)?;
140        Ok(Self {
141            inner,
142            redacted: true,
143        })
144    }
145}
146
/// Represents how an evaluation error or cancellation should be handled by the
/// engine.
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FailureMode {
    /// When an error is encountered or evaluation is canceled, evaluation waits
    /// for any outstanding tasks to complete.
    ///
    /// This is the default mode.
    #[default]
    Slow,
    /// When an error is encountered or evaluation is canceled, any outstanding
    /// tasks that are executing are immediately canceled and evaluation waits
    /// for cancellation to complete.
    Fast,
}
161
/// Represents WDL evaluation configuration.
///
/// <div class="warning">
///
/// By default, serialization of [`Config`] will redact the values of secrets.
///
/// Use the [`Config::unredact`] method before serialization to prevent the
/// secrets from being redacted.
///
/// </div>
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Config {
    /// HTTP configuration.
    #[serde(default)]
    pub http: HttpConfig,
    /// Workflow evaluation configuration.
    #[serde(default)]
    pub workflow: WorkflowConfig,
    /// Task evaluation configuration.
    #[serde(default)]
    pub task: TaskConfig,
    /// The name of the backend to use.
    ///
    /// If not specified and `backends` has multiple entries, it will use a name
    /// of `default`.
    ///
    /// See [`Config::backend`] for how the name is resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backend: Option<String>,
    /// Task execution backends configuration.
    ///
    /// If the collection is empty and `backend` is not specified, the engine
    /// default backend is used.
    ///
    /// If the collection has exactly one entry and `backend` is not specified,
    /// the singular entry will be used.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub backends: IndexMap<String, BackendConfig>,
    /// Storage configuration.
    #[serde(default)]
    pub storage: StorageConfig,
    /// (Experimental) Avoid environment-specific output; default is `false`.
    ///
    /// If this option is `true`, selected error messages and log output will
    /// avoid emitting environment-specific output such as absolute paths
    /// and system resource counts.
    ///
    /// This is largely meant to support "golden testing" where a test's success
    /// depends on matching an expected set of outputs exactly. Cues that
    /// help users overcome errors, such as the path to a temporary
    /// directory or the number of CPUs available to the system, confound this
    /// style of testing. This flag is a best-effort experimental attempt to
    /// reduce the impact of these differences in order to allow a wider
    /// range of golden tests to be written.
    ///
    /// Requires `experimental_features_enabled` to be `true` (checked by
    /// [`Config::validate`]).
    #[serde(default)]
    pub suppress_env_specific_output: bool,
    /// (Experimental) Whether experimental features are enabled; default is
    /// `false`.
    ///
    /// Experimental features are provided to users with heavy caveats about
    /// their stability and rough edges. Use at your own risk, but feedback
    /// is quite welcome.
    #[serde(default)]
    pub experimental_features_enabled: bool,
    /// The failure mode for workflow or task evaluation.
    ///
    /// A value of [`FailureMode::Slow`] will result in evaluation waiting for
    /// executing tasks to complete upon error or interruption.
    ///
    /// A value of [`FailureMode::Fast`] will immediately attempt to cancel
    /// executing tasks upon error or interruption.
    #[serde(default, rename = "fail")]
    pub failure_mode: FailureMode,
}
235
236impl Config {
237    /// Validates the evaluation configuration.
238    pub async fn validate(&self) -> Result<()> {
239        self.http.validate()?;
240        self.workflow.validate()?;
241        self.task.validate()?;
242
243        if self.backend.is_none() && self.backends.len() < 2 {
244            // This is OK, we'll use either the singular backends entry (1) or
245            // the default (0)
246        } else {
247            // Check the backends map for the backend name (or "default")
248            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
249            if !self.backends.contains_key(backend) {
250                bail!("a backend named `{backend}` is not present in the configuration");
251            }
252        }
253
254        for backend in self.backends.values() {
255            backend.validate(self).await?;
256        }
257
258        self.storage.validate()?;
259
260        if self.suppress_env_specific_output && !self.experimental_features_enabled {
261            bail!("`suppress_env_specific_output` requires enabling experimental features");
262        }
263
264        Ok(())
265    }
266
267    /// Redacts the secrets contained in the configuration.
268    ///
269    /// By default, secrets are redacted for serialization.
270    pub fn redact(&mut self) {
271        for backend in self.backends.values_mut() {
272            backend.redact();
273        }
274
275        if let Some(auth) = &mut self.storage.azure.auth {
276            auth.redact();
277        }
278
279        if let Some(auth) = &mut self.storage.s3.auth {
280            auth.redact();
281        }
282
283        if let Some(auth) = &mut self.storage.google.auth {
284            auth.redact();
285        }
286    }
287
288    /// Unredacts the secrets contained in the configuration.
289    ///
290    /// Calling this method will expose secrets for serialization.
291    pub fn unredact(&mut self) {
292        for backend in self.backends.values_mut() {
293            backend.unredact();
294        }
295
296        if let Some(auth) = &mut self.storage.azure.auth {
297            auth.unredact();
298        }
299
300        if let Some(auth) = &mut self.storage.s3.auth {
301            auth.unredact();
302        }
303
304        if let Some(auth) = &mut self.storage.google.auth {
305            auth.unredact();
306        }
307    }
308
309    /// Gets the backend configuration.
310    ///
311    /// Returns an error if the configuration specifies a named backend that
312    /// isn't present in the configuration.
313    pub fn backend(&self) -> Result<Cow<'_, BackendConfig>> {
314        if self.backend.is_some() || self.backends.len() >= 2 {
315            // Lookup the backend to use
316            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
317            return Ok(Cow::Borrowed(self.backends.get(backend).ok_or_else(
318                || anyhow!("a backend named `{backend}` is not present in the configuration"),
319            )?));
320        }
321
322        if self.backends.len() == 1 {
323            // Use the singular entry
324            Ok(Cow::Borrowed(self.backends.values().next().unwrap()))
325        } else {
326            // Use the default
327            Ok(Cow::Owned(BackendConfig::default()))
328        }
329    }
330
331    /// Creates a new task execution backend based on this configuration.
332    pub(crate) async fn create_backend(
333        self: &Arc<Self>,
334        run_root_dir: &Path,
335        events: Events,
336        cancellation: CancellationContext,
337    ) -> Result<Arc<dyn TaskExecutionBackend>> {
338        use crate::backend::*;
339
340        match self.backend()?.as_ref() {
341            BackendConfig::Local(_) => {
342                warn!(
343                    "the engine is configured to use the local backend: tasks will not be run \
344                     inside of a container"
345                );
346                Ok(Arc::new(LocalBackend::new(
347                    self.clone(),
348                    events,
349                    cancellation,
350                )?))
351            }
352            BackendConfig::Docker(_) => Ok(Arc::new(
353                DockerBackend::new(self.clone(), events, cancellation).await?,
354            )),
355            BackendConfig::Tes(_) => Ok(Arc::new(
356                TesBackend::new(self.clone(), events, cancellation).await?,
357            )),
358            BackendConfig::LsfApptainer(_) => Ok(Arc::new(LsfApptainerBackend::new(
359                self.clone(),
360                run_root_dir,
361                events,
362                cancellation,
363            )?)),
364            BackendConfig::SlurmApptainer(_) => Ok(Arc::new(SlurmApptainerBackend::new(
365                self.clone(),
366                run_root_dir,
367                events,
368                cancellation,
369            )?)),
370        }
371    }
372}
373
/// Represents HTTP configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct HttpConfig {
    /// The HTTP download cache location.
    ///
    /// Defaults to an operating system specific cache directory for the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The number of retries for transferring files.
    ///
    /// Defaults to `5`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<usize>,
    /// The maximum parallelism for file transfers.
    ///
    /// Defaults to the host's available parallelism.
    ///
    /// A value of `0` is invalid (see [`HttpConfig::validate`]).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parallelism: Option<usize>,
}
394
395impl HttpConfig {
396    /// Validates the HTTP configuration.
397    pub fn validate(&self) -> Result<()> {
398        if let Some(parallelism) = self.parallelism
399            && parallelism == 0
400        {
401            bail!("configuration value `http.parallelism` cannot be zero");
402        }
403        Ok(())
404    }
405}
406
/// Represents storage configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct StorageConfig {
    /// Azure Blob Storage configuration.
    #[serde(default)]
    pub azure: AzureStorageConfig,
    /// AWS S3 configuration.
    #[serde(default)]
    pub s3: S3StorageConfig,
    /// Google Cloud Storage configuration.
    #[serde(default)]
    pub google: GoogleStorageConfig,
}
421
impl StorageConfig {
    /// Validates the storage configuration.
    ///
    /// Each cloud provider's configuration is validated in turn.
    pub fn validate(&self) -> Result<()> {
        self.azure.validate()?;
        self.s3.validate()?;
        self.google.validate()?;
        Ok(())
    }
}
431
432/// Represents authentication information for Azure Blob Storage.
433#[derive(Debug, Default, Clone, Serialize, Deserialize)]
434#[serde(rename_all = "snake_case", deny_unknown_fields)]
435pub struct AzureStorageAuthConfig {
436    /// The Azure Storage account name to use.
437    pub account_name: String,
438    /// The Azure Storage access key to use.
439    pub access_key: SecretString,
440}
441
442impl AzureStorageAuthConfig {
443    /// Validates the Azure Blob Storage authentication configuration.
444    pub fn validate(&self) -> Result<()> {
445        if self.account_name.is_empty() {
446            bail!("configuration value `storage.azure.auth.account_name` is required");
447        }
448
449        if self.access_key.inner.expose_secret().is_empty() {
450            bail!("configuration value `storage.azure.auth.access_key` is required");
451        }
452
453        Ok(())
454    }
455
456    /// Redacts the secrets contained in the Azure Blob Storage storage
457    /// authentication configuration.
458    pub fn redact(&mut self) {
459        self.access_key.redact();
460    }
461
462    /// Unredacts the secrets contained in the Azure Blob Storage authentication
463    /// configuration.
464    pub fn unredact(&mut self) {
465        self.access_key.unredact();
466    }
467}
468
469/// Represents configuration for Azure Blob Storage.
470#[derive(Debug, Default, Clone, Serialize, Deserialize)]
471#[serde(rename_all = "snake_case", deny_unknown_fields)]
472pub struct AzureStorageConfig {
473    /// The Azure Blob Storage authentication configuration.
474    #[serde(default, skip_serializing_if = "Option::is_none")]
475    pub auth: Option<AzureStorageAuthConfig>,
476}
477
478impl AzureStorageConfig {
479    /// Validates the Azure Blob Storage configuration.
480    pub fn validate(&self) -> Result<()> {
481        if let Some(auth) = &self.auth {
482            auth.validate()?;
483        }
484
485        Ok(())
486    }
487}
488
489/// Represents authentication information for AWS S3 storage.
490#[derive(Debug, Default, Clone, Serialize, Deserialize)]
491#[serde(rename_all = "snake_case", deny_unknown_fields)]
492pub struct S3StorageAuthConfig {
493    /// The AWS Access Key ID to use.
494    pub access_key_id: String,
495    /// The AWS Secret Access Key to use.
496    pub secret_access_key: SecretString,
497}
498
499impl S3StorageAuthConfig {
500    /// Validates the AWS S3 storage authentication configuration.
501    pub fn validate(&self) -> Result<()> {
502        if self.access_key_id.is_empty() {
503            bail!("configuration value `storage.s3.auth.access_key_id` is required");
504        }
505
506        if self.secret_access_key.inner.expose_secret().is_empty() {
507            bail!("configuration value `storage.s3.auth.secret_access_key` is required");
508        }
509
510        Ok(())
511    }
512
513    /// Redacts the secrets contained in the AWS S3 storage authentication
514    /// configuration.
515    pub fn redact(&mut self) {
516        self.secret_access_key.redact();
517    }
518
519    /// Unredacts the secrets contained in the AWS S3 storage authentication
520    /// configuration.
521    pub fn unredact(&mut self) {
522        self.secret_access_key.unredact();
523    }
524}
525
526/// Represents configuration for AWS S3 storage.
527#[derive(Debug, Default, Clone, Serialize, Deserialize)]
528#[serde(rename_all = "snake_case", deny_unknown_fields)]
529pub struct S3StorageConfig {
530    /// The default region to use for S3-schemed URLs (e.g.
531    /// `s3://<bucket>/<blob>`).
532    ///
533    /// Defaults to `us-east-1`.
534    #[serde(default, skip_serializing_if = "Option::is_none")]
535    pub region: Option<String>,
536
537    /// The AWS S3 storage authentication configuration.
538    #[serde(default, skip_serializing_if = "Option::is_none")]
539    pub auth: Option<S3StorageAuthConfig>,
540}
541
542impl S3StorageConfig {
543    /// Validates the AWS S3 storage configuration.
544    pub fn validate(&self) -> Result<()> {
545        if let Some(auth) = &self.auth {
546            auth.validate()?;
547        }
548
549        Ok(())
550    }
551}
552
553/// Represents authentication information for Google Cloud Storage.
554#[derive(Debug, Default, Clone, Serialize, Deserialize)]
555#[serde(rename_all = "snake_case", deny_unknown_fields)]
556pub struct GoogleStorageAuthConfig {
557    /// The HMAC Access Key to use.
558    pub access_key: String,
559    /// The HMAC Secret to use.
560    pub secret: SecretString,
561}
562
563impl GoogleStorageAuthConfig {
564    /// Validates the Google Cloud Storage authentication configuration.
565    pub fn validate(&self) -> Result<()> {
566        if self.access_key.is_empty() {
567            bail!("configuration value `storage.google.auth.access_key` is required");
568        }
569
570        if self.secret.inner.expose_secret().is_empty() {
571            bail!("configuration value `storage.google.auth.secret` is required");
572        }
573
574        Ok(())
575    }
576
577    /// Redacts the secrets contained in the Google Cloud Storage authentication
578    /// configuration.
579    pub fn redact(&mut self) {
580        self.secret.redact();
581    }
582
583    /// Unredacts the secrets contained in the Google Cloud Storage
584    /// authentication configuration.
585    pub fn unredact(&mut self) {
586        self.secret.unredact();
587    }
588}
589
590/// Represents configuration for Google Cloud Storage.
591#[derive(Debug, Default, Clone, Serialize, Deserialize)]
592#[serde(rename_all = "snake_case", deny_unknown_fields)]
593pub struct GoogleStorageConfig {
594    /// The Google Cloud Storage authentication configuration.
595    #[serde(default, skip_serializing_if = "Option::is_none")]
596    pub auth: Option<GoogleStorageAuthConfig>,
597}
598
599impl GoogleStorageConfig {
600    /// Validates the Google Cloud Storage configuration.
601    pub fn validate(&self) -> Result<()> {
602        if let Some(auth) = &self.auth {
603            auth.validate()?;
604        }
605
606        Ok(())
607    }
608}
609
610/// Represents workflow evaluation configuration.
611#[derive(Debug, Default, Clone, Serialize, Deserialize)]
612#[serde(rename_all = "snake_case", deny_unknown_fields)]
613pub struct WorkflowConfig {
614    /// Scatter statement evaluation configuration.
615    #[serde(default)]
616    pub scatter: ScatterConfig,
617}
618
619impl WorkflowConfig {
620    /// Validates the workflow configuration.
621    pub fn validate(&self) -> Result<()> {
622        self.scatter.validate()?;
623        Ok(())
624    }
625}
626
/// Represents scatter statement evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct ScatterConfig {
    /// The number of scatter array elements to process concurrently.
    ///
    /// Defaults to `1000`.
    ///
    /// A value of `0` is invalid (see [`ScatterConfig::validate`]).
    ///
    /// Lower values use less memory for evaluation and higher values may better
    /// saturate the task execution backend with tasks to execute for large
    /// scatters.
    ///
    /// This setting does not change how many tasks an execution backend can run
    /// concurrently, but may affect how many tasks are sent to the backend to
    /// run at a time.
    ///
    /// For example, if `concurrency` was set to 10 and we evaluate the
    /// following scatters:
    ///
    /// ```wdl
    /// scatter (i in range(100)) {
    ///     call my_task
    /// }
    ///
    /// scatter (j in range(100)) {
    ///     call my_task as my_task2
    /// }
    /// ```
    ///
    /// Here each scatter is independent and therefore there will be 20 calls
    /// (10 for each scatter) made concurrently. If the task execution
    /// backend can only execute 5 tasks concurrently, 5 tasks will execute
    /// and 15 will be "ready" to execute and waiting for an executing task
    /// to complete.
    ///
    /// If instead we evaluate the following scatters:
    ///
    /// ```wdl
    /// scatter (i in range(100)) {
    ///     scatter (j in range(100)) {
    ///         call my_task
    ///     }
    /// }
    /// ```
    ///
    /// Then there will be 100 calls (10*10 as 10 are made for each outer
    /// element) made concurrently. If the task execution backend can only
    /// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
    /// "ready" to execute and waiting for an executing task to complete.
    ///
    /// <div class="warning">
    /// Warning: nested scatter statements cause exponential memory usage based
    /// on this value, as each scatter statement evaluation requires allocating
    /// new scopes for scatter array elements being processed. </div>
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub concurrency: Option<u64>,
}
686
687impl ScatterConfig {
688    /// Validates the scatter configuration.
689    pub fn validate(&self) -> Result<()> {
690        if let Some(concurrency) = self.concurrency
691            && concurrency == 0
692        {
693            bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
694        }
695
696        Ok(())
697    }
698}
699
/// Represents the supported call caching modes.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallCachingMode {
    /// Call caching is disabled.
    ///
    /// The call cache is not checked and new entries are not added to the
    /// cache.
    ///
    /// This is the default value.
    #[default]
    Off,
    /// Call caching is enabled.
    ///
    /// The call cache is checked and new entries are added to the cache.
    ///
    /// Defaults the `cacheable` task hint to `true`.
    On,
    /// Call caching is enabled only for tasks that explicitly have a
    /// `cacheable` hint set to `true`.
    ///
    /// The call cache is checked and new entries are added to the cache *only*
    /// for tasks that have the `cacheable` hint set to `true`.
    ///
    /// Defaults the `cacheable` task hint to `false`.
    Explicit,
}
727
/// Represents the supported modes for calculating content digests.
///
/// Content digests are used for call cache invalidation.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContentDigestMode {
    /// Use a strong digest for file content.
    ///
    /// Strong digests require hashing all of the contents of a file; this may
    /// noticeably impact performance for very large files.
    ///
    /// This setting guarantees that a modified file will be detected.
    Strong,
    /// Use a weak digest for file content.
    ///
    /// A weak digest is based solely off of file metadata, such as size and
    /// last modified time.
    ///
    /// This setting cannot guarantee the detection of modified files and may
    /// result in a modified file not causing a call cache entry to be
    /// invalidated.
    ///
    /// However, it is substantially faster than using a strong digest.
    #[default]
    Weak,
}
752
/// Represents task evaluation configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct TaskConfig {
    /// The default maximum number of retries to attempt if a task fails.
    ///
    /// A task's `max_retries` requirement will override this value.
    ///
    /// Defaults to 0 (no retries); cannot exceed the engine maximum (see
    /// [`TaskConfig::validate`]).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retries: Option<u64>,
    /// The default container to use if a container is not specified in a task's
    /// requirements.
    ///
    /// Defaults to `ubuntu:latest`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub container: Option<String>,
    /// The default shell to use for tasks.
    ///
    /// Defaults to `bash`.
    ///
    /// <div class="warning">
    /// Warning: the use of a shell other than `bash` may lead to tasks that may
    /// not be portable to other execution engines.</div>
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shell: Option<String>,
    /// The behavior when a task's `cpu` requirement cannot be met.
    #[serde(default)]
    pub cpu_limit_behavior: TaskResourceLimitBehavior,
    /// The behavior when a task's `memory` requirement cannot be met.
    #[serde(default)]
    pub memory_limit_behavior: TaskResourceLimitBehavior,
    /// The call cache directory to use for caching task execution results.
    ///
    /// Defaults to an operating system specific cache directory for the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_dir: Option<PathBuf>,
    /// The call caching mode to use for tasks.
    #[serde(default)]
    pub cache: CallCachingMode,
    /// The content digest mode to use.
    ///
    /// Used as part of call caching.
    #[serde(default)]
    pub digests: ContentDigestMode,
}
799
800impl TaskConfig {
801    /// Validates the task evaluation configuration.
802    pub fn validate(&self) -> Result<()> {
803        if self.retries.unwrap_or(0) > MAX_RETRIES {
804            bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
805        }
806
807        Ok(())
808    }
809}
810
/// The behavior when a task resource requirement, such as `cpu` or `memory`,
/// cannot be met.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum TaskResourceLimitBehavior {
    /// Try executing a task with the maximum amount of the resource available
    /// when the task's corresponding requirement cannot be met.
    TryWithMax,
    /// Do not execute a task if its corresponding requirement cannot be met.
    ///
    /// This is the default behavior.
    #[default]
    Deny,
}
825
/// Represents supported task execution backends.
///
/// The backend kind is serialized/deserialized via an internal `type` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum BackendConfig {
    /// Use the local task execution backend.
    Local(LocalBackendConfig),
    /// Use the Docker task execution backend.
    Docker(DockerBackendConfig),
    /// Use the TES task execution backend.
    Tes(TesBackendConfig),
    /// Use the experimental LSF + Apptainer task execution backend.
    ///
    /// Requires enabling experimental features.
    LsfApptainer(LsfApptainerBackendConfig),
    /// Use the experimental Slurm + Apptainer task execution backend.
    ///
    /// Requires enabling experimental features.
    SlurmApptainer(SlurmApptainerBackendConfig),
}
845
846impl Default for BackendConfig {
847    fn default() -> Self {
848        Self::Docker(Default::default())
849    }
850}
851
852impl BackendConfig {
853    /// Validates the backend configuration.
854    pub async fn validate(&self, engine_config: &Config) -> Result<()> {
855        match self {
856            Self::Local(config) => config.validate(),
857            Self::Docker(config) => config.validate(),
858            Self::Tes(config) => config.validate(),
859            Self::LsfApptainer(config) => config.validate(engine_config).await,
860            Self::SlurmApptainer(config) => config.validate(engine_config).await,
861        }
862    }
863
864    /// Converts the backend configuration into a local backend configuration
865    ///
866    /// Returns `None` if the backend configuration is not local.
867    pub fn as_local(&self) -> Option<&LocalBackendConfig> {
868        match self {
869            Self::Local(config) => Some(config),
870            _ => None,
871        }
872    }
873
874    /// Converts the backend configuration into a Docker backend configuration
875    ///
876    /// Returns `None` if the backend configuration is not Docker.
877    pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
878        match self {
879            Self::Docker(config) => Some(config),
880            _ => None,
881        }
882    }
883
884    /// Converts the backend configuration into a TES backend configuration
885    ///
886    /// Returns `None` if the backend configuration is not TES.
887    pub fn as_tes(&self) -> Option<&TesBackendConfig> {
888        match self {
889            Self::Tes(config) => Some(config),
890            _ => None,
891        }
892    }
893
894    /// Converts the backend configuration into a LSF Apptainer backend
895    /// configuration
896    ///
897    /// Returns `None` if the backend configuration is not LSF Apptainer.
898    pub fn as_lsf_apptainer(&self) -> Option<&LsfApptainerBackendConfig> {
899        match self {
900            Self::LsfApptainer(config) => Some(config),
901            _ => None,
902        }
903    }
904
905    /// Converts the backend configuration into a Slurm Apptainer backend
906    /// configuration
907    ///
908    /// Returns `None` if the backend configuration is not Slurm Apptainer.
909    pub fn as_slurm_apptainer(&self) -> Option<&SlurmApptainerBackendConfig> {
910        match self {
911            Self::SlurmApptainer(config) => Some(config),
912            _ => None,
913        }
914    }
915
916    /// Redacts the secrets contained in the backend configuration.
917    pub fn redact(&mut self) {
918        match self {
919            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
920            Self::Tes(config) => config.redact(),
921        }
922    }
923
924    /// Unredacts the secrets contained in the backend configuration.
925    pub fn unredact(&mut self) {
926        match self {
927            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
928            Self::Tes(config) => config.unredact(),
929        }
930    }
931}
932
/// Represents configuration for the local task execution backend.
///
/// <div class="warning">
/// Warning: the local task execution backend spawns processes on the host
/// directly without the use of a container; only use this backend on trusted
/// WDL. </div>
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct LocalBackendConfig {
    /// Set the number of CPUs available for task execution.
    ///
    /// Defaults to the number of logical CPUs for the host.
    ///
    /// The value cannot be zero or exceed the host's number of CPUs.
    ///
    /// These constraints are enforced by [`LocalBackendConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu: Option<u64>,

    /// Set the total amount of memory for task execution as a unit string (e.g.
    /// `2 GiB`).
    ///
    /// Defaults to the total amount of memory for the host.
    ///
    /// The value cannot be zero or exceed the host's total amount of memory.
    ///
    /// These constraints are enforced by [`LocalBackendConfig::validate`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory: Option<String>,
}
959
960impl LocalBackendConfig {
961    /// Validates the local task execution backend configuration.
962    pub fn validate(&self) -> Result<()> {
963        if let Some(cpu) = self.cpu {
964            if cpu == 0 {
965                bail!("local backend configuration value `cpu` cannot be zero");
966            }
967
968            let total = SYSTEM.cpus().len() as u64;
969            if cpu > total {
970                bail!(
971                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
972                     available to the host ({total})"
973                );
974            }
975        }
976
977        if let Some(memory) = &self.memory {
978            let memory = convert_unit_string(memory).with_context(|| {
979                format!("local backend configuration value `memory` has invalid value `{memory}`")
980            })?;
981
982            if memory == 0 {
983                bail!("local backend configuration value `memory` cannot be zero");
984            }
985
986            let total = SYSTEM.total_memory();
987            if memory > total {
988                bail!(
989                    "local backend configuration value `memory` cannot exceed the total memory of \
990                     the host ({total} bytes)"
991                );
992            }
993        }
994
995        Ok(())
996    }
997}
998
/// Gets the default value for the docker `cleanup` field.
///
/// Used as the serde `default` for [`DockerBackendConfig::cleanup`] so that a
/// task's container is removed after completion unless explicitly disabled.
const fn cleanup_default() -> bool {
    true
}
1003
/// Represents configuration for the Docker backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct DockerBackendConfig {
    /// Whether or not to remove a task's container after the task completes.
    ///
    /// Defaults to `true`.
    ///
    /// The serde default is supplied by [`cleanup_default`].
    #[serde(default = "cleanup_default")]
    pub cleanup: bool,
}
1014
impl DockerBackendConfig {
    /// Validates the Docker backend configuration.
    ///
    /// Currently a no-op that always succeeds: `cleanup` is a plain boolean
    /// with no invalid values.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }
}
1021
1022impl Default for DockerBackendConfig {
1023    fn default() -> Self {
1024        Self { cleanup: true }
1025    }
1026}
1027
/// Represents HTTP basic authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BasicAuthConfig {
    /// The HTTP basic authentication username.
    #[serde(default)]
    pub username: String,
    /// The HTTP basic authentication password.
    ///
    /// Serialized as `<REDACTED>` unless explicitly unredacted (see
    /// [`SecretString`]).
    #[serde(default)]
    pub password: SecretString,
}
1039
impl BasicAuthConfig {
    /// Validates the HTTP basic auth configuration.
    ///
    /// Currently a no-op that always succeeds.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Redacts the secrets contained in the HTTP basic auth configuration.
    ///
    /// Only the password is a secret; the username serializes as-is.
    pub fn redact(&mut self) {
        self.password.redact();
    }

    /// Unredacts the secrets contained in the HTTP basic auth configuration.
    pub fn unredact(&mut self) {
        self.password.unredact();
    }
}
1056
/// Represents HTTP bearer token authentication configuration.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct BearerAuthConfig {
    /// The HTTP bearer authentication token.
    ///
    /// Serialized as `<REDACTED>` unless explicitly unredacted (see
    /// [`SecretString`]).
    #[serde(default)]
    pub token: SecretString,
}
1065
impl BearerAuthConfig {
    /// Validates the HTTP bearer auth configuration.
    ///
    /// Currently a no-op that always succeeds.
    pub fn validate(&self) -> Result<()> {
        Ok(())
    }

    /// Redacts the secrets contained in the HTTP bearer auth configuration.
    pub fn redact(&mut self) {
        self.token.redact();
    }

    /// Unredacts the secrets contained in the HTTP bearer auth configuration.
    pub fn unredact(&mut self) {
        self.token.unredact();
    }
}
1082
/// Represents the kind of authentication for a TES backend.
///
/// Serialized with an internal `type` tag (`type = "basic"` or
/// `type = "bearer"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum TesBackendAuthConfig {
    /// Use basic authentication for the TES backend.
    Basic(BasicAuthConfig),
    /// Use bearer token authentication for the TES backend.
    Bearer(BearerAuthConfig),
}
1092
1093impl TesBackendAuthConfig {
1094    /// Validates the TES backend authentication configuration.
1095    pub fn validate(&self) -> Result<()> {
1096        match self {
1097            Self::Basic(config) => config.validate(),
1098            Self::Bearer(config) => config.validate(),
1099        }
1100    }
1101
1102    /// Redacts the secrets contained in the TES backend authentication
1103    /// configuration.
1104    pub fn redact(&mut self) {
1105        match self {
1106            Self::Basic(auth) => auth.redact(),
1107            Self::Bearer(auth) => auth.redact(),
1108        }
1109    }
1110
1111    /// Unredacts the secrets contained in the TES backend authentication
1112    /// configuration.
1113    pub fn unredact(&mut self) {
1114        match self {
1115            Self::Basic(auth) => auth.unredact(),
1116            Self::Bearer(auth) => auth.unredact(),
1117        }
1118    }
1119}
1120
1121/// Represents configuration for the Task Execution Service (TES) backend.
1122#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1123#[serde(rename_all = "snake_case", deny_unknown_fields)]
1124pub struct TesBackendConfig {
1125    /// The URL of the Task Execution Service.
1126    #[serde(default, skip_serializing_if = "Option::is_none")]
1127    pub url: Option<Url>,
1128
1129    /// The authentication configuration for the TES backend.
1130    #[serde(default, skip_serializing_if = "Option::is_none")]
1131    pub auth: Option<TesBackendAuthConfig>,
1132
1133    /// The root cloud storage URL for storing inputs.
1134    #[serde(default, skip_serializing_if = "Option::is_none")]
1135    pub inputs: Option<Url>,
1136
1137    /// The root cloud storage URL for storing outputs.
1138    #[serde(default, skip_serializing_if = "Option::is_none")]
1139    pub outputs: Option<Url>,
1140
1141    /// The polling interval, in seconds, for checking task status.
1142    ///
1143    /// Defaults to 1 second.
1144    #[serde(default, skip_serializing_if = "Option::is_none")]
1145    pub interval: Option<u64>,
1146
1147    /// The number of retries after encountering an error communicating with the
1148    /// TES server.
1149    ///
1150    /// Defaults to no retries.
1151    pub retries: Option<u32>,
1152
1153    /// The maximum number of concurrent requests the backend will send to the
1154    /// TES server.
1155    ///
1156    /// Defaults to 10 concurrent requests.
1157    #[serde(default, skip_serializing_if = "Option::is_none")]
1158    pub max_concurrency: Option<u32>,
1159
1160    /// Whether or not the TES server URL may use an insecure protocol like
1161    /// HTTP.
1162    #[serde(default)]
1163    pub insecure: bool,
1164}
1165
1166impl TesBackendConfig {
1167    /// Validates the TES backend configuration.
1168    pub fn validate(&self) -> Result<()> {
1169        match &self.url {
1170            Some(url) => {
1171                if !self.insecure && url.scheme() != "https" {
1172                    bail!(
1173                        "TES backend configuration value `url` has invalid value `{url}`: URL \
1174                         must use a HTTPS scheme"
1175                    );
1176                }
1177            }
1178            None => bail!("TES backend configuration value `url` is required"),
1179        }
1180
1181        if let Some(auth) = &self.auth {
1182            auth.validate()?;
1183        }
1184
1185        if let Some(max_concurrency) = self.max_concurrency
1186            && max_concurrency == 0
1187        {
1188            bail!("TES backend configuration value `max_concurrency` cannot be zero");
1189        }
1190
1191        match &self.inputs {
1192            Some(url) => {
1193                if !is_supported_url(url.as_str()) {
1194                    bail!(
1195                        "TES backend storage configuration value `inputs` has invalid value \
1196                         `{url}`: URL scheme is not supported"
1197                    );
1198                }
1199
1200                if !url.path().ends_with('/') {
1201                    bail!(
1202                        "TES backend storage configuration value `inputs` has invalid value \
1203                         `{url}`: URL path must end with a slash"
1204                    );
1205                }
1206            }
1207            None => bail!("TES backend configuration value `inputs` is required"),
1208        }
1209
1210        match &self.outputs {
1211            Some(url) => {
1212                if !is_supported_url(url.as_str()) {
1213                    bail!(
1214                        "TES backend storage configuration value `outputs` has invalid value \
1215                         `{url}`: URL scheme is not supported"
1216                    );
1217                }
1218
1219                if !url.path().ends_with('/') {
1220                    bail!(
1221                        "TES backend storage configuration value `outputs` has invalid value \
1222                         `{url}`: URL path must end with a slash"
1223                    );
1224                }
1225            }
1226            None => bail!("TES backend storage configuration value `outputs` is required"),
1227        }
1228
1229        Ok(())
1230    }
1231
1232    /// Redacts the secrets contained in the TES backend configuration.
1233    pub fn redact(&mut self) {
1234        if let Some(auth) = &mut self.auth {
1235            auth.redact();
1236        }
1237    }
1238
1239    /// Unredacts the secrets contained in the TES backend configuration.
1240    pub fn unredact(&mut self) {
1241        if let Some(auth) = &mut self.auth {
1242            auth.unredact();
1243        }
1244    }
1245}
1246
/// Configuration for the Apptainer container runtime.
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
pub struct ApptainerConfig {
    /// Additional command-line arguments to pass to `apptainer exec` when
    /// executing tasks.
    ///
    /// `None` means no extra arguments are supplied.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
}
1254
impl ApptainerConfig {
    /// Validate that Apptainer is appropriately configured.
    ///
    /// Currently a no-op that always succeeds.
    pub async fn validate(&self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
1261
/// Configuration for an LSF queue.
///
/// Each queue can optionally have per-task CPU and memory limits set so that
/// tasks which are too large to be scheduled on that queue will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be set manually based on the user's understanding of the
/// cluster configuration.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfQueueConfig {
    /// The name of the queue; this is the string passed to `bsub -q
    /// <queue_name>`.
    name: String,
    /// The maximum number of CPUs this queue can provision for a single task.
    max_cpu_per_task: Option<u64>,
    /// The maximum memory this queue can provision for a single task.
    max_memory_per_task: Option<ByteSize>,
}
1280
1281impl LsfQueueConfig {
1282    /// Create an [`LsfQueueConfig`].
1283    pub fn new(
1284        name: String,
1285        max_cpu_per_task: Option<u64>,
1286        max_memory_per_task: Option<ByteSize>,
1287    ) -> Self {
1288        Self {
1289            name,
1290            max_cpu_per_task,
1291            max_memory_per_task,
1292        }
1293    }
1294
1295    /// The name of the queue; this is the string passed to `bsub -q
1296    /// <queue_name>`.
1297    pub fn name(&self) -> &str {
1298        &self.name
1299    }
1300
1301    /// The maximum number of CPUs this queue can provision for a single task.
1302    pub fn max_cpu_per_task(&self) -> Option<u64> {
1303        self.max_cpu_per_task
1304    }
1305
1306    /// The maximum memory this queue can provision for a single task.
1307    pub fn max_memory_per_task(&self) -> Option<ByteSize> {
1308        self.max_memory_per_task
1309    }
1310
1311    /// Validate that this LSF queue exists according to the local `bqueues`.
1312    async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
1313        let queue = self.name();
1314        ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
1315        if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
1316            ensure!(
1317                max_cpu_per_task > 0,
1318                "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
1319            );
1320        }
1321        if let Some(max_memory_per_task) = self.max_memory_per_task() {
1322            ensure!(
1323                max_memory_per_task.as_u64() > 0,
1324                "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
1325            );
1326        }
1327        match tokio::time::timeout(
1328            // 10 seconds is rather arbitrary; `bqueues` ordinarily returns extremely quickly, but
1329            // we don't want things to run away on a misconfigured system
1330            std::time::Duration::from_secs(10),
1331            Command::new("bqueues").arg(queue).output(),
1332        )
1333        .await
1334        {
1335            Ok(output) => {
1336                let output = output.context("validating LSF queue")?;
1337                if !output.status.success() {
1338                    let stdout = String::from_utf8_lossy(&output.stdout);
1339                    let stderr = String::from_utf8_lossy(&output.stderr);
1340                    error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
1341                    Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
1342                } else {
1343                    Ok(())
1344                }
1345            }
1346            Err(_) => Err(anyhow!(
1347                "timed out trying to validate {name}_lsf_queue `{queue}`"
1348            )),
1349        }
1350    }
1351}
1352
/// Configuration for the LSF + Apptainer backend.
///
/// Requires experimental features to be enabled; validation fails otherwise.
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// Which queue, if any, to specify when submitting normal jobs to LSF.
    ///
    /// This may be superseded by
    /// [`short_task_lsf_queue`][Self::short_task_lsf_queue],
    /// [`gpu_lsf_queue`][Self::gpu_lsf_queue], or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for corresponding tasks.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to LSF.
    ///
    /// This may be superseded by [`gpu_lsf_queue`][Self::gpu_lsf_queue] or
    /// [`fpga_lsf_queue`][Self::fpga_lsf_queue] for tasks which require
    /// specialized hardware.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require a
    /// GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Which queue, if any, to specify when submitting [tasks which require an
    /// FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to LSF.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Additional command-line arguments to pass to `bsub` when submitting jobs
    /// to LSF.
    pub extra_bsub_args: Option<Vec<String>>,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where LSF dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1396
1397impl LsfApptainerBackendConfig {
1398    /// Validate that the backend is appropriately configured.
1399    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1400        if cfg!(not(unix)) {
1401            bail!("LSF + Apptainer backend is not supported on non-unix platforms");
1402        }
1403        if !engine_config.experimental_features_enabled {
1404            bail!("LSF + Apptainer backend requires enabling experimental features");
1405        }
1406
1407        // Do what we can to validate options that are dependent on the dynamic
1408        // environment. These are a bit fraught, particularly if the behavior of
1409        // the external tools changes based on where a job gets dispatched, but
1410        // querying from the perspective of the current node allows
1411        // us to get better error messages in circumstances typical to a cluster.
1412        if let Some(queue) = self.default_lsf_queue.as_ref() {
1413            queue.validate("default").await?;
1414        }
1415        if let Some(queue) = self.short_task_lsf_queue.as_ref() {
1416            queue.validate("short_task").await?;
1417        }
1418        if let Some(queue) = self.gpu_lsf_queue.as_ref() {
1419            queue.validate("gpu").await?;
1420        }
1421        if let Some(queue) = self.fpga_lsf_queue.as_ref() {
1422            queue.validate("fpga").await?;
1423        }
1424
1425        self.apptainer_config.validate().await?;
1426
1427        Ok(())
1428    }
1429
1430    /// Get the appropriate LSF queue for a task under this configuration.
1431    ///
1432    /// Specialized hardware requirements are prioritized over other
1433    /// characteristics, with FPGA taking precedence over GPU.
1434    pub(crate) fn lsf_queue_for_task(
1435        &self,
1436        requirements: &HashMap<String, Value>,
1437        hints: &HashMap<String, Value>,
1438    ) -> Option<&LsfQueueConfig> {
1439        // Specialized hardware gets priority.
1440        if let Some(queue) = self.fpga_lsf_queue.as_ref()
1441            && let Some(true) = requirements
1442                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1443                .and_then(Value::as_boolean)
1444        {
1445            return Some(queue);
1446        }
1447
1448        if let Some(queue) = self.gpu_lsf_queue.as_ref()
1449            && let Some(true) = requirements
1450                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1451                .and_then(Value::as_boolean)
1452        {
1453            return Some(queue);
1454        }
1455
1456        // Then short tasks.
1457        if let Some(queue) = self.short_task_lsf_queue.as_ref()
1458            && let Some(true) = hints
1459                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1460                .and_then(Value::as_boolean)
1461        {
1462            return Some(queue);
1463        }
1464
1465        // Finally the default queue. If this is `None`, `bsub` gets run without a queue
1466        // argument and the cluster's default is used.
1467        self.default_lsf_queue.as_ref()
1468    }
1469}
1470
/// Configuration for a Slurm partition.
///
/// Each partition can optionally have per-task CPU and memory limits set so
/// that tasks which are too large to be scheduled on that partition will fail
/// immediately instead of pending indefinitely. In the future, these limits may
/// be populated or validated by live information from the cluster, but
/// for now they must be set manually based on the user's understanding of the
/// cluster configuration.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmPartitionConfig {
    /// The name of the partition; this is the string passed to `sbatch
    /// --partition=<partition_name>`.
    name: String,
    /// The maximum number of CPUs this partition can provision for a single
    /// task.
    max_cpu_per_task: Option<u64>,
    /// The maximum memory this partition can provision for a single task.
    max_memory_per_task: Option<ByteSize>,
}
1490
impl SlurmPartitionConfig {
    /// Create a [`SlurmPartitionConfig`].
    pub fn new(
        name: String,
        max_cpu_per_task: Option<u64>,
        max_memory_per_task: Option<ByteSize>,
    ) -> Self {
        Self {
            name,
            max_cpu_per_task,
            max_memory_per_task,
        }
    }

    /// The name of the partition; this is the string passed to `sbatch
    /// --partition=<partition_name>`.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The maximum number of CPUs this partition can provision for a single
    /// task.
    pub fn max_cpu_per_task(&self) -> Option<u64> {
        self.max_cpu_per_task
    }

    /// The maximum memory this partition can provision for a single task.
    pub fn max_memory_per_task(&self) -> Option<ByteSize> {
        self.max_memory_per_task
    }

    /// Validate that this Slurm partition exists according to the local
    /// `scontrol show partition`.
    async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
        let partition = self.name();
        ensure!(
            !partition.is_empty(),
            "{name}_slurm_partition name cannot be empty"
        );
        if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
            ensure!(
                max_cpu_per_task > 0,
                "{name}_slurm_partition `{partition}` must allow at least 1 CPU to be provisioned"
            );
        }
        if let Some(max_memory_per_task) = self.max_memory_per_task() {
            ensure!(
                max_memory_per_task.as_u64() > 0,
                "{name}_slurm_partition `{partition}` must allow at least some memory to be \
                 provisioned"
            );
        }
        match tokio::time::timeout(
            // 10 seconds is rather arbitrary; `scontrol` ordinarily returns extremely quickly, but
            // we don't want things to run away on a misconfigured system
            std::time::Duration::from_secs(10),
            Command::new("scontrol")
                .arg("show")
                .arg("partition")
                .arg(partition)
                .output(),
        )
        .await
        {
            Ok(output) => {
                let output = output.context("validating Slurm partition")?;
                if !output.status.success() {
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!(%stdout, %stderr, %partition, "failed to validate {name}_slurm_partition");
                    Err(anyhow!(
                        "failed to validate {name}_slurm_partition `{partition}`"
                    ))
                } else {
                    Ok(())
                }
            }
            Err(_) => Err(anyhow!(
                "timed out trying to validate {name}_slurm_partition `{partition}`"
            )),
        }
    }
}
1574
/// Configuration for the Slurm + Apptainer backend.
///
/// Requires experimental features to be enabled; validation fails otherwise.
// TODO ACF 2025-09-23: add a Apptainer/Singularity mode config that switches around executable
// name, env var names, etc.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct SlurmApptainerBackendConfig {
    /// Which partition, if any, to specify when submitting normal jobs to
    /// Slurm.
    ///
    /// This may be superseded by
    /// [`short_task_slurm_partition`][Self::short_task_slurm_partition],
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition], or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for corresponding
    /// tasks.
    pub default_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [short
    /// tasks](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#short_task) to Slurm.
    ///
    /// This may be superseded by
    /// [`gpu_slurm_partition`][Self::gpu_slurm_partition] or
    /// [`fpga_slurm_partition`][Self::fpga_slurm_partition] for tasks which
    /// require specialized hardware.
    pub short_task_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// a GPU](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub gpu_slurm_partition: Option<SlurmPartitionConfig>,
    /// Which partition, if any, to specify when submitting [tasks which require
    /// an FPGA](https://github.com/openwdl/wdl/blob/wdl-1.2/SPEC.md#hardware-accelerators-gpu-and--fpga)
    /// to Slurm.
    pub fpga_slurm_partition: Option<SlurmPartitionConfig>,
    /// Additional command-line arguments to pass to `sbatch` when submitting
    /// jobs to Slurm.
    pub extra_sbatch_args: Option<Vec<String>>,
    /// The configuration of Apptainer, which is used as the container runtime
    /// on the compute nodes where Slurm dispatches tasks.
    ///
    /// Note that this will likely be replaced by an abstraction over multiple
    /// container execution runtimes in the future, rather than being
    /// hardcoded to Apptainer.
    #[serde(default)]
    // TODO ACF 2025-10-16: temporarily flatten this into the overall config so that it doesn't
    // break existing serialized configs. We'll save breaking the config file format for when we
    // actually have meaningful composition of in-place runtimes.
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
1621
1622impl SlurmApptainerBackendConfig {
1623    /// Validate that the backend is appropriately configured.
1624    pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
1625        if cfg!(not(unix)) {
1626            bail!("Slurm + Apptainer backend is not supported on non-unix platforms");
1627        }
1628        if !engine_config.experimental_features_enabled {
1629            bail!("Slurm + Apptainer backend requires enabling experimental features");
1630        }
1631
1632        // Do what we can to validate options that are dependent on the dynamic
1633        // environment. These are a bit fraught, particularly if the behavior of
1634        // the external tools changes based on where a job gets dispatched, but
1635        // querying from the perspective of the current node allows
1636        // us to get better error messages in circumstances typical to a cluster.
1637        if let Some(partition) = &self.default_slurm_partition {
1638            partition.validate("default").await?;
1639        }
1640        if let Some(partition) = &self.short_task_slurm_partition {
1641            partition.validate("short_task").await?;
1642        }
1643        if let Some(partition) = &self.gpu_slurm_partition {
1644            partition.validate("gpu").await?;
1645        }
1646        if let Some(partition) = &self.fpga_slurm_partition {
1647            partition.validate("fpga").await?;
1648        }
1649
1650        self.apptainer_config.validate().await?;
1651
1652        Ok(())
1653    }
1654
1655    /// Get the appropriate Slurm partition for a task under this configuration.
1656    ///
1657    /// Specialized hardware requirements are prioritized over other
1658    /// characteristics, with FPGA taking precedence over GPU.
1659    pub(crate) fn slurm_partition_for_task(
1660        &self,
1661        requirements: &HashMap<String, Value>,
1662        hints: &HashMap<String, Value>,
1663    ) -> Option<&SlurmPartitionConfig> {
1664        // TODO ACF 2025-09-26: what's the relationship between this code and
1665        // `TaskExecutionConstraints`? Should this be there instead, or be pulling
1666        // values from that instead of directly from `requirements` and `hints`?
1667
1668        // Specialized hardware gets priority.
1669        if let Some(partition) = self.fpga_slurm_partition.as_ref()
1670            && let Some(true) = requirements
1671                .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
1672                .and_then(Value::as_boolean)
1673        {
1674            return Some(partition);
1675        }
1676
1677        if let Some(partition) = self.gpu_slurm_partition.as_ref()
1678            && let Some(true) = requirements
1679                .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
1680                .and_then(Value::as_boolean)
1681        {
1682            return Some(partition);
1683        }
1684
1685        // Then short tasks.
1686        if let Some(partition) = self.short_task_slurm_partition.as_ref()
1687            && let Some(true) = hints
1688                .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
1689                .and_then(Value::as_boolean)
1690        {
1691            return Some(partition);
1692        }
1693
1694        // Finally the default partition. If this is `None`, `sbatch` gets run without a
1695        // partition argument and the cluster's default is used.
1696        self.default_slurm_partition.as_ref()
1697    }
1698}
1699
1700#[cfg(test)]
1701mod test {
1702    use pretty_assertions::assert_eq;
1703
1704    use super::*;
1705
1706    #[test]
1707    fn redacted_secret() {
1708        let mut secret: SecretString = "secret".into();
1709
1710        assert_eq!(
1711            serde_json::to_string(&secret).unwrap(),
1712            format!(r#""{REDACTED}""#)
1713        );
1714
1715        secret.unredact();
1716        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);
1717
1718        secret.redact();
1719        assert_eq!(
1720            serde_json::to_string(&secret).unwrap(),
1721            format!(r#""{REDACTED}""#)
1722        );
1723    }
1724
1725    #[test]
1726    fn redacted_config() {
1727        let config = Config {
1728            backends: [
1729                (
1730                    "first".to_string(),
1731                    BackendConfig::Tes(TesBackendConfig {
1732                        auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
1733                            username: "foo".into(),
1734                            password: "secret".into(),
1735                        })),
1736                        ..Default::default()
1737                    }),
1738                ),
1739                (
1740                    "second".to_string(),
1741                    BackendConfig::Tes(TesBackendConfig {
1742                        auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
1743                            token: "secret".into(),
1744                        })),
1745                        ..Default::default()
1746                    }),
1747                ),
1748            ]
1749            .into(),
1750            storage: StorageConfig {
1751                azure: AzureStorageConfig {
1752                    auth: Some(AzureStorageAuthConfig {
1753                        account_name: "foo".into(),
1754                        access_key: "secret".into(),
1755                    }),
1756                },
1757                s3: S3StorageConfig {
1758                    auth: Some(S3StorageAuthConfig {
1759                        access_key_id: "foo".into(),
1760                        secret_access_key: "secret".into(),
1761                    }),
1762                    ..Default::default()
1763                },
1764                google: GoogleStorageConfig {
1765                    auth: Some(GoogleStorageAuthConfig {
1766                        access_key: "foo".into(),
1767                        secret: "secret".into(),
1768                    }),
1769                },
1770            },
1771            ..Default::default()
1772        };
1773
1774        let json = serde_json::to_string_pretty(&config).unwrap();
1775        assert!(json.contains("secret"), "`{json}` contains a secret");
1776    }
1777
1778    #[tokio::test]
1779    async fn test_config_validate() {
1780        // Test invalid task config
1781        let mut config = Config::default();
1782        config.task.retries = Some(1000000);
1783        assert_eq!(
1784            config.validate().await.unwrap_err().to_string(),
1785            "configuration value `task.retries` cannot exceed 100"
1786        );
1787
1788        // Test invalid scatter concurrency config
1789        let mut config = Config::default();
1790        config.workflow.scatter.concurrency = Some(0);
1791        assert_eq!(
1792            config.validate().await.unwrap_err().to_string(),
1793            "configuration value `workflow.scatter.concurrency` cannot be zero"
1794        );
1795
1796        // Test invalid backend name
1797        let config = Config {
1798            backend: Some("foo".into()),
1799            ..Default::default()
1800        };
1801        assert_eq!(
1802            config.validate().await.unwrap_err().to_string(),
1803            "a backend named `foo` is not present in the configuration"
1804        );
1805        let config = Config {
1806            backend: Some("bar".into()),
1807            backends: [("foo".to_string(), BackendConfig::default())].into(),
1808            ..Default::default()
1809        };
1810        assert_eq!(
1811            config.validate().await.unwrap_err().to_string(),
1812            "a backend named `bar` is not present in the configuration"
1813        );
1814
1815        // Test a singular backend
1816        let config = Config {
1817            backends: [("foo".to_string(), BackendConfig::default())].into(),
1818            ..Default::default()
1819        };
1820        config.validate().await.expect("config should validate");
1821
1822        // Test invalid local backend cpu config
1823        let config = Config {
1824            backends: [(
1825                "default".to_string(),
1826                BackendConfig::Local(LocalBackendConfig {
1827                    cpu: Some(0),
1828                    ..Default::default()
1829                }),
1830            )]
1831            .into(),
1832            ..Default::default()
1833        };
1834        assert_eq!(
1835            config.validate().await.unwrap_err().to_string(),
1836            "local backend configuration value `cpu` cannot be zero"
1837        );
1838        let config = Config {
1839            backends: [(
1840                "default".to_string(),
1841                BackendConfig::Local(LocalBackendConfig {
1842                    cpu: Some(10000000),
1843                    ..Default::default()
1844                }),
1845            )]
1846            .into(),
1847            ..Default::default()
1848        };
1849        assert!(
1850            config
1851                .validate()
1852                .await
1853                .unwrap_err()
1854                .to_string()
1855                .starts_with(
1856                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
1857                     available to the host"
1858                )
1859        );
1860
1861        // Test invalid local backend memory config
1862        let config = Config {
1863            backends: [(
1864                "default".to_string(),
1865                BackendConfig::Local(LocalBackendConfig {
1866                    memory: Some("0 GiB".to_string()),
1867                    ..Default::default()
1868                }),
1869            )]
1870            .into(),
1871            ..Default::default()
1872        };
1873        assert_eq!(
1874            config.validate().await.unwrap_err().to_string(),
1875            "local backend configuration value `memory` cannot be zero"
1876        );
1877        let config = Config {
1878            backends: [(
1879                "default".to_string(),
1880                BackendConfig::Local(LocalBackendConfig {
1881                    memory: Some("100 meows".to_string()),
1882                    ..Default::default()
1883                }),
1884            )]
1885            .into(),
1886            ..Default::default()
1887        };
1888        assert_eq!(
1889            config.validate().await.unwrap_err().to_string(),
1890            "local backend configuration value `memory` has invalid value `100 meows`"
1891        );
1892
1893        let config = Config {
1894            backends: [(
1895                "default".to_string(),
1896                BackendConfig::Local(LocalBackendConfig {
1897                    memory: Some("1000 TiB".to_string()),
1898                    ..Default::default()
1899                }),
1900            )]
1901            .into(),
1902            ..Default::default()
1903        };
1904        assert!(
1905            config
1906                .validate()
1907                .await
1908                .unwrap_err()
1909                .to_string()
1910                .starts_with(
1911                    "local backend configuration value `memory` cannot exceed the total memory of \
1912                     the host"
1913                )
1914        );
1915
1916        // Test missing TES URL
1917        let config = Config {
1918            backends: [(
1919                "default".to_string(),
1920                BackendConfig::Tes(Default::default()),
1921            )]
1922            .into(),
1923            ..Default::default()
1924        };
1925        assert_eq!(
1926            config.validate().await.unwrap_err().to_string(),
1927            "TES backend configuration value `url` is required"
1928        );
1929
1930        // Test TES invalid max concurrency
1931        let config = Config {
1932            backends: [(
1933                "default".to_string(),
1934                BackendConfig::Tes(TesBackendConfig {
1935                    url: Some("https://example.com".parse().unwrap()),
1936                    max_concurrency: Some(0),
1937                    ..Default::default()
1938                }),
1939            )]
1940            .into(),
1941            ..Default::default()
1942        };
1943        assert_eq!(
1944            config.validate().await.unwrap_err().to_string(),
1945            "TES backend configuration value `max_concurrency` cannot be zero"
1946        );
1947
1948        // Insecure TES URL
1949        let config = Config {
1950            backends: [(
1951                "default".to_string(),
1952                BackendConfig::Tes(TesBackendConfig {
1953                    url: Some("http://example.com".parse().unwrap()),
1954                    inputs: Some("http://example.com".parse().unwrap()),
1955                    outputs: Some("http://example.com".parse().unwrap()),
1956                    ..Default::default()
1957                }),
1958            )]
1959            .into(),
1960            ..Default::default()
1961        };
1962        assert_eq!(
1963            config.validate().await.unwrap_err().to_string(),
1964            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
1965             must use a HTTPS scheme"
1966        );
1967
1968        // Allow insecure URL
1969        let config = Config {
1970            backends: [(
1971                "default".to_string(),
1972                BackendConfig::Tes(TesBackendConfig {
1973                    url: Some("http://example.com".parse().unwrap()),
1974                    inputs: Some("http://example.com".parse().unwrap()),
1975                    outputs: Some("http://example.com".parse().unwrap()),
1976                    insecure: true,
1977                    ..Default::default()
1978                }),
1979            )]
1980            .into(),
1981            ..Default::default()
1982        };
1983        config
1984            .validate()
1985            .await
1986            .expect("configuration should validate");
1987
1988        let mut config = Config::default();
1989        config.http.parallelism = Some(0);
1990        assert_eq!(
1991            config.validate().await.unwrap_err().to_string(),
1992            "configuration value `http.parallelism` cannot be zero"
1993        );
1994
1995        let mut config = Config::default();
1996        config.http.parallelism = Some(5);
1997        assert!(
1998            config.validate().await.is_ok(),
1999            "should pass for valid configuration"
2000        );
2001
2002        let mut config = Config::default();
2003        config.http.parallelism = None;
2004        assert!(
2005            config.validate().await.is_ok(),
2006            "should pass for default (None)"
2007        );
2008    }
2009}