wdl_engine/
config.rs

1//! Implementation of engine configuration.
2
3use std::borrow::Cow;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::anyhow;
11use anyhow::bail;
12use crankshaft::events::Event;
13use indexmap::IndexMap;
14use secrecy::ExposeSecret;
15use serde::Deserialize;
16use serde::Serialize;
17use tokio::sync::broadcast;
18use tracing::warn;
19use url::Url;
20
21use crate::DockerBackend;
22use crate::LocalBackend;
23use crate::LsfApptainerBackend;
24use crate::LsfApptainerBackendConfig;
25use crate::SYSTEM;
26use crate::SlurmApptainerBackend;
27use crate::SlurmApptainerBackendConfig;
28use crate::TaskExecutionBackend;
29use crate::TesBackend;
30use crate::convert_unit_string;
31use crate::path::is_supported_url;
32
/// Upper bound (inclusive) on the number of task retries the engine supports.
pub const MAX_RETRIES: u64 = 100;

/// Shell used for tasks when no shell is configured.
pub const DEFAULT_TASK_SHELL: &str = "bash";

/// Backend name looked up when no backend is explicitly selected and more
/// than one backend is configured.
pub const DEFAULT_BACKEND_NAME: &str = "default";

/// Placeholder written in place of a secret's value during serialization.
const REDACTED: &str = "<REDACTED>";
44
45/// Gets tne default root cache directory for the user.
46pub fn cache_dir() -> Result<PathBuf> {
47    /// The subdirectory within the user's cache directory for all caches
48    const CACHE_DIR_ROOT: &str = "sprocket";
49
50    Ok(dirs::cache_dir()
51        .context("failed to determine user cache directory")?
52        .join(CACHE_DIR_ROOT))
53}
54
55/// Represents a secret string that is, by default, redacted for serialization.
56///
57/// This type is a wrapper around [`secrecy::SecretString`].
58#[derive(Debug, Clone)]
59pub struct SecretString {
60    /// The inner secret string.
61    ///
62    /// This type is not serializable.
63    inner: secrecy::SecretString,
64    /// Whether or not the secret string is redacted for serialization.
65    ///
66    /// If `true` (the default), `<REDACTED>` is serialized for the string's
67    /// value.
68    ///
69    /// If `false`, the inner secret string is exposed for serialization.
70    redacted: bool,
71}
72
73impl SecretString {
74    /// Redacts the secret for serialization.
75    ///
76    /// By default, a [`SecretString`] is redacted; when redacted, the string is
77    /// replaced with `<REDACTED>` when serialized.
78    pub fn redact(&mut self) {
79        self.redacted = true;
80    }
81
82    /// Unredacts the secret for serialization.
83    pub fn unredact(&mut self) {
84        self.redacted = false;
85    }
86
87    /// Gets the inner [`secrecy::SecretString`].
88    pub fn inner(&self) -> &secrecy::SecretString {
89        &self.inner
90    }
91}
92
93impl From<String> for SecretString {
94    fn from(s: String) -> Self {
95        Self {
96            inner: s.into(),
97            redacted: true,
98        }
99    }
100}
101
102impl From<&str> for SecretString {
103    fn from(s: &str) -> Self {
104        Self {
105            inner: s.into(),
106            redacted: true,
107        }
108    }
109}
110
111impl Default for SecretString {
112    fn default() -> Self {
113        Self {
114            inner: Default::default(),
115            redacted: true,
116        }
117    }
118}
119
120impl serde::Serialize for SecretString {
121    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
122    where
123        S: serde::Serializer,
124    {
125        use secrecy::ExposeSecret;
126
127        if self.redacted {
128            serializer.serialize_str(REDACTED)
129        } else {
130            serializer.serialize_str(self.inner.expose_secret())
131        }
132    }
133}
134
135impl<'de> serde::Deserialize<'de> for SecretString {
136    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
137    where
138        D: serde::Deserializer<'de>,
139    {
140        let inner = secrecy::SecretString::deserialize(deserializer)?;
141        Ok(Self {
142            inner,
143            redacted: true,
144        })
145    }
146}
147
148/// Represents how an evaluation error or cancellation should be handled by the
149/// engine.
150#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
151#[serde(rename_all = "snake_case")]
152pub enum FailureMode {
153    /// When an error is encountered or evaluation is canceled, evaluation waits
154    /// for any outstanding tasks to complete.
155    #[default]
156    Slow,
157    /// When an error is encountered or evaluation is canceled, any outstanding
158    /// tasks that are executing are immediately canceled and evaluation waits
159    /// for cancellation to complete.
160    Fast,
161}
162
163/// Represents WDL evaluation configuration.
164///
165/// <div class="warning">
166///
167/// By default, serialization of [`Config`] will redact the values of secrets.
168///
169/// Use the [`Config::unredact`] method before serialization to prevent the
170/// secrets from being redacted.
171///
172/// </div>
173#[derive(Debug, Default, Clone, Serialize, Deserialize)]
174#[serde(rename_all = "snake_case", deny_unknown_fields)]
175pub struct Config {
176    /// HTTP configuration.
177    #[serde(default)]
178    pub http: HttpConfig,
179    /// Workflow evaluation configuration.
180    #[serde(default)]
181    pub workflow: WorkflowConfig,
182    /// Task evaluation configuration.
183    #[serde(default)]
184    pub task: TaskConfig,
185    /// The name of the backend to use.
186    ///
187    /// If not specified and `backends` has multiple entries, it will use a name
188    /// of `default`.
189    #[serde(skip_serializing_if = "Option::is_none")]
190    pub backend: Option<String>,
191    /// Task execution backends configuration.
192    ///
193    /// If the collection is empty and `backend` is not specified, the engine
194    /// default backend is used.
195    ///
196    /// If the collection has exactly one entry and `backend` is not specified,
197    /// the singular entry will be used.
198    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
199    pub backends: IndexMap<String, BackendConfig>,
200    /// Storage configuration.
201    #[serde(default)]
202    pub storage: StorageConfig,
203    /// (Experimental) Avoid environment-specific output; default is `false`.
204    ///
205    /// If this option is `true`, selected error messages and log output will
206    /// avoid emitting environment-specific output such as absolute paths
207    /// and system resource counts.
208    ///
209    /// This is largely meant to support "golden testing" where a test's success
210    /// depends on matching an expected set of outputs exactly. Cues that
211    /// help users overcome errors, such as the path to a temporary
212    /// directory or the number of CPUs available to the system, confound this
213    /// style of testing. This flag is a best-effort experimental attempt to
214    /// reduce the impact of these differences in order to allow a wider
215    /// range of golden tests to be written.
216    #[serde(default)]
217    pub suppress_env_specific_output: bool,
218    /// (Experimental) Whether experimental features are enabled; default is
219    /// `false`.
220    ///
221    /// Experimental features are provided to users with heavy caveats about
222    /// their stability and rough edges. Use at your own risk, but feedback
223    /// is quite welcome.
224    #[serde(default)]
225    pub experimental_features_enabled: bool,
226    /// The failure mode for workflow or task evaluation.
227    ///
228    /// A value of [`FailureMode::Slow`] will result in evaluation waiting for
229    /// executing tasks to complete upon error or interruption.
230    ///
231    /// A value of [`FailureMode::Fast`] will immediately attempt to cancel
232    /// executing tasks upon error or interruption.
233    #[serde(default, rename = "fail")]
234    pub failure_mode: FailureMode,
235}
236
237impl Config {
238    /// Validates the evaluation configuration.
239    pub async fn validate(&self) -> Result<()> {
240        self.http.validate()?;
241        self.workflow.validate()?;
242        self.task.validate()?;
243
244        if self.backend.is_none() && self.backends.len() < 2 {
245            // This is OK, we'll use either the singular backends entry (1) or
246            // the default (0)
247        } else {
248            // Check the backends map for the backend name (or "default")
249            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
250            if !self.backends.contains_key(backend) {
251                bail!("a backend named `{backend}` is not present in the configuration");
252            }
253        }
254
255        for backend in self.backends.values() {
256            backend.validate(self).await?;
257        }
258
259        self.storage.validate()?;
260
261        if self.suppress_env_specific_output && !self.experimental_features_enabled {
262            bail!("`suppress_env_specific_output` requires enabling experimental features");
263        }
264
265        Ok(())
266    }
267
268    /// Redacts the secrets contained in the configuration.
269    ///
270    /// By default, secrets are redacted for serialization.
271    pub fn redact(&mut self) {
272        for backend in self.backends.values_mut() {
273            backend.redact();
274        }
275
276        if let Some(auth) = &mut self.storage.azure.auth {
277            auth.redact();
278        }
279
280        if let Some(auth) = &mut self.storage.s3.auth {
281            auth.redact();
282        }
283
284        if let Some(auth) = &mut self.storage.google.auth {
285            auth.redact();
286        }
287    }
288
289    /// Unredacts the secrets contained in the configuration.
290    ///
291    /// Calling this method will expose secrets for serialization.
292    pub fn unredact(&mut self) {
293        for backend in self.backends.values_mut() {
294            backend.unredact();
295        }
296
297        if let Some(auth) = &mut self.storage.azure.auth {
298            auth.unredact();
299        }
300
301        if let Some(auth) = &mut self.storage.s3.auth {
302            auth.unredact();
303        }
304
305        if let Some(auth) = &mut self.storage.google.auth {
306            auth.unredact();
307        }
308    }
309
310    /// Creates a new task execution backend based on this configuration.
311    pub async fn create_backend(
312        self: &Arc<Self>,
313        run_root_dir: &Path,
314        events: Option<broadcast::Sender<Event>>,
315    ) -> Result<Arc<dyn TaskExecutionBackend>> {
316        let config = if self.backend.is_none() && self.backends.len() < 2 {
317            if self.backends.len() == 1 {
318                // Use the singular entry
319                Cow::Borrowed(self.backends.values().next().unwrap())
320            } else {
321                // Use the default
322                Cow::Owned(BackendConfig::default())
323            }
324        } else {
325            // Lookup the backend to use
326            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
327            Cow::Borrowed(self.backends.get(backend).ok_or_else(|| {
328                anyhow!("a backend named `{backend}` is not present in the configuration")
329            })?)
330        };
331
332        match config.as_ref() {
333            BackendConfig::Local(config) => {
334                warn!(
335                    "the engine is configured to use the local backend: tasks will not be run \
336                     inside of a container"
337                );
338                Ok(Arc::new(LocalBackend::new(self.clone(), config, events)?))
339            }
340            BackendConfig::Docker(config) => Ok(Arc::new(
341                DockerBackend::new(self.clone(), config, events).await?,
342            )),
343            BackendConfig::Tes(config) => Ok(Arc::new(
344                TesBackend::new(self.clone(), config, events).await?,
345            )),
346            BackendConfig::LsfApptainer(config) => Ok(Arc::new(LsfApptainerBackend::new(
347                run_root_dir,
348                self.clone(),
349                config.clone(),
350                events,
351            ))),
352            BackendConfig::SlurmApptainer(config) => Ok(Arc::new(SlurmApptainerBackend::new(
353                run_root_dir,
354                self.clone(),
355                config.clone(),
356                events,
357            ))),
358        }
359    }
360}
361
362/// Represents HTTP configuration.
363#[derive(Debug, Default, Clone, Serialize, Deserialize)]
364#[serde(rename_all = "snake_case", deny_unknown_fields)]
365pub struct HttpConfig {
366    /// The HTTP download cache location.
367    ///
368    /// Defaults to an operating system specific cache directory for the user.
369    #[serde(default, skip_serializing_if = "Option::is_none")]
370    pub cache_dir: Option<PathBuf>,
371    /// The number of retries for transferring files.
372    ///
373    /// Defaults to `5`.
374    #[serde(default, skip_serializing_if = "Option::is_none")]
375    pub retries: Option<usize>,
376    /// The maximum parallelism for file transfers.
377    ///
378    /// Defaults to the host's available parallelism.
379    #[serde(default, skip_serializing_if = "Option::is_none")]
380    pub parallelism: Option<usize>,
381}
382
383impl HttpConfig {
384    /// Validates the HTTP configuration.
385    pub fn validate(&self) -> Result<()> {
386        if let Some(parallelism) = self.parallelism
387            && parallelism == 0
388        {
389            bail!("configuration value `http.parallelism` cannot be zero");
390        }
391        Ok(())
392    }
393}
394
395/// Represents storage configuration.
396#[derive(Debug, Default, Clone, Serialize, Deserialize)]
397#[serde(rename_all = "snake_case", deny_unknown_fields)]
398pub struct StorageConfig {
399    /// Azure Blob Storage configuration.
400    #[serde(default)]
401    pub azure: AzureStorageConfig,
402    /// AWS S3 configuration.
403    #[serde(default)]
404    pub s3: S3StorageConfig,
405    /// Google Cloud Storage configuration.
406    #[serde(default)]
407    pub google: GoogleStorageConfig,
408}
409
410impl StorageConfig {
411    /// Validates the HTTP configuration.
412    pub fn validate(&self) -> Result<()> {
413        self.azure.validate()?;
414        self.s3.validate()?;
415        self.google.validate()?;
416        Ok(())
417    }
418}
419
420/// Represents authentication information for Azure Blob Storage.
421#[derive(Debug, Default, Clone, Serialize, Deserialize)]
422#[serde(rename_all = "snake_case", deny_unknown_fields)]
423pub struct AzureStorageAuthConfig {
424    /// The Azure Storage account name to use.
425    pub account_name: String,
426    /// The Azure Storage access key to use.
427    pub access_key: SecretString,
428}
429
430impl AzureStorageAuthConfig {
431    /// Validates the Azure Blob Storage authentication configuration.
432    pub fn validate(&self) -> Result<()> {
433        if self.account_name.is_empty() {
434            bail!("configuration value `storage.azure.auth.account_name` is required");
435        }
436
437        if self.access_key.inner.expose_secret().is_empty() {
438            bail!("configuration value `storage.azure.auth.access_key` is required");
439        }
440
441        Ok(())
442    }
443
444    /// Redacts the secrets contained in the Azure Blob Storage storage
445    /// authentication configuration.
446    pub fn redact(&mut self) {
447        self.access_key.redact();
448    }
449
450    /// Unredacts the secrets contained in the Azure Blob Storage authentication
451    /// configuration.
452    pub fn unredact(&mut self) {
453        self.access_key.unredact();
454    }
455}
456
457/// Represents configuration for Azure Blob Storage.
458#[derive(Debug, Default, Clone, Serialize, Deserialize)]
459#[serde(rename_all = "snake_case", deny_unknown_fields)]
460pub struct AzureStorageConfig {
461    /// The Azure Blob Storage authentication configuration.
462    #[serde(default, skip_serializing_if = "Option::is_none")]
463    pub auth: Option<AzureStorageAuthConfig>,
464}
465
466impl AzureStorageConfig {
467    /// Validates the Azure Blob Storage configuration.
468    pub fn validate(&self) -> Result<()> {
469        if let Some(auth) = &self.auth {
470            auth.validate()?;
471        }
472
473        Ok(())
474    }
475}
476
477/// Represents authentication information for AWS S3 storage.
478#[derive(Debug, Default, Clone, Serialize, Deserialize)]
479#[serde(rename_all = "snake_case", deny_unknown_fields)]
480pub struct S3StorageAuthConfig {
481    /// The AWS Access Key ID to use.
482    pub access_key_id: String,
483    /// The AWS Secret Access Key to use.
484    pub secret_access_key: SecretString,
485}
486
487impl S3StorageAuthConfig {
488    /// Validates the AWS S3 storage authentication configuration.
489    pub fn validate(&self) -> Result<()> {
490        if self.access_key_id.is_empty() {
491            bail!("configuration value `storage.s3.auth.access_key_id` is required");
492        }
493
494        if self.secret_access_key.inner.expose_secret().is_empty() {
495            bail!("configuration value `storage.s3.auth.secret_access_key` is required");
496        }
497
498        Ok(())
499    }
500
501    /// Redacts the secrets contained in the AWS S3 storage authentication
502    /// configuration.
503    pub fn redact(&mut self) {
504        self.secret_access_key.redact();
505    }
506
507    /// Unredacts the secrets contained in the AWS S3 storage authentication
508    /// configuration.
509    pub fn unredact(&mut self) {
510        self.secret_access_key.unredact();
511    }
512}
513
514/// Represents configuration for AWS S3 storage.
515#[derive(Debug, Default, Clone, Serialize, Deserialize)]
516#[serde(rename_all = "snake_case", deny_unknown_fields)]
517pub struct S3StorageConfig {
518    /// The default region to use for S3-schemed URLs (e.g.
519    /// `s3://<bucket>/<blob>`).
520    ///
521    /// Defaults to `us-east-1`.
522    #[serde(default, skip_serializing_if = "Option::is_none")]
523    pub region: Option<String>,
524
525    /// The AWS S3 storage authentication configuration.
526    #[serde(default, skip_serializing_if = "Option::is_none")]
527    pub auth: Option<S3StorageAuthConfig>,
528}
529
530impl S3StorageConfig {
531    /// Validates the AWS S3 storage configuration.
532    pub fn validate(&self) -> Result<()> {
533        if let Some(auth) = &self.auth {
534            auth.validate()?;
535        }
536
537        Ok(())
538    }
539}
540
541/// Represents authentication information for Google Cloud Storage.
542#[derive(Debug, Default, Clone, Serialize, Deserialize)]
543#[serde(rename_all = "snake_case", deny_unknown_fields)]
544pub struct GoogleStorageAuthConfig {
545    /// The HMAC Access Key to use.
546    pub access_key: String,
547    /// The HMAC Secret to use.
548    pub secret: SecretString,
549}
550
551impl GoogleStorageAuthConfig {
552    /// Validates the Google Cloud Storage authentication configuration.
553    pub fn validate(&self) -> Result<()> {
554        if self.access_key.is_empty() {
555            bail!("configuration value `storage.google.auth.access_key` is required");
556        }
557
558        if self.secret.inner.expose_secret().is_empty() {
559            bail!("configuration value `storage.google.auth.secret` is required");
560        }
561
562        Ok(())
563    }
564
565    /// Redacts the secrets contained in the Google Cloud Storage authentication
566    /// configuration.
567    pub fn redact(&mut self) {
568        self.secret.redact();
569    }
570
571    /// Unredacts the secrets contained in the Google Cloud Storage
572    /// authentication configuration.
573    pub fn unredact(&mut self) {
574        self.secret.unredact();
575    }
576}
577
578/// Represents configuration for Google Cloud Storage.
579#[derive(Debug, Default, Clone, Serialize, Deserialize)]
580#[serde(rename_all = "snake_case", deny_unknown_fields)]
581pub struct GoogleStorageConfig {
582    /// The Google Cloud Storage authentication configuration.
583    #[serde(default, skip_serializing_if = "Option::is_none")]
584    pub auth: Option<GoogleStorageAuthConfig>,
585}
586
587impl GoogleStorageConfig {
588    /// Validates the Google Cloud Storage configuration.
589    pub fn validate(&self) -> Result<()> {
590        if let Some(auth) = &self.auth {
591            auth.validate()?;
592        }
593
594        Ok(())
595    }
596}
597
598/// Represents workflow evaluation configuration.
599#[derive(Debug, Default, Clone, Serialize, Deserialize)]
600#[serde(rename_all = "snake_case", deny_unknown_fields)]
601pub struct WorkflowConfig {
602    /// Scatter statement evaluation configuration.
603    #[serde(default)]
604    pub scatter: ScatterConfig,
605}
606
607impl WorkflowConfig {
608    /// Validates the workflow configuration.
609    pub fn validate(&self) -> Result<()> {
610        self.scatter.validate()?;
611        Ok(())
612    }
613}
614
615/// Represents scatter statement evaluation configuration.
616#[derive(Debug, Default, Clone, Serialize, Deserialize)]
617#[serde(rename_all = "snake_case", deny_unknown_fields)]
618pub struct ScatterConfig {
619    /// The number of scatter array elements to process concurrently.
620    ///
621    /// By default, the value is the parallelism supported by the task
622    /// execution backend.
623    ///
624    /// A value of `0` is invalid.
625    ///
626    /// Lower values use less memory for evaluation and higher values may better
627    /// saturate the task execution backend with tasks to execute.
628    ///
629    /// This setting does not change how many tasks an execution backend can run
630    /// concurrently, but may affect how many tasks are sent to the backend to
631    /// run at a time.
632    ///
633    /// For example, if `concurrency` was set to 10 and we evaluate the
634    /// following scatters:
635    ///
636    /// ```wdl
637    /// scatter (i in range(100)) {
638    ///     call my_task
639    /// }
640    ///
641    /// scatter (j in range(100)) {
642    ///     call my_task as my_task2
643    /// }
644    /// ```
645    ///
646    /// Here each scatter is independent and therefore there will be 20 calls
647    /// (10 for each scatter) made concurrently. If the task execution
648    /// backend can only execute 5 tasks concurrently, 5 tasks will execute
649    /// and 15 will be "ready" to execute and waiting for an executing task
650    /// to complete.
651    ///
652    /// If instead we evaluate the following scatters:
653    ///
654    /// ```wdl
655    /// scatter (i in range(100)) {
656    ///     scatter (j in range(100)) {
657    ///         call my_task
658    ///     }
659    /// }
660    /// ```
661    ///
662    /// Then there will be 100 calls (10*10 as 10 are made for each outer
663    /// element) made concurrently. If the task execution backend can only
664    /// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
665    /// "ready" to execute and waiting for an executing task to complete.
666    ///
667    /// <div class="warning">
668    /// Warning: nested scatter statements cause exponential memory usage based
669    /// on this value, as each scatter statement evaluation requires allocating
670    /// new scopes for scatter array elements being processed. </div>
671    #[serde(default, skip_serializing_if = "Option::is_none")]
672    pub concurrency: Option<u64>,
673}
674
675impl ScatterConfig {
676    /// Validates the scatter configuration.
677    pub fn validate(&self) -> Result<()> {
678        if let Some(concurrency) = self.concurrency
679            && concurrency == 0
680        {
681            bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
682        }
683
684        Ok(())
685    }
686}
687
688/// Represents the supported call caching modes.
689#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
690#[serde(rename_all = "snake_case")]
691pub enum CallCachingMode {
692    /// Call caching is disabled.
693    ///
694    /// The call cache is not checked and new entries are not added to the
695    /// cache.
696    ///
697    /// This is the default value.
698    #[default]
699    Off,
700    /// Call caching is enabled.
701    ///
702    /// The call cache is checked and new entries are added to the cache.
703    ///
704    /// Defaults the `cacheable` task hint to `true`.
705    On,
706    /// Call caching is enabled only for tasks that explicitly have a
707    /// `cacheable` hint set to `true`.
708    ///
709    /// The call cache is checked and new entries are added to the cache *only*
710    /// for tasks that have the `cacheable` hint set to `true`.
711    ///
712    /// Defaults the `cacheable` task hint to `false`.
713    Explicit,
714}
715
716/// Represents task evaluation configuration.
717#[derive(Debug, Default, Clone, Serialize, Deserialize)]
718#[serde(rename_all = "snake_case", deny_unknown_fields)]
719pub struct TaskConfig {
720    /// The default maximum number of retries to attempt if a task fails.
721    ///
722    /// A task's `max_retries` requirement will override this value.
723    ///
724    /// Defaults to 0 (no retries).
725    #[serde(default, skip_serializing_if = "Option::is_none")]
726    pub retries: Option<u64>,
727    /// The default container to use if a container is not specified in a task's
728    /// requirements.
729    ///
730    /// Defaults to `ubuntu:latest`.
731    #[serde(default, skip_serializing_if = "Option::is_none")]
732    pub container: Option<String>,
733    /// The default shell to use for tasks.
734    ///
735    /// Defaults to `bash`.
736    ///
737    /// <div class="warning">
738    /// Warning: the use of a shell other than `bash` may lead to tasks that may
739    /// not be portable to other execution engines.</div>
740    #[serde(default, skip_serializing_if = "Option::is_none")]
741    pub shell: Option<String>,
742    /// The behavior when a task's `cpu` requirement cannot be met.
743    #[serde(default)]
744    pub cpu_limit_behavior: TaskResourceLimitBehavior,
745    /// The behavior when a task's `memory` requirement cannot be met.
746    #[serde(default)]
747    pub memory_limit_behavior: TaskResourceLimitBehavior,
748    /// The call cache directory to use for caching task execution results.
749    ///
750    /// Defaults to an operating system specific cache directory for the user.
751    #[serde(default, skip_serializing_if = "Option::is_none")]
752    pub cache_dir: Option<PathBuf>,
753    /// The call caching mode to use for tasks.
754    #[serde(default)]
755    pub cache: CallCachingMode,
756}
757
758impl TaskConfig {
759    /// Validates the task evaluation configuration.
760    pub fn validate(&self) -> Result<()> {
761        if self.retries.unwrap_or(0) > MAX_RETRIES {
762            bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
763        }
764
765        Ok(())
766    }
767}
768
769/// The behavior when a task resource requirement, such as `cpu` or `memory`,
770/// cannot be met.
771#[derive(Debug, Default, Clone, Serialize, Deserialize)]
772#[serde(rename_all = "snake_case", deny_unknown_fields)]
773pub enum TaskResourceLimitBehavior {
774    /// Try executing a task with the maximum amount of the resource available
775    /// when the task's corresponding requirement cannot be met.
776    TryWithMax,
777    /// Do not execute a task if its corresponding requirement cannot be met.
778    ///
779    /// This is the default behavior.
780    #[default]
781    Deny,
782}
783
784/// Represents supported task execution backends.
785#[derive(Debug, Clone, Serialize, Deserialize)]
786#[serde(rename_all = "snake_case", tag = "type")]
787pub enum BackendConfig {
788    /// Use the local task execution backend.
789    Local(LocalBackendConfig),
790    /// Use the Docker task execution backend.
791    Docker(DockerBackendConfig),
792    /// Use the TES task execution backend.
793    Tes(Box<TesBackendConfig>),
794    /// Use the experimental LSF + Apptainer task execution backend.
795    ///
796    /// Requires enabling experimental features.
797    LsfApptainer(Arc<LsfApptainerBackendConfig>),
798    /// Use the experimental Slurm + Apptainer task execution backend.
799    ///
800    /// Requires enabling experimental features.
801    SlurmApptainer(Arc<SlurmApptainerBackendConfig>),
802}
803
804impl Default for BackendConfig {
805    fn default() -> Self {
806        Self::Docker(Default::default())
807    }
808}
809
810impl BackendConfig {
811    /// Validates the backend configuration.
812    pub async fn validate(&self, engine_config: &Config) -> Result<()> {
813        match self {
814            Self::Local(config) => config.validate(),
815            Self::Docker(config) => config.validate(),
816            Self::Tes(config) => config.validate(),
817            Self::LsfApptainer(config) => config.validate(engine_config).await,
818            Self::SlurmApptainer(config) => config.validate(engine_config).await,
819        }
820    }
821
822    /// Converts the backend configuration into a local backend configuration
823    ///
824    /// Returns `None` if the backend configuration is not local.
825    pub fn as_local(&self) -> Option<&LocalBackendConfig> {
826        match self {
827            Self::Local(config) => Some(config),
828            _ => None,
829        }
830    }
831
832    /// Converts the backend configuration into a Docker backend configuration
833    ///
834    /// Returns `None` if the backend configuration is not Docker.
835    pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
836        match self {
837            Self::Docker(config) => Some(config),
838            _ => None,
839        }
840    }
841
842    /// Converts the backend configuration into a TES backend configuration
843    ///
844    /// Returns `None` if the backend configuration is not TES.
845    pub fn as_tes(&self) -> Option<&TesBackendConfig> {
846        match self {
847            Self::Tes(config) => Some(config),
848            _ => None,
849        }
850    }
851
852    /// Redacts the secrets contained in the backend configuration.
853    pub fn redact(&mut self) {
854        match self {
855            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
856            Self::Tes(config) => config.redact(),
857        }
858    }
859
860    /// Unredacts the secrets contained in the backend configuration.
861    pub fn unredact(&mut self) {
862        match self {
863            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) | Self::SlurmApptainer(_) => {}
864            Self::Tes(config) => config.unredact(),
865        }
866    }
867}
868
869/// Represents configuration for the local task execution backend.
870///
871/// <div class="warning">
872/// Warning: the local task execution backend spawns processes on the host
873/// directly without the use of a container; only use this backend on trusted
874/// WDL. </div>
875#[derive(Debug, Default, Clone, Serialize, Deserialize)]
876#[serde(rename_all = "snake_case", deny_unknown_fields)]
877pub struct LocalBackendConfig {
878    /// Set the number of CPUs available for task execution.
879    ///
880    /// Defaults to the number of logical CPUs for the host.
881    ///
882    /// The value cannot be zero or exceed the host's number of CPUs.
883    #[serde(default, skip_serializing_if = "Option::is_none")]
884    pub cpu: Option<u64>,
885
886    /// Set the total amount of memory for task execution as a unit string (e.g.
887    /// `2 GiB`).
888    ///
889    /// Defaults to the total amount of memory for the host.
890    ///
891    /// The value cannot be zero or exceed the host's total amount of memory.
892    #[serde(default, skip_serializing_if = "Option::is_none")]
893    pub memory: Option<String>,
894}
895
896impl LocalBackendConfig {
897    /// Validates the local task execution backend configuration.
898    pub fn validate(&self) -> Result<()> {
899        if let Some(cpu) = self.cpu {
900            if cpu == 0 {
901                bail!("local backend configuration value `cpu` cannot be zero");
902            }
903
904            let total = SYSTEM.cpus().len() as u64;
905            if cpu > total {
906                bail!(
907                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
908                     available to the host ({total})"
909                );
910            }
911        }
912
913        if let Some(memory) = &self.memory {
914            let memory = convert_unit_string(memory).with_context(|| {
915                format!("local backend configuration value `memory` has invalid value `{memory}`")
916            })?;
917
918            if memory == 0 {
919                bail!("local backend configuration value `memory` cannot be zero");
920            }
921
922            let total = SYSTEM.total_memory();
923            if memory > total {
924                bail!(
925                    "local backend configuration value `memory` cannot exceed the total memory of \
926                     the host ({total} bytes)"
927                );
928            }
929        }
930
931        Ok(())
932    }
933}
934
935/// Gets the default value for the docker `cleanup` field.
936const fn cleanup_default() -> bool {
937    true
938}
939
940/// Represents configuration for the Docker backend.
941#[derive(Debug, Clone, Serialize, Deserialize)]
942#[serde(rename_all = "snake_case", deny_unknown_fields)]
943pub struct DockerBackendConfig {
944    /// Whether or not to remove a task's container after the task completes.
945    ///
946    /// Defaults to `true`.
947    #[serde(default = "cleanup_default")]
948    pub cleanup: bool,
949}
950
951impl DockerBackendConfig {
952    /// Validates the Docker backend configuration.
953    pub fn validate(&self) -> Result<()> {
954        Ok(())
955    }
956}
957
958impl Default for DockerBackendConfig {
959    fn default() -> Self {
960        Self { cleanup: true }
961    }
962}
963
964/// Represents HTTP basic authentication configuration.
965#[derive(Debug, Default, Clone, Serialize, Deserialize)]
966#[serde(rename_all = "snake_case", deny_unknown_fields)]
967pub struct BasicAuthConfig {
968    /// The HTTP basic authentication username.
969    #[serde(default)]
970    pub username: String,
971    /// The HTTP basic authentication password.
972    #[serde(default)]
973    pub password: SecretString,
974}
975
976impl BasicAuthConfig {
977    /// Validates the HTTP basic auth configuration.
978    pub fn validate(&self) -> Result<()> {
979        Ok(())
980    }
981
982    /// Redacts the secrets contained in the HTTP basic auth configuration.
983    pub fn redact(&mut self) {
984        self.password.redact();
985    }
986
987    /// Unredacts the secrets contained in the HTTP basic auth configuration.
988    pub fn unredact(&mut self) {
989        self.password.unredact();
990    }
991}
992
993/// Represents HTTP bearer token authentication configuration.
994#[derive(Debug, Default, Clone, Serialize, Deserialize)]
995#[serde(rename_all = "snake_case", deny_unknown_fields)]
996pub struct BearerAuthConfig {
997    /// The HTTP bearer authentication token.
998    #[serde(default)]
999    pub token: SecretString,
1000}
1001
1002impl BearerAuthConfig {
1003    /// Validates the HTTP bearer auth configuration.
1004    pub fn validate(&self) -> Result<()> {
1005        Ok(())
1006    }
1007
1008    /// Redacts the secrets contained in the HTTP bearer auth configuration.
1009    pub fn redact(&mut self) {
1010        self.token.redact();
1011    }
1012
1013    /// Unredacts the secrets contained in the HTTP bearer auth configuration.
1014    pub fn unredact(&mut self) {
1015        self.token.unredact();
1016    }
1017}
1018
1019/// Represents the kind of authentication for a TES backend.
1020#[derive(Debug, Clone, Serialize, Deserialize)]
1021#[serde(rename_all = "snake_case", tag = "type")]
1022pub enum TesBackendAuthConfig {
1023    /// Use basic authentication for the TES backend.
1024    Basic(BasicAuthConfig),
1025    /// Use bearer token authentication for the TES backend.
1026    Bearer(BearerAuthConfig),
1027}
1028
1029impl TesBackendAuthConfig {
1030    /// Validates the TES backend authentication configuration.
1031    pub fn validate(&self) -> Result<()> {
1032        match self {
1033            Self::Basic(config) => config.validate(),
1034            Self::Bearer(config) => config.validate(),
1035        }
1036    }
1037
1038    /// Redacts the secrets contained in the TES backend authentication
1039    /// configuration.
1040    pub fn redact(&mut self) {
1041        match self {
1042            Self::Basic(auth) => auth.redact(),
1043            Self::Bearer(auth) => auth.redact(),
1044        }
1045    }
1046
1047    /// Unredacts the secrets contained in the TES backend authentication
1048    /// configuration.
1049    pub fn unredact(&mut self) {
1050        match self {
1051            Self::Basic(auth) => auth.unredact(),
1052            Self::Bearer(auth) => auth.unredact(),
1053        }
1054    }
1055}
1056
1057/// Represents configuration for the Task Execution Service (TES) backend.
1058#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1059#[serde(rename_all = "snake_case", deny_unknown_fields)]
1060pub struct TesBackendConfig {
1061    /// The URL of the Task Execution Service.
1062    #[serde(default, skip_serializing_if = "Option::is_none")]
1063    pub url: Option<Url>,
1064
1065    /// The authentication configuration for the TES backend.
1066    #[serde(default, skip_serializing_if = "Option::is_none")]
1067    pub auth: Option<TesBackendAuthConfig>,
1068
1069    /// The root cloud storage URL for storing inputs.
1070    #[serde(default, skip_serializing_if = "Option::is_none")]
1071    pub inputs: Option<Url>,
1072
1073    /// The root cloud storage URL for storing outputs.
1074    #[serde(default, skip_serializing_if = "Option::is_none")]
1075    pub outputs: Option<Url>,
1076
1077    /// The polling interval, in seconds, for checking task status.
1078    ///
1079    /// Defaults to 1 second.
1080    #[serde(default, skip_serializing_if = "Option::is_none")]
1081    pub interval: Option<u64>,
1082
1083    /// The number of retries after encountering an error communicating with the
1084    /// TES server.
1085    ///
1086    /// Defaults to no retries.
1087    pub retries: Option<u32>,
1088
1089    /// The maximum number of concurrent requests the backend will send to the
1090    /// TES server.
1091    ///
1092    /// Defaults to 10 concurrent requests.
1093    #[serde(default, skip_serializing_if = "Option::is_none")]
1094    pub max_concurrency: Option<u32>,
1095
1096    /// Whether or not the TES server URL may use an insecure protocol like
1097    /// HTTP.
1098    #[serde(default)]
1099    pub insecure: bool,
1100}
1101
1102impl TesBackendConfig {
1103    /// Validates the TES backend configuration.
1104    pub fn validate(&self) -> Result<()> {
1105        match &self.url {
1106            Some(url) => {
1107                if !self.insecure && url.scheme() != "https" {
1108                    bail!(
1109                        "TES backend configuration value `url` has invalid value `{url}`: URL \
1110                         must use a HTTPS scheme"
1111                    );
1112                }
1113            }
1114            None => bail!("TES backend configuration value `url` is required"),
1115        }
1116
1117        if let Some(auth) = &self.auth {
1118            auth.validate()?;
1119        }
1120
1121        if let Some(max_concurrency) = self.max_concurrency
1122            && max_concurrency == 0
1123        {
1124            bail!("TES backend configuration value `max_concurrency` cannot be zero");
1125        }
1126
1127        match &self.inputs {
1128            Some(url) => {
1129                if !is_supported_url(url.as_str()) {
1130                    bail!(
1131                        "TES backend storage configuration value `inputs` has invalid value \
1132                         `{url}`: URL scheme is not supported"
1133                    );
1134                }
1135
1136                if !url.path().ends_with('/') {
1137                    bail!(
1138                        "TES backend storage configuration value `inputs` has invalid value \
1139                         `{url}`: URL path must end with a slash"
1140                    );
1141                }
1142            }
1143            None => bail!("TES backend configuration value `inputs` is required"),
1144        }
1145
1146        match &self.outputs {
1147            Some(url) => {
1148                if !is_supported_url(url.as_str()) {
1149                    bail!(
1150                        "TES backend storage configuration value `outputs` has invalid value \
1151                         `{url}`: URL scheme is not supported"
1152                    );
1153                }
1154
1155                if !url.path().ends_with('/') {
1156                    bail!(
1157                        "TES backend storage configuration value `outputs` has invalid value \
1158                         `{url}`: URL path must end with a slash"
1159                    );
1160                }
1161            }
1162            None => bail!("TES backend storage configuration value `outputs` is required"),
1163        }
1164
1165        Ok(())
1166    }
1167
1168    /// Redacts the secrets contained in the TES backend configuration.
1169    pub fn redact(&mut self) {
1170        if let Some(auth) = &mut self.auth {
1171            auth.redact();
1172        }
1173    }
1174
1175    /// Unredacts the secrets contained in the TES backend configuration.
1176    pub fn unredact(&mut self) {
1177        if let Some(auth) = &mut self.auth {
1178            auth.unredact();
1179        }
1180    }
1181}
1182
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    #[test]
    fn redacted_config() {
        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                                username: "foo".into(),
                                password: "secret".into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                                token: "secret".into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: Some(AzureStorageAuthConfig {
                        account_name: "foo".into(),
                        access_key: "secret".into(),
                    }),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: "secret".into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: "secret".into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();

        // Field names such as `secret` and `secret_access_key` legitimately
        // contain the word "secret", so a plain substring check would always
        // match. Every secret above was set to the *value* "secret", so look
        // for it in value position instead.
        assert!(
            !json.contains(r#": "secret""#),
            "`{json}` contains a secret"
        );
        assert!(
            json.contains(REDACTED),
            "`{json}` does not contain a redacted value"
        );
    }

    #[tokio::test]
    async fn test_config_validate() {
        // Test invalid task config
        let mut config = Config::default();
        config.task.retries = Some(1000000);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Test invalid scatter concurrency config
        let mut config = Config::default();
        config.workflow.scatter.concurrency = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // Test invalid backend name
        let config = Config {
            backend: Some("foo".into()),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: Some("bar".into()),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // Test a singular backend
        let config = Config {
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().await.expect("config should validate");

        // Test invalid local backend cpu config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
                     available to the host"
                )
        );

        // Test invalid local backend memory config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `memory` cannot exceed the total memory of \
                     the host"
                )
        );

        // Test missing TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // Test TES invalid max concurrency
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("https://example.com".parse().unwrap()),
                        max_concurrency: Some(0),
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `max_concurrency` cannot be zero"
        );

        // Insecure TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        // Allow insecure URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        insecure: true,
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        config
            .validate()
            .await
            .expect("configuration should validate");

        let mut config = Config::default();
        config.http.parallelism = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Some(5);
        assert!(
            config.validate().await.is_ok(),
            "should pass for valid configuration"
        );

        let mut config = Config::default();
        config.http.parallelism = None;
        assert!(
            config.validate().await.is_ok(),
            "should pass for default (None)"
        );
    }
}