wdl_engine/
config.rs

1//! Implementation of engine configuration.
2
3use std::borrow::Cow;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::Context;
8use anyhow::Result;
9use anyhow::anyhow;
10use anyhow::bail;
11use crankshaft::events::Event;
12use indexmap::IndexMap;
13use secrecy::ExposeSecret;
14use serde::Deserialize;
15use serde::Serialize;
16use tokio::sync::broadcast;
17use tracing::warn;
18use url::Url;
19
20use crate::DockerBackend;
21use crate::LocalBackend;
22use crate::LsfApptainerBackend;
23use crate::LsfApptainerBackendConfig;
24use crate::SYSTEM;
25use crate::TaskExecutionBackend;
26use crate::TesBackend;
27use crate::convert_unit_string;
28use crate::path::is_url;
29
/// Upper bound (inclusive) on how many times the engine will retry a task.
pub const MAX_RETRIES: u64 = 100;

/// Shell used for tasks when no shell has been configured.
pub const DEFAULT_TASK_SHELL: &str = "bash";

/// Backend name looked up in [`Config::backends`] when [`Config::backend`]
/// is not set and a lookup is required.
pub const DEFAULT_BACKEND_NAME: &str = "default";

/// Placeholder emitted in place of a secret's value when a redacted secret is
/// serialized.
const REDACTED: &str = "<REDACTED>";
41
42/// Represents a secret string that is, by default, redacted for serialization.
43///
44/// This type is a wrapper around [`secrecy::SecretString`].
45#[derive(Debug, Clone)]
46pub struct SecretString {
47    /// The inner secret string.
48    ///
49    /// This type is not serializable.
50    inner: secrecy::SecretString,
51    /// Whether or not the secret string is redacted for serialization.
52    ///
53    /// If `true` (the default), `<REDACTED>` is serialized for the string's
54    /// value.
55    ///
56    /// If `false`, the inner secret string is exposed for serialization.
57    redacted: bool,
58}
59
60impl SecretString {
61    /// Redacts the secret for serialization.
62    ///
63    /// By default, a [`SecretString`] is redacted; when redacted, the string is
64    /// replaced with `<REDACTED>` when serialized.
65    pub fn redact(&mut self) {
66        self.redacted = true;
67    }
68
69    /// Unredacts the secret for serialization.
70    pub fn unredact(&mut self) {
71        self.redacted = false;
72    }
73
74    /// Gets the inner [`secrecy::SecretString`].
75    pub fn inner(&self) -> &secrecy::SecretString {
76        &self.inner
77    }
78}
79
80impl From<String> for SecretString {
81    fn from(s: String) -> Self {
82        Self {
83            inner: s.into(),
84            redacted: true,
85        }
86    }
87}
88
89impl From<&str> for SecretString {
90    fn from(s: &str) -> Self {
91        Self {
92            inner: s.into(),
93            redacted: true,
94        }
95    }
96}
97
98impl Default for SecretString {
99    fn default() -> Self {
100        Self {
101            inner: Default::default(),
102            redacted: true,
103        }
104    }
105}
106
107impl serde::Serialize for SecretString {
108    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
109    where
110        S: serde::Serializer,
111    {
112        use secrecy::ExposeSecret;
113
114        if self.redacted {
115            serializer.serialize_str(REDACTED)
116        } else {
117            serializer.serialize_str(self.inner.expose_secret())
118        }
119    }
120}
121
122impl<'de> serde::Deserialize<'de> for SecretString {
123    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
124    where
125        D: serde::Deserializer<'de>,
126    {
127        let inner = secrecy::SecretString::deserialize(deserializer)?;
128        Ok(Self {
129            inner,
130            redacted: true,
131        })
132    }
133}
134
135/// Represents WDL evaluation configuration.
136///
137/// <div class="warning">
138///
139/// By default, serialization of [`Config`] will redact the values of secrets.
140///
141/// Use the [`Config::unredact`] method before serialization to prevent the
142/// secrets from being redacted.
143///
144/// </div>
145#[derive(Debug, Default, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "snake_case", deny_unknown_fields)]
147pub struct Config {
148    /// HTTP configuration.
149    #[serde(default)]
150    pub http: HttpConfig,
151    /// Workflow evaluation configuration.
152    #[serde(default)]
153    pub workflow: WorkflowConfig,
154    /// Task evaluation configuration.
155    #[serde(default)]
156    pub task: TaskConfig,
157    /// The name of the backend to use.
158    ///
159    /// If not specified and `backends` has multiple entries, it will use a name
160    /// of `default`.
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub backend: Option<String>,
163    /// Task execution backends configuration.
164    ///
165    /// If the collection is empty and `backend` is not specified, the engine
166    /// default backend is used.
167    ///
168    /// If the collection has exactly one entry and `backend` is not specified,
169    /// the singular entry will be used.
170    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
171    pub backends: IndexMap<String, BackendConfig>,
172    /// Storage configuration.
173    #[serde(default)]
174    pub storage: StorageConfig,
175    /// (Experimental) Avoid environment-specific output; default is `false`.
176    ///
177    /// If this option is `true`, selected error messages and log output will
178    /// avoid emitting environment-specific output such as absolute paths
179    /// and system resource counts.
180    ///
181    /// This is largely meant to support "golden testing" where a test's success
182    /// depends on matching an expected set of outputs exactly. Cues that
183    /// help users overcome errors, such as the path to a temporary
184    /// directory or the number of CPUs available to the system, confound this
185    /// style of testing. This flag is a best-effort experimental attempt to
186    /// reduce the impact of these differences in order to allow a wider
187    /// range of golden tests to be written.
188    #[serde(default)]
189    pub suppress_env_specific_output: bool,
190    /// (Experimental) Whether experimental features are enabled; default is
191    /// `false`.
192    ///
193    /// Experimental features are provided to users with heavy caveats about
194    /// their stability and rough edges. Use at your own risk, but feedback
195    /// is quite welcome.
196    #[serde(default)]
197    pub experimental_features_enabled: bool,
198}
199
200impl Config {
201    /// Validates the evaluation configuration.
202    pub async fn validate(&self) -> Result<()> {
203        self.http.validate()?;
204        self.workflow.validate()?;
205        self.task.validate()?;
206
207        if self.backend.is_none() && self.backends.len() < 2 {
208            // This is OK, we'll use either the singular backends entry (1) or
209            // the default (0)
210        } else {
211            // Check the backends map for the backend name (or "default")
212            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
213            if !self.backends.contains_key(backend) {
214                bail!("a backend named `{backend}` is not present in the configuration");
215            }
216        }
217
218        for backend in self.backends.values() {
219            backend.validate(self).await?;
220        }
221
222        self.storage.validate()?;
223
224        if self.suppress_env_specific_output && !self.experimental_features_enabled {
225            bail!("`suppress_env_specific_output` requires enabling experimental features");
226        }
227
228        Ok(())
229    }
230
231    /// Redacts the secrets contained in the configuration.
232    ///
233    /// By default, secrets are redacted for serialization.
234    pub fn redact(&mut self) {
235        for backend in self.backends.values_mut() {
236            backend.redact();
237        }
238
239        self.storage.azure.redact();
240
241        if let Some(auth) = &mut self.storage.s3.auth {
242            auth.redact();
243        }
244
245        if let Some(auth) = &mut self.storage.google.auth {
246            auth.redact();
247        }
248    }
249
250    /// Unredacts the secrets contained in the configuration.
251    ///
252    /// Calling this method will expose secrets for serialization.
253    pub fn unredact(&mut self) {
254        for backend in self.backends.values_mut() {
255            backend.unredact();
256        }
257
258        self.storage.azure.unredact();
259
260        if let Some(auth) = &mut self.storage.s3.auth {
261            auth.unredact();
262        }
263
264        if let Some(auth) = &mut self.storage.google.auth {
265            auth.unredact();
266        }
267    }
268
269    /// Creates a new task execution backend based on this configuration.
270    pub async fn create_backend(
271        self: &Arc<Self>,
272        events: Option<broadcast::Sender<Event>>,
273    ) -> Result<Arc<dyn TaskExecutionBackend>> {
274        let config = if self.backend.is_none() && self.backends.len() < 2 {
275            if self.backends.len() == 1 {
276                // Use the singular entry
277                Cow::Borrowed(self.backends.values().next().unwrap())
278            } else {
279                // Use the default
280                Cow::Owned(BackendConfig::default())
281            }
282        } else {
283            // Lookup the backend to use
284            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
285            Cow::Borrowed(self.backends.get(backend).ok_or_else(|| {
286                anyhow!("a backend named `{backend}` is not present in the configuration")
287            })?)
288        };
289
290        match config.as_ref() {
291            BackendConfig::Local(config) => {
292                warn!(
293                    "the engine is configured to use the local backend: tasks will not be run \
294                     inside of a container"
295                );
296                Ok(Arc::new(LocalBackend::new(self.clone(), config, events)?))
297            }
298            BackendConfig::Docker(config) => Ok(Arc::new(
299                DockerBackend::new(self.clone(), config, events).await?,
300            )),
301            BackendConfig::Tes(config) => Ok(Arc::new(
302                TesBackend::new(self.clone(), config, events).await?,
303            )),
304            BackendConfig::LsfApptainer(config) => Ok(Arc::new(LsfApptainerBackend::new(
305                self.clone(),
306                config.clone(),
307                events,
308            ))),
309        }
310    }
311}
312
313/// Represents HTTP configuration.
314#[derive(Debug, Default, Clone, Serialize, Deserialize)]
315#[serde(rename_all = "snake_case", deny_unknown_fields)]
316pub struct HttpConfig {
317    /// The HTTP download cache location.
318    ///
319    /// Defaults to using the system cache directory.
320    #[serde(default, skip_serializing_if = "Option::is_none")]
321    pub cache: Option<PathBuf>,
322    /// The number of retries for transferring files.
323    ///
324    /// Defaults to `5`.
325    #[serde(default, skip_serializing_if = "Option::is_none")]
326    pub retries: Option<usize>,
327    /// The maximum parallelism for file transfers.
328    ///
329    /// Defaults to the host's available parallelism.
330    #[serde(default, skip_serializing_if = "Option::is_none")]
331    pub parallelism: Option<usize>,
332}
333
334impl HttpConfig {
335    /// Validates the HTTP configuration.
336    pub fn validate(&self) -> Result<()> {
337        if let Some(parallelism) = self.parallelism
338            && parallelism == 0
339        {
340            bail!("configuration value `http.parallelism` cannot be zero");
341        }
342        Ok(())
343    }
344}
345
346/// Represents storage configuration.
347#[derive(Debug, Default, Clone, Serialize, Deserialize)]
348#[serde(rename_all = "snake_case", deny_unknown_fields)]
349pub struct StorageConfig {
350    /// Azure Blob Storage configuration.
351    #[serde(default)]
352    pub azure: AzureStorageConfig,
353    /// AWS S3 configuration.
354    #[serde(default)]
355    pub s3: S3StorageConfig,
356    /// Google Cloud Storage configuration.
357    #[serde(default)]
358    pub google: GoogleStorageConfig,
359}
360
361impl StorageConfig {
362    /// Validates the HTTP configuration.
363    pub fn validate(&self) -> Result<()> {
364        self.azure.validate()?;
365        self.s3.validate()?;
366        self.google.validate()?;
367        Ok(())
368    }
369}
370
371/// Represents configuration for Azure Blob Storage.
372#[derive(Debug, Default, Clone, Serialize, Deserialize)]
373#[serde(rename_all = "snake_case", deny_unknown_fields)]
374pub struct AzureStorageConfig {
375    /// The Azure Blob Storage authentication configuration.
376    ///
377    /// The key for the outer map is the Azure Storage account name.
378    ///
379    /// The key for the inner map is the Azure Storage container name.
380    ///
381    /// The value for the inner map is the SAS token to apply for requests to
382    /// the Azure Storage container.
383    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
384    pub auth: IndexMap<String, IndexMap<String, SecretString>>,
385}
386
387impl AzureStorageConfig {
388    /// Validates the Azure Blob Storage configuration.
389    pub fn validate(&self) -> Result<()> {
390        Ok(())
391    }
392
393    /// Redacts the secrets contained in the Azure Blob Storage configuration.
394    pub fn redact(&mut self) {
395        for v in self.auth.values_mut() {
396            for v in v.values_mut() {
397                v.redact();
398            }
399        }
400    }
401
402    /// Unredacts the secrets contained in the Azure Blob Storage configuration.
403    pub fn unredact(&mut self) {
404        for v in self.auth.values_mut() {
405            for v in v.values_mut() {
406                v.unredact();
407            }
408        }
409    }
410}
411
412/// Represents authentication information for AWS S3 storage.
413#[derive(Debug, Default, Clone, Serialize, Deserialize)]
414#[serde(rename_all = "snake_case", deny_unknown_fields)]
415pub struct S3StorageAuthConfig {
416    /// The AWS Access Key ID to use.
417    pub access_key_id: String,
418    /// The AWS Secret Access Key to use.
419    pub secret_access_key: SecretString,
420}
421
422impl S3StorageAuthConfig {
423    /// Validates the AWS S3 storage authentication configuration.
424    pub fn validate(&self) -> Result<()> {
425        if self.access_key_id.is_empty() {
426            bail!("configuration value `storage.s3.auth.access_key_id` is required");
427        }
428
429        if self.secret_access_key.inner.expose_secret().is_empty() {
430            bail!("configuration value `storage.s3.auth.secret_access_key` is required");
431        }
432
433        Ok(())
434    }
435
436    /// Redacts the secrets contained in the AWS S3 storage authentication
437    /// configuration.
438    pub fn redact(&mut self) {
439        self.secret_access_key.redact();
440    }
441
442    /// Unredacts the secrets contained in the AWS S3 storage authentication
443    /// configuration.
444    pub fn unredact(&mut self) {
445        self.secret_access_key.unredact();
446    }
447}
448
449/// Represents configuration for AWS S3 storage.
450#[derive(Debug, Default, Clone, Serialize, Deserialize)]
451#[serde(rename_all = "snake_case", deny_unknown_fields)]
452pub struct S3StorageConfig {
453    /// The default region to use for S3-schemed URLs (e.g.
454    /// `s3://<bucket>/<blob>`).
455    ///
456    /// Defaults to `us-east-1`.
457    #[serde(default, skip_serializing_if = "Option::is_none")]
458    pub region: Option<String>,
459
460    /// The AWS S3 storage authentication configuration.
461    #[serde(default, skip_serializing_if = "Option::is_none")]
462    pub auth: Option<S3StorageAuthConfig>,
463}
464
465impl S3StorageConfig {
466    /// Validates the AWS S3 storage configuration.
467    pub fn validate(&self) -> Result<()> {
468        if let Some(auth) = &self.auth {
469            auth.validate()?;
470        }
471
472        Ok(())
473    }
474}
475
476/// Represents authentication information for Google Cloud Storage.
477#[derive(Debug, Default, Clone, Serialize, Deserialize)]
478#[serde(rename_all = "snake_case", deny_unknown_fields)]
479pub struct GoogleStorageAuthConfig {
480    /// The HMAC Access Key to use.
481    pub access_key: String,
482    /// The HMAC Secret to use.
483    pub secret: SecretString,
484}
485
486impl GoogleStorageAuthConfig {
487    /// Validates the Google Cloud Storage authentication configuration.
488    pub fn validate(&self) -> Result<()> {
489        if self.access_key.is_empty() {
490            bail!("configuration value `storage.google.auth.access_key` is required");
491        }
492
493        if self.secret.inner.expose_secret().is_empty() {
494            bail!("configuration value `storage.google.auth.secret` is required");
495        }
496
497        Ok(())
498    }
499
500    /// Redacts the secrets contained in the Google Cloud Storage authentication
501    /// configuration.
502    pub fn redact(&mut self) {
503        self.secret.redact();
504    }
505
506    /// Unredacts the secrets contained in the Google Cloud Storage
507    /// authentication configuration.
508    pub fn unredact(&mut self) {
509        self.secret.unredact();
510    }
511}
512
513/// Represents configuration for Google Cloud Storage.
514#[derive(Debug, Default, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "snake_case", deny_unknown_fields)]
516pub struct GoogleStorageConfig {
517    /// The Google Cloud Storage authentication configuration.
518    #[serde(default, skip_serializing_if = "Option::is_none")]
519    pub auth: Option<GoogleStorageAuthConfig>,
520}
521
522impl GoogleStorageConfig {
523    /// Validates the Google Cloud Storage configuration.
524    pub fn validate(&self) -> Result<()> {
525        if let Some(auth) = &self.auth {
526            auth.validate()?;
527        }
528
529        Ok(())
530    }
531}
532
533/// Represents workflow evaluation configuration.
534#[derive(Debug, Default, Clone, Serialize, Deserialize)]
535#[serde(rename_all = "snake_case", deny_unknown_fields)]
536pub struct WorkflowConfig {
537    /// Scatter statement evaluation configuration.
538    #[serde(default)]
539    pub scatter: ScatterConfig,
540}
541
542impl WorkflowConfig {
543    /// Validates the workflow configuration.
544    pub fn validate(&self) -> Result<()> {
545        self.scatter.validate()?;
546        Ok(())
547    }
548}
549
550/// Represents scatter statement evaluation configuration.
551#[derive(Debug, Default, Clone, Serialize, Deserialize)]
552#[serde(rename_all = "snake_case", deny_unknown_fields)]
553pub struct ScatterConfig {
554    /// The number of scatter array elements to process concurrently.
555    ///
556    /// By default, the value is the parallelism supported by the task
557    /// execution backend.
558    ///
559    /// A value of `0` is invalid.
560    ///
561    /// Lower values use less memory for evaluation and higher values may better
562    /// saturate the task execution backend with tasks to execute.
563    ///
564    /// This setting does not change how many tasks an execution backend can run
565    /// concurrently, but may affect how many tasks are sent to the backend to
566    /// run at a time.
567    ///
568    /// For example, if `concurrency` was set to 10 and we evaluate the
569    /// following scatters:
570    ///
571    /// ```wdl
572    /// scatter (i in range(100)) {
573    ///     call my_task
574    /// }
575    ///
576    /// scatter (j in range(100)) {
577    ///     call my_task as my_task2
578    /// }
579    /// ```
580    ///
581    /// Here each scatter is independent and therefore there will be 20 calls
582    /// (10 for each scatter) made concurrently. If the task execution
583    /// backend can only execute 5 tasks concurrently, 5 tasks will execute
584    /// and 15 will be "ready" to execute and waiting for an executing task
585    /// to complete.
586    ///
587    /// If instead we evaluate the following scatters:
588    ///
589    /// ```wdl
590    /// scatter (i in range(100)) {
591    ///     scatter (j in range(100)) {
592    ///         call my_task
593    ///     }
594    /// }
595    /// ```
596    ///
597    /// Then there will be 100 calls (10*10 as 10 are made for each outer
598    /// element) made concurrently. If the task execution backend can only
599    /// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
600    /// "ready" to execute and waiting for an executing task to complete.
601    ///
602    /// <div class="warning">
603    /// Warning: nested scatter statements cause exponential memory usage based
604    /// on this value, as each scatter statement evaluation requires allocating
605    /// new scopes for scatter array elements being processed. </div>
606    #[serde(default, skip_serializing_if = "Option::is_none")]
607    pub concurrency: Option<u64>,
608}
609
610impl ScatterConfig {
611    /// Validates the scatter configuration.
612    pub fn validate(&self) -> Result<()> {
613        if let Some(concurrency) = self.concurrency
614            && concurrency == 0
615        {
616            bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
617        }
618
619        Ok(())
620    }
621}
622
623/// Represents task evaluation configuration.
624#[derive(Debug, Default, Clone, Serialize, Deserialize)]
625#[serde(rename_all = "snake_case", deny_unknown_fields)]
626pub struct TaskConfig {
627    /// The default maximum number of retries to attempt if a task fails.
628    ///
629    /// A task's `max_retries` requirement will override this value.
630    ///
631    /// Defaults to 0 (no retries).
632    #[serde(default, skip_serializing_if = "Option::is_none")]
633    pub retries: Option<u64>,
634    /// The default container to use if a container is not specified in a task's
635    /// requirements.
636    ///
637    /// Defaults to `ubuntu:latest`.
638    #[serde(default, skip_serializing_if = "Option::is_none")]
639    pub container: Option<String>,
640    /// The default shell to use for tasks.
641    ///
642    /// Defaults to `bash`.
643    ///
644    /// <div class="warning">
645    /// Warning: the use of a shell other than `bash` may lead to tasks that may
646    /// not be portable to other execution engines.</div>
647    #[serde(default, skip_serializing_if = "Option::is_none")]
648    pub shell: Option<String>,
649    /// The behavior when a task's `cpu` requirement cannot be met.
650    #[serde(default)]
651    pub cpu_limit_behavior: TaskResourceLimitBehavior,
652    /// The behavior when a task's `memory` requirement cannot be met.
653    #[serde(default)]
654    pub memory_limit_behavior: TaskResourceLimitBehavior,
655}
656
657impl TaskConfig {
658    /// Validates the task evaluation configuration.
659    pub fn validate(&self) -> Result<()> {
660        if self.retries.unwrap_or(0) > MAX_RETRIES {
661            bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
662        }
663
664        Ok(())
665    }
666}
667
668/// The behavior when a task resource requirement, such as `cpu` or `memory`,
669/// cannot be met.
670#[derive(Debug, Default, Clone, Serialize, Deserialize)]
671#[serde(rename_all = "snake_case", deny_unknown_fields)]
672pub enum TaskResourceLimitBehavior {
673    /// Try executing a task with the maximum amount of the resource available
674    /// when the task's corresponding requirement cannot be met.
675    TryWithMax,
676    /// Do not execute a task if its corresponding requirement cannot be met.
677    ///
678    /// This is the default behavior.
679    #[default]
680    Deny,
681}
682
683/// Represents supported task execution backends.
684#[derive(Debug, Clone, Serialize, Deserialize)]
685#[serde(rename_all = "snake_case", tag = "type")]
686pub enum BackendConfig {
687    /// Use the local task execution backend.
688    Local(LocalBackendConfig),
689    /// Use the Docker task execution backend.
690    Docker(DockerBackendConfig),
691    /// Use the TES task execution backend.
692    Tes(Box<TesBackendConfig>),
693    /// Use the experimental LSF + Apptainer task execution backend.
694    ///
695    /// Requires enabling experimental features.
696    LsfApptainer(Arc<LsfApptainerBackendConfig>),
697}
698
699impl Default for BackendConfig {
700    fn default() -> Self {
701        Self::Docker(Default::default())
702    }
703}
704
705impl BackendConfig {
706    /// Validates the backend configuration.
707    pub async fn validate(&self, engine_config: &Config) -> Result<()> {
708        match self {
709            Self::Local(config) => config.validate(),
710            Self::Docker(config) => config.validate(),
711            Self::Tes(config) => config.validate(),
712            Self::LsfApptainer(config) => config.validate(engine_config).await,
713        }
714    }
715
716    /// Converts the backend configuration into a local backend configuration
717    ///
718    /// Returns `None` if the backend configuration is not local.
719    pub fn as_local(&self) -> Option<&LocalBackendConfig> {
720        match self {
721            Self::Local(config) => Some(config),
722            _ => None,
723        }
724    }
725
726    /// Converts the backend configuration into a Docker backend configuration
727    ///
728    /// Returns `None` if the backend configuration is not Docker.
729    pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
730        match self {
731            Self::Docker(config) => Some(config),
732            _ => None,
733        }
734    }
735
736    /// Converts the backend configuration into a TES backend configuration
737    ///
738    /// Returns `None` if the backend configuration is not TES.
739    pub fn as_tes(&self) -> Option<&TesBackendConfig> {
740        match self {
741            Self::Tes(config) => Some(config),
742            _ => None,
743        }
744    }
745
746    /// Redacts the secrets contained in the backend configuration.
747    pub fn redact(&mut self) {
748        match self {
749            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) => {}
750            Self::Tes(config) => config.redact(),
751        }
752    }
753
754    /// Unredacts the secrets contained in the backend configuration.
755    pub fn unredact(&mut self) {
756        match self {
757            Self::Local(_) | Self::Docker(_) | Self::LsfApptainer(_) => {}
758            Self::Tes(config) => config.unredact(),
759        }
760    }
761}
762
763/// Represents configuration for the local task execution backend.
764///
765/// <div class="warning">
766/// Warning: the local task execution backend spawns processes on the host
767/// directly without the use of a container; only use this backend on trusted
768/// WDL. </div>
769#[derive(Debug, Default, Clone, Serialize, Deserialize)]
770#[serde(rename_all = "snake_case", deny_unknown_fields)]
771pub struct LocalBackendConfig {
772    /// Set the number of CPUs available for task execution.
773    ///
774    /// Defaults to the number of logical CPUs for the host.
775    ///
776    /// The value cannot be zero or exceed the host's number of CPUs.
777    #[serde(default, skip_serializing_if = "Option::is_none")]
778    pub cpu: Option<u64>,
779
780    /// Set the total amount of memory for task execution as a unit string (e.g.
781    /// `2 GiB`).
782    ///
783    /// Defaults to the total amount of memory for the host.
784    ///
785    /// The value cannot be zero or exceed the host's total amount of memory.
786    #[serde(default, skip_serializing_if = "Option::is_none")]
787    pub memory: Option<String>,
788}
789
790impl LocalBackendConfig {
791    /// Validates the local task execution backend configuration.
792    pub fn validate(&self) -> Result<()> {
793        if let Some(cpu) = self.cpu {
794            if cpu == 0 {
795                bail!("local backend configuration value `cpu` cannot be zero");
796            }
797
798            let total = SYSTEM.cpus().len() as u64;
799            if cpu > total {
800                bail!(
801                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
802                     available to the host ({total})"
803                );
804            }
805        }
806
807        if let Some(memory) = &self.memory {
808            let memory = convert_unit_string(memory).with_context(|| {
809                format!("local backend configuration value `memory` has invalid value `{memory}`")
810            })?;
811
812            if memory == 0 {
813                bail!("local backend configuration value `memory` cannot be zero");
814            }
815
816            let total = SYSTEM.total_memory();
817            if memory > total {
818                bail!(
819                    "local backend configuration value `memory` cannot exceed the total memory of \
820                     the host ({total} bytes)"
821                );
822            }
823        }
824
825        Ok(())
826    }
827}
828
829/// Gets the default value for the docker `cleanup` field.
830const fn cleanup_default() -> bool {
831    true
832}
833
834/// Represents configuration for the Docker backend.
835#[derive(Debug, Clone, Serialize, Deserialize)]
836#[serde(rename_all = "snake_case", deny_unknown_fields)]
837pub struct DockerBackendConfig {
838    /// Whether or not to remove a task's container after the task completes.
839    ///
840    /// Defaults to `true`.
841    #[serde(default = "cleanup_default")]
842    pub cleanup: bool,
843}
844
845impl DockerBackendConfig {
846    /// Validates the Docker backend configuration.
847    pub fn validate(&self) -> Result<()> {
848        Ok(())
849    }
850}
851
852impl Default for DockerBackendConfig {
853    fn default() -> Self {
854        Self { cleanup: true }
855    }
856}
857
858/// Represents HTTP basic authentication configuration.
859#[derive(Debug, Default, Clone, Serialize, Deserialize)]
860#[serde(rename_all = "snake_case", deny_unknown_fields)]
861pub struct BasicAuthConfig {
862    /// The HTTP basic authentication username.
863    #[serde(default)]
864    pub username: String,
865    /// The HTTP basic authentication password.
866    #[serde(default)]
867    pub password: SecretString,
868}
869
870impl BasicAuthConfig {
871    /// Validates the HTTP basic auth configuration.
872    pub fn validate(&self) -> Result<()> {
873        Ok(())
874    }
875
876    /// Redacts the secrets contained in the HTTP basic auth configuration.
877    pub fn redact(&mut self) {
878        self.password.redact();
879    }
880
881    /// Unredacts the secrets contained in the HTTP basic auth configuration.
882    pub fn unredact(&mut self) {
883        self.password.unredact();
884    }
885}
886
887/// Represents HTTP bearer token authentication configuration.
888#[derive(Debug, Default, Clone, Serialize, Deserialize)]
889#[serde(rename_all = "snake_case", deny_unknown_fields)]
890pub struct BearerAuthConfig {
891    /// The HTTP bearer authentication token.
892    #[serde(default)]
893    pub token: SecretString,
894}
895
896impl BearerAuthConfig {
897    /// Validates the HTTP bearer auth configuration.
898    pub fn validate(&self) -> Result<()> {
899        Ok(())
900    }
901
902    /// Redacts the secrets contained in the HTTP bearer auth configuration.
903    pub fn redact(&mut self) {
904        self.token.redact();
905    }
906
907    /// Unredacts the secrets contained in the HTTP bearer auth configuration.
908    pub fn unredact(&mut self) {
909        self.token.unredact();
910    }
911}
912
913/// Represents the kind of authentication for a TES backend.
914#[derive(Debug, Clone, Serialize, Deserialize)]
915#[serde(rename_all = "snake_case", tag = "type")]
916pub enum TesBackendAuthConfig {
917    /// Use basic authentication for the TES backend.
918    Basic(BasicAuthConfig),
919    /// Use bearer token authentication for the TES backend.
920    Bearer(BearerAuthConfig),
921}
922
923impl TesBackendAuthConfig {
924    /// Validates the TES backend authentication configuration.
925    pub fn validate(&self) -> Result<()> {
926        match self {
927            Self::Basic(config) => config.validate(),
928            Self::Bearer(config) => config.validate(),
929        }
930    }
931
932    /// Redacts the secrets contained in the TES backend authentication
933    /// configuration.
934    pub fn redact(&mut self) {
935        match self {
936            Self::Basic(auth) => auth.redact(),
937            Self::Bearer(auth) => auth.redact(),
938        }
939    }
940
941    /// Unredacts the secrets contained in the TES backend authentication
942    /// configuration.
943    pub fn unredact(&mut self) {
944        match self {
945            Self::Basic(auth) => auth.unredact(),
946            Self::Bearer(auth) => auth.unredact(),
947        }
948    }
949}
950
951/// Represents configuration for the Task Execution Service (TES) backend.
952#[derive(Debug, Default, Clone, Serialize, Deserialize)]
953#[serde(rename_all = "snake_case", deny_unknown_fields)]
954pub struct TesBackendConfig {
955    /// The URL of the Task Execution Service.
956    #[serde(default, skip_serializing_if = "Option::is_none")]
957    pub url: Option<Url>,
958
959    /// The authentication configuration for the TES backend.
960    #[serde(default, skip_serializing_if = "Option::is_none")]
961    pub auth: Option<TesBackendAuthConfig>,
962
963    /// The root cloud storage URL for storing inputs.
964    #[serde(default, skip_serializing_if = "Option::is_none")]
965    pub inputs: Option<Url>,
966
967    /// The root cloud storage URL for storing outputs.
968    #[serde(default, skip_serializing_if = "Option::is_none")]
969    pub outputs: Option<Url>,
970
971    /// The polling interval, in seconds, for checking task status.
972    ///
973    /// Defaults to 1 second.
974    #[serde(default, skip_serializing_if = "Option::is_none")]
975    pub interval: Option<u64>,
976
977    /// The number of retries after encountering an error communicating with the
978    /// TES server.
979    ///
980    /// Defaults to no retries.
981    pub retries: Option<u32>,
982
983    /// The maximum number of concurrent requests the backend will send to the
984    /// TES server.
985    ///
986    /// Defaults to 10 concurrent requests.
987    #[serde(default, skip_serializing_if = "Option::is_none")]
988    pub max_concurrency: Option<u32>,
989
990    /// Whether or not the TES server URL may use an insecure protocol like
991    /// HTTP.
992    #[serde(default)]
993    pub insecure: bool,
994}
995
996impl TesBackendConfig {
997    /// Validates the TES backend configuration.
998    pub fn validate(&self) -> Result<()> {
999        match &self.url {
1000            Some(url) => {
1001                if !self.insecure && url.scheme() != "https" {
1002                    bail!(
1003                        "TES backend configuration value `url` has invalid value `{url}`: URL \
1004                         must use a HTTPS scheme"
1005                    );
1006                }
1007            }
1008            None => bail!("TES backend configuration value `url` is required"),
1009        }
1010
1011        if let Some(auth) = &self.auth {
1012            auth.validate()?;
1013        }
1014
1015        if let Some(max_concurrency) = self.max_concurrency
1016            && max_concurrency == 0
1017        {
1018            bail!("TES backend configuration value `max_concurrency` cannot be zero");
1019        }
1020
1021        match &self.inputs {
1022            Some(url) => {
1023                if !is_url(url.as_str()) {
1024                    bail!(
1025                        "TES backend storage configuration value `inputs` has invalid value \
1026                         `{url}`: URL scheme is not supported"
1027                    );
1028                }
1029
1030                if !url.path().ends_with('/') {
1031                    bail!(
1032                        "TES backend storage configuration value `inputs` has invalid value \
1033                         `{url}`: URL path must end with a slash"
1034                    );
1035                }
1036            }
1037            None => bail!("TES backend configuration value `inputs` is required"),
1038        }
1039
1040        match &self.outputs {
1041            Some(url) => {
1042                if !is_url(url.as_str()) {
1043                    bail!(
1044                        "TES backend storage configuration value `outputs` has invalid value \
1045                         `{url}`: URL scheme is not supported"
1046                    );
1047                }
1048
1049                if !url.path().ends_with('/') {
1050                    bail!(
1051                        "TES backend storage configuration value `outputs` has invalid value \
1052                         `{url}`: URL path must end with a slash"
1053                    );
1054                }
1055            }
1056            None => bail!("TES backend storage configuration value `outputs` is required"),
1057        }
1058
1059        Ok(())
1060    }
1061
1062    /// Redacts the secrets contained in the TES backend configuration.
1063    pub fn redact(&mut self) {
1064        if let Some(auth) = &mut self.auth {
1065            auth.redact();
1066        }
1067    }
1068
1069    /// Unredacts the secrets contained in the TES backend configuration.
1070    pub fn unredact(&mut self) {
1071        if let Some(auth) = &mut self.auth {
1072            auth.unredact();
1073        }
1074    }
1075}
1076
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    #[test]
    fn redacted_config() {
        // Use a distinctive secret value: the plain word "secret" also appears
        // in serialized field names (e.g. `secret_access_key`), so searching
        // for it could not detect a leak.
        const SECRET: &str = "sup3r-s3cr3t-v4lue";

        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                                username: "foo".into(),
                                password: SECRET.into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                                token: SECRET.into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: [("foo".into(), [("bar".into(), SECRET.into())].into())].into(),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: SECRET.into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: SECRET.into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        // The secret must never appear in serialized output...
        assert!(!json.contains(SECRET), "`{json}` contains a secret");
        // ...and the redaction marker must appear in its place.
        assert!(
            json.contains(REDACTED),
            "`{json}` does not contain redacted values"
        );
    }

    #[tokio::test]
    async fn test_config_validate() {
        // Test invalid task config
        let mut config = Config::default();
        config.task.retries = Some(1000000);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Test invalid scatter concurrency config
        let mut config = Config::default();
        config.workflow.scatter.concurrency = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // Test invalid backend name
        let config = Config {
            backend: Some("foo".into()),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: Some("bar".into()),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // Test a singular backend
        let config = Config {
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().await.expect("config should validate");

        // Test invalid local backend cpu config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
                     available to the host"
                )
        );

        // Test invalid local backend memory config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(
            config
                .validate()
                .await
                .unwrap_err()
                .to_string()
                .starts_with(
                    "local backend configuration value `memory` cannot exceed the total memory of \
                     the host"
                )
        );

        // Test missing TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // Test TES invalid max concurrency
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("https://example.com".parse().unwrap()),
                        max_concurrency: Some(0),
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `max_concurrency` cannot be zero"
        );

        // Insecure TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        // Allow insecure URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        insecure: true,
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        config
            .validate()
            .await
            .expect("configuration should validate");

        let mut config = Config::default();
        config.http.parallelism = Some(0);
        assert_eq!(
            config.validate().await.unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Some(5);
        assert!(
            config.validate().await.is_ok(),
            "should pass for valid configuration"
        );

        let mut config = Config::default();
        config.http.parallelism = None;
        assert!(
            config.validate().await.is_ok(),
            "should pass for default (None)"
        );
    }
}