wdl_engine/
config.rs

1//! Implementation of engine configuration.
2
3use std::borrow::Cow;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use anyhow::Context;
8use anyhow::Result;
9use anyhow::anyhow;
10use anyhow::bail;
11use crankshaft::events::Event;
12use indexmap::IndexMap;
13use secrecy::ExposeSecret;
14use serde::Deserialize;
15use serde::Serialize;
16use tokio::sync::broadcast;
17use tracing::warn;
18use url::Url;
19
20use crate::DockerBackend;
21use crate::LocalBackend;
22use crate::SYSTEM;
23use crate::TaskExecutionBackend;
24use crate::TesBackend;
25use crate::convert_unit_string;
26use crate::path::is_url;
27
/// Upper bound (inclusive) on the number of task retries the engine accepts.
pub const MAX_RETRIES: u64 = 100;

/// Shell used to run task commands when no other shell is configured.
pub const DEFAULT_TASK_SHELL: &str = "bash";

/// Backend name looked up in the configured backends when no backend name is
/// given and the backend cannot be inferred from the configuration.
pub const DEFAULT_BACKEND_NAME: &str = "default";

/// Placeholder written in place of a secret's value when the secret is
/// serialized while redacted.
const REDACTED: &str = "<REDACTED>";
39
40/// Represents a secret string that is, by default, redacted for serialization.
41///
42/// This type is a wrapper around [`secrecy::SecretString`].
43#[derive(Debug, Clone)]
44pub struct SecretString {
45    /// The inner secret string.
46    ///
47    /// This type is not serializable.
48    inner: secrecy::SecretString,
49    /// Whether or not the secret string is redacted for serialization.
50    ///
51    /// If `true` (the default), `<REDACTED>` is serialized for the string's
52    /// value.
53    ///
54    /// If `false`, the inner secret string is exposed for serialization.
55    redacted: bool,
56}
57
58impl SecretString {
59    /// Redacts the secret for serialization.
60    ///
61    /// By default, a [`SecretString`] is redacted; when redacted, the string is
62    /// replaced with `<REDACTED>` when serialized.
63    pub fn redact(&mut self) {
64        self.redacted = true;
65    }
66
67    /// Unredacts the secret for serialization.
68    pub fn unredact(&mut self) {
69        self.redacted = false;
70    }
71
72    /// Gets the inner [`secrecy::SecretString`].
73    pub fn inner(&self) -> &secrecy::SecretString {
74        &self.inner
75    }
76}
77
78impl From<String> for SecretString {
79    fn from(s: String) -> Self {
80        Self {
81            inner: s.into(),
82            redacted: true,
83        }
84    }
85}
86
87impl From<&str> for SecretString {
88    fn from(s: &str) -> Self {
89        Self {
90            inner: s.into(),
91            redacted: true,
92        }
93    }
94}
95
96impl Default for SecretString {
97    fn default() -> Self {
98        Self {
99            inner: Default::default(),
100            redacted: true,
101        }
102    }
103}
104
105impl serde::Serialize for SecretString {
106    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
107    where
108        S: serde::Serializer,
109    {
110        use secrecy::ExposeSecret;
111
112        if self.redacted {
113            serializer.serialize_str(REDACTED)
114        } else {
115            serializer.serialize_str(self.inner.expose_secret())
116        }
117    }
118}
119
120impl<'de> serde::Deserialize<'de> for SecretString {
121    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
122    where
123        D: serde::Deserializer<'de>,
124    {
125        let inner = secrecy::SecretString::deserialize(deserializer)?;
126        Ok(Self {
127            inner,
128            redacted: true,
129        })
130    }
131}
132
133/// Represents WDL evaluation configuration.
134///
135/// <div class="warning">
136///
137/// By default, serialization of [`Config`] will redact the values of secrets.
138///
139/// Use the [`Config::unredact`] method before serialization to prevent the
140/// secrets from being redacted.
141///
142/// </div>
143#[derive(Debug, Default, Clone, Serialize, Deserialize)]
144#[serde(rename_all = "snake_case", deny_unknown_fields)]
145pub struct Config {
146    /// HTTP configuration.
147    #[serde(default)]
148    pub http: HttpConfig,
149    /// Workflow evaluation configuration.
150    #[serde(default)]
151    pub workflow: WorkflowConfig,
152    /// Task evaluation configuration.
153    #[serde(default)]
154    pub task: TaskConfig,
155    /// The name of the backend to use.
156    ///
157    /// If not specified and `backends` has multiple entries, it will use a name
158    /// of `default`.
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub backend: Option<String>,
161    /// Task execution backends configuration.
162    ///
163    /// If the collection is empty and `backend` is not specified, the engine
164    /// default backend is used.
165    ///
166    /// If the collection has exactly one entry and `backend` is not specified,
167    /// the singular entry will be used.
168    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
169    pub backends: IndexMap<String, BackendConfig>,
170    /// Storage configuration.
171    #[serde(default)]
172    pub storage: StorageConfig,
173    /// (Experimental) Avoid environment-specific output; default is `false`.
174    ///
175    /// If this option is `true`, selected error messages and log output will
176    /// avoid emitting environment-specific output such as absolute paths
177    /// and system resource counts.
178    ///
179    /// This is largely meant to support "golden testing" where a test's success
180    /// depends on matching an expected set of outputs exactly. Cues that
181    /// help users overcome errors, such as the path to a temporary
182    /// directory or the number of CPUs available to the system, confound this
183    /// style of testing. This flag is a best-effort experimental attempt to
184    /// reduce the impact of these differences in order to allow a wider
185    /// range of golden tests to be written.
186    #[serde(default)]
187    pub suppress_env_specific_output: bool,
188}
189
190impl Config {
191    /// Validates the evaluation configuration.
192    pub fn validate(&self) -> Result<()> {
193        self.http.validate()?;
194        self.workflow.validate()?;
195        self.task.validate()?;
196
197        if self.backend.is_none() && self.backends.len() < 2 {
198            // This is OK, we'll use either the singular backends entry (1) or
199            // the default (0)
200        } else {
201            // Check the backends map for the backend name (or "default")
202            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
203            if !self.backends.contains_key(backend) {
204                bail!("a backend named `{backend}` is not present in the configuration");
205            }
206        }
207
208        for backend in self.backends.values() {
209            backend.validate()?;
210        }
211
212        self.storage.validate()?;
213        Ok(())
214    }
215
216    /// Redacts the secrets contained in the configuration.
217    ///
218    /// By default, secrets are redacted for serialization.
219    pub fn redact(&mut self) {
220        for backend in self.backends.values_mut() {
221            backend.redact();
222        }
223
224        self.storage.azure.redact();
225
226        if let Some(auth) = &mut self.storage.s3.auth {
227            auth.redact();
228        }
229
230        if let Some(auth) = &mut self.storage.google.auth {
231            auth.redact();
232        }
233    }
234
235    /// Unredacts the secrets contained in the configuration.
236    ///
237    /// Calling this method will expose secrets for serialization.
238    pub fn unredact(&mut self) {
239        for backend in self.backends.values_mut() {
240            backend.unredact();
241        }
242
243        self.storage.azure.unredact();
244
245        if let Some(auth) = &mut self.storage.s3.auth {
246            auth.unredact();
247        }
248
249        if let Some(auth) = &mut self.storage.google.auth {
250            auth.unredact();
251        }
252    }
253
254    /// Creates a new task execution backend based on this configuration.
255    pub async fn create_backend(
256        self: &Arc<Self>,
257        events: Option<broadcast::Sender<Event>>,
258    ) -> Result<Arc<dyn TaskExecutionBackend>> {
259        let config = if self.backend.is_none() && self.backends.len() < 2 {
260            if self.backends.len() == 1 {
261                // Use the singular entry
262                Cow::Borrowed(self.backends.values().next().unwrap())
263            } else {
264                // Use the default
265                Cow::Owned(BackendConfig::default())
266            }
267        } else {
268            // Lookup the backend to use
269            let backend = self.backend.as_deref().unwrap_or(DEFAULT_BACKEND_NAME);
270            Cow::Borrowed(self.backends.get(backend).ok_or_else(|| {
271                anyhow!("a backend named `{backend}` is not present in the configuration")
272            })?)
273        };
274
275        match config.as_ref() {
276            BackendConfig::Local(config) => {
277                warn!(
278                    "the engine is configured to use the local backend: tasks will not be run \
279                     inside of a container"
280                );
281                Ok(Arc::new(LocalBackend::new(self.clone(), config, events)?))
282            }
283            BackendConfig::Docker(config) => Ok(Arc::new(
284                DockerBackend::new(self.clone(), config, events).await?,
285            )),
286            BackendConfig::Tes(config) => Ok(Arc::new(
287                TesBackend::new(self.clone(), config, events).await?,
288            )),
289        }
290    }
291}
292
293/// Represents HTTP configuration.
294#[derive(Debug, Default, Clone, Serialize, Deserialize)]
295#[serde(rename_all = "snake_case", deny_unknown_fields)]
296pub struct HttpConfig {
297    /// The HTTP download cache location.
298    ///
299    /// Defaults to using the system cache directory.
300    #[serde(default, skip_serializing_if = "Option::is_none")]
301    pub cache: Option<PathBuf>,
302    /// The number of retries for transferring files.
303    ///
304    /// Defaults to `5`.
305    #[serde(default, skip_serializing_if = "Option::is_none")]
306    pub retries: Option<usize>,
307    /// The maximum parallelism for file transfers.
308    ///
309    /// Defaults to the host's available parallelism.
310    #[serde(default, skip_serializing_if = "Option::is_none")]
311    pub parallelism: Option<usize>,
312}
313
314impl HttpConfig {
315    /// Validates the HTTP configuration.
316    pub fn validate(&self) -> Result<()> {
317        if let Some(parallelism) = self.parallelism
318            && parallelism == 0
319        {
320            bail!("configuration value `http.parallelism` cannot be zero");
321        }
322        Ok(())
323    }
324}
325
326/// Represents storage configuration.
327#[derive(Debug, Default, Clone, Serialize, Deserialize)]
328#[serde(rename_all = "snake_case", deny_unknown_fields)]
329pub struct StorageConfig {
330    /// Azure Blob Storage configuration.
331    #[serde(default)]
332    pub azure: AzureStorageConfig,
333    /// AWS S3 configuration.
334    #[serde(default)]
335    pub s3: S3StorageConfig,
336    /// Google Cloud Storage configuration.
337    #[serde(default)]
338    pub google: GoogleStorageConfig,
339}
340
341impl StorageConfig {
342    /// Validates the HTTP configuration.
343    pub fn validate(&self) -> Result<()> {
344        self.azure.validate()?;
345        self.s3.validate()?;
346        self.google.validate()?;
347        Ok(())
348    }
349}
350
351/// Represents configuration for Azure Blob Storage.
352#[derive(Debug, Default, Clone, Serialize, Deserialize)]
353#[serde(rename_all = "snake_case", deny_unknown_fields)]
354pub struct AzureStorageConfig {
355    /// The Azure Blob Storage authentication configuration.
356    ///
357    /// The key for the outer map is the Azure Storage account name.
358    ///
359    /// The key for the inner map is the Azure Storage container name.
360    ///
361    /// The value for the inner map is the SAS token to apply for requests to
362    /// the Azure Storage container.
363    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
364    pub auth: IndexMap<String, IndexMap<String, SecretString>>,
365}
366
367impl AzureStorageConfig {
368    /// Validates the Azure Blob Storage configuration.
369    pub fn validate(&self) -> Result<()> {
370        Ok(())
371    }
372
373    /// Redacts the secrets contained in the Azure Blob Storage configuration.
374    pub fn redact(&mut self) {
375        for v in self.auth.values_mut() {
376            for v in v.values_mut() {
377                v.redact();
378            }
379        }
380    }
381
382    /// Unredacts the secrets contained in the Azure Blob Storage configuration.
383    pub fn unredact(&mut self) {
384        for v in self.auth.values_mut() {
385            for v in v.values_mut() {
386                v.unredact();
387            }
388        }
389    }
390}
391
392/// Represents authentication information for AWS S3 storage.
393#[derive(Debug, Default, Clone, Serialize, Deserialize)]
394#[serde(rename_all = "snake_case", deny_unknown_fields)]
395pub struct S3StorageAuthConfig {
396    /// The AWS Access Key ID to use.
397    pub access_key_id: String,
398    /// The AWS Secret Access Key to use.
399    pub secret_access_key: SecretString,
400}
401
402impl S3StorageAuthConfig {
403    /// Validates the AWS S3 storage authentication configuration.
404    pub fn validate(&self) -> Result<()> {
405        if self.access_key_id.is_empty() {
406            bail!("configuration value `storage.s3.auth.access_key_id` is required");
407        }
408
409        if self.secret_access_key.inner.expose_secret().is_empty() {
410            bail!("configuration value `storage.s3.auth.secret_access_key` is required");
411        }
412
413        Ok(())
414    }
415
416    /// Redacts the secrets contained in the AWS S3 storage authentication
417    /// configuration.
418    pub fn redact(&mut self) {
419        self.secret_access_key.redact();
420    }
421
422    /// Unredacts the secrets contained in the AWS S3 storage authentication
423    /// configuration.
424    pub fn unredact(&mut self) {
425        self.secret_access_key.unredact();
426    }
427}
428
429/// Represents configuration for AWS S3 storage.
430#[derive(Debug, Default, Clone, Serialize, Deserialize)]
431#[serde(rename_all = "snake_case", deny_unknown_fields)]
432pub struct S3StorageConfig {
433    /// The default region to use for S3-schemed URLs (e.g.
434    /// `s3://<bucket>/<blob>`).
435    ///
436    /// Defaults to `us-east-1`.
437    #[serde(default, skip_serializing_if = "Option::is_none")]
438    pub region: Option<String>,
439
440    /// The AWS S3 storage authentication configuration.
441    #[serde(default, skip_serializing_if = "Option::is_none")]
442    pub auth: Option<S3StorageAuthConfig>,
443}
444
445impl S3StorageConfig {
446    /// Validates the AWS S3 storage configuration.
447    pub fn validate(&self) -> Result<()> {
448        if let Some(auth) = &self.auth {
449            auth.validate()?;
450        }
451
452        Ok(())
453    }
454}
455
456/// Represents authentication information for Google Cloud Storage.
457#[derive(Debug, Default, Clone, Serialize, Deserialize)]
458#[serde(rename_all = "snake_case", deny_unknown_fields)]
459pub struct GoogleStorageAuthConfig {
460    /// The HMAC Access Key to use.
461    pub access_key: String,
462    /// The HMAC Secret to use.
463    pub secret: SecretString,
464}
465
466impl GoogleStorageAuthConfig {
467    /// Validates the Google Cloud Storage authentication configuration.
468    pub fn validate(&self) -> Result<()> {
469        if self.access_key.is_empty() {
470            bail!("configuration value `storage.google.auth.access_key` is required");
471        }
472
473        if self.secret.inner.expose_secret().is_empty() {
474            bail!("configuration value `storage.google.auth.secret` is required");
475        }
476
477        Ok(())
478    }
479
480    /// Redacts the secrets contained in the Google Cloud Storage authentication
481    /// configuration.
482    pub fn redact(&mut self) {
483        self.secret.redact();
484    }
485
486    /// Unredacts the secrets contained in the Google Cloud Storage
487    /// authentication configuration.
488    pub fn unredact(&mut self) {
489        self.secret.unredact();
490    }
491}
492
493/// Represents configuration for Google Cloud Storage.
494#[derive(Debug, Default, Clone, Serialize, Deserialize)]
495#[serde(rename_all = "snake_case", deny_unknown_fields)]
496pub struct GoogleStorageConfig {
497    /// The Google Cloud Storage authentication configuration.
498    #[serde(default, skip_serializing_if = "Option::is_none")]
499    pub auth: Option<GoogleStorageAuthConfig>,
500}
501
502impl GoogleStorageConfig {
503    /// Validates the Google Cloud Storage configuration.
504    pub fn validate(&self) -> Result<()> {
505        if let Some(auth) = &self.auth {
506            auth.validate()?;
507        }
508
509        Ok(())
510    }
511}
512
513/// Represents workflow evaluation configuration.
514#[derive(Debug, Default, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "snake_case", deny_unknown_fields)]
516pub struct WorkflowConfig {
517    /// Scatter statement evaluation configuration.
518    #[serde(default)]
519    pub scatter: ScatterConfig,
520}
521
522impl WorkflowConfig {
523    /// Validates the workflow configuration.
524    pub fn validate(&self) -> Result<()> {
525        self.scatter.validate()?;
526        Ok(())
527    }
528}
529
530/// Represents scatter statement evaluation configuration.
531#[derive(Debug, Default, Clone, Serialize, Deserialize)]
532#[serde(rename_all = "snake_case", deny_unknown_fields)]
533pub struct ScatterConfig {
534    /// The number of scatter array elements to process concurrently.
535    ///
536    /// By default, the value is the parallelism supported by the task
537    /// execution backend.
538    ///
539    /// A value of `0` is invalid.
540    ///
541    /// Lower values use less memory for evaluation and higher values may better
542    /// saturate the task execution backend with tasks to execute.
543    ///
544    /// This setting does not change how many tasks an execution backend can run
545    /// concurrently, but may affect how many tasks are sent to the backend to
546    /// run at a time.
547    ///
548    /// For example, if `concurrency` was set to 10 and we evaluate the
549    /// following scatters:
550    ///
551    /// ```wdl
552    /// scatter (i in range(100)) {
553    ///     call my_task
554    /// }
555    ///
556    /// scatter (j in range(100)) {
557    ///     call my_task as my_task2
558    /// }
559    /// ```
560    ///
561    /// Here each scatter is independent and therefore there will be 20 calls
562    /// (10 for each scatter) made concurrently. If the task execution
563    /// backend can only execute 5 tasks concurrently, 5 tasks will execute
564    /// and 15 will be "ready" to execute and waiting for an executing task
565    /// to complete.
566    ///
567    /// If instead we evaluate the following scatters:
568    ///
569    /// ```wdl
570    /// scatter (i in range(100)) {
571    ///     scatter (j in range(100)) {
572    ///         call my_task
573    ///     }
574    /// }
575    /// ```
576    ///
577    /// Then there will be 100 calls (10*10 as 10 are made for each outer
578    /// element) made concurrently. If the task execution backend can only
579    /// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
580    /// "ready" to execute and waiting for an executing task to complete.
581    ///
582    /// <div class="warning">
583    /// Warning: nested scatter statements cause exponential memory usage based
584    /// on this value, as each scatter statement evaluation requires allocating
585    /// new scopes for scatter array elements being processed. </div>
586    #[serde(default, skip_serializing_if = "Option::is_none")]
587    pub concurrency: Option<u64>,
588}
589
590impl ScatterConfig {
591    /// Validates the scatter configuration.
592    pub fn validate(&self) -> Result<()> {
593        if let Some(concurrency) = self.concurrency
594            && concurrency == 0
595        {
596            bail!("configuration value `workflow.scatter.concurrency` cannot be zero");
597        }
598
599        Ok(())
600    }
601}
602
603/// Represents task evaluation configuration.
604#[derive(Debug, Default, Clone, Serialize, Deserialize)]
605#[serde(rename_all = "snake_case", deny_unknown_fields)]
606pub struct TaskConfig {
607    /// The default maximum number of retries to attempt if a task fails.
608    ///
609    /// A task's `max_retries` requirement will override this value.
610    ///
611    /// Defaults to 0 (no retries).
612    #[serde(default, skip_serializing_if = "Option::is_none")]
613    pub retries: Option<u64>,
614    /// The default container to use if a container is not specified in a task's
615    /// requirements.
616    ///
617    /// Defaults to `ubuntu:latest`.
618    #[serde(default, skip_serializing_if = "Option::is_none")]
619    pub container: Option<String>,
620    /// The default shell to use for tasks.
621    ///
622    /// Defaults to `bash`.
623    ///
624    /// <div class="warning">
625    /// Warning: the use of a shell other than `bash` may lead to tasks that may
626    /// not be portable to other execution engines.</div>
627    #[serde(default, skip_serializing_if = "Option::is_none")]
628    pub shell: Option<String>,
629    /// The behavior when a task's `cpu` requirement cannot be met.
630    #[serde(default)]
631    pub cpu_limit_behavior: TaskResourceLimitBehavior,
632    /// The behavior when a task's `memory` requirement cannot be met.
633    #[serde(default)]
634    pub memory_limit_behavior: TaskResourceLimitBehavior,
635}
636
637impl TaskConfig {
638    /// Validates the task evaluation configuration.
639    pub fn validate(&self) -> Result<()> {
640        if self.retries.unwrap_or(0) > MAX_RETRIES {
641            bail!("configuration value `task.retries` cannot exceed {MAX_RETRIES}");
642        }
643
644        Ok(())
645    }
646}
647
648/// The behavior when a task resource requirement, such as `cpu` or `memory`,
649/// cannot be met.
650#[derive(Debug, Default, Clone, Serialize, Deserialize)]
651#[serde(rename_all = "snake_case", deny_unknown_fields)]
652pub enum TaskResourceLimitBehavior {
653    /// Try executing a task with the maximum amount of the resource available
654    /// when the task's corresponding requirement cannot be met.
655    TryWithMax,
656    /// Do not execute a task if its corresponding requirement cannot be met.
657    ///
658    /// This is the default behavior.
659    #[default]
660    Deny,
661}
662
663/// Represents supported task execution backends.
664#[derive(Debug, Clone, Serialize, Deserialize)]
665#[serde(rename_all = "snake_case", tag = "type")]
666pub enum BackendConfig {
667    /// Use the local task execution backend.
668    Local(LocalBackendConfig),
669    /// Use the Docker task execution backend.
670    Docker(DockerBackendConfig),
671    /// Use the TES task execution backend.
672    Tes(Box<TesBackendConfig>),
673}
674
675impl Default for BackendConfig {
676    fn default() -> Self {
677        Self::Docker(Default::default())
678    }
679}
680
681impl BackendConfig {
682    /// Validates the backend configuration.
683    pub fn validate(&self) -> Result<()> {
684        match self {
685            Self::Local(config) => config.validate(),
686            Self::Docker(config) => config.validate(),
687            Self::Tes(config) => config.validate(),
688        }
689    }
690
691    /// Converts the backend configuration into a local backend configuration
692    ///
693    /// Returns `None` if the backend configuration is not local.
694    pub fn as_local(&self) -> Option<&LocalBackendConfig> {
695        match self {
696            Self::Local(config) => Some(config),
697            _ => None,
698        }
699    }
700
701    /// Converts the backend configuration into a Docker backend configuration
702    ///
703    /// Returns `None` if the backend configuration is not Docker.
704    pub fn as_docker(&self) -> Option<&DockerBackendConfig> {
705        match self {
706            Self::Docker(config) => Some(config),
707            _ => None,
708        }
709    }
710
711    /// Converts the backend configuration into a TES backend configuration
712    ///
713    /// Returns `None` if the backend configuration is not TES.
714    pub fn as_tes(&self) -> Option<&TesBackendConfig> {
715        match self {
716            Self::Tes(config) => Some(config),
717            _ => None,
718        }
719    }
720
721    /// Redacts the secrets contained in the backend configuration.
722    pub fn redact(&mut self) {
723        match self {
724            Self::Local(_) | Self::Docker(_) => {}
725            Self::Tes(config) => config.redact(),
726        }
727    }
728
729    /// Unredacts the secrets contained in the backend configuration.
730    pub fn unredact(&mut self) {
731        match self {
732            Self::Local(_) | Self::Docker(_) => {}
733            Self::Tes(config) => config.unredact(),
734        }
735    }
736}
737
738/// Represents configuration for the local task execution backend.
739///
740/// <div class="warning">
741/// Warning: the local task execution backend spawns processes on the host
742/// directly without the use of a container; only use this backend on trusted
743/// WDL. </div>
744#[derive(Debug, Default, Clone, Serialize, Deserialize)]
745#[serde(rename_all = "snake_case", deny_unknown_fields)]
746pub struct LocalBackendConfig {
747    /// Set the number of CPUs available for task execution.
748    ///
749    /// Defaults to the number of logical CPUs for the host.
750    ///
751    /// The value cannot be zero or exceed the host's number of CPUs.
752    #[serde(default, skip_serializing_if = "Option::is_none")]
753    pub cpu: Option<u64>,
754
755    /// Set the total amount of memory for task execution as a unit string (e.g.
756    /// `2 GiB`).
757    ///
758    /// Defaults to the total amount of memory for the host.
759    ///
760    /// The value cannot be zero or exceed the host's total amount of memory.
761    #[serde(default, skip_serializing_if = "Option::is_none")]
762    pub memory: Option<String>,
763}
764
765impl LocalBackendConfig {
766    /// Validates the local task execution backend configuration.
767    pub fn validate(&self) -> Result<()> {
768        if let Some(cpu) = self.cpu {
769            if cpu == 0 {
770                bail!("local backend configuration value `cpu` cannot be zero");
771            }
772
773            let total = SYSTEM.cpus().len() as u64;
774            if cpu > total {
775                bail!(
776                    "local backend configuration value `cpu` cannot exceed the virtual CPUs \
777                     available to the host ({total})"
778                );
779            }
780        }
781
782        if let Some(memory) = &self.memory {
783            let memory = convert_unit_string(memory).with_context(|| {
784                format!("local backend configuration value `memory` has invalid value `{memory}`")
785            })?;
786
787            if memory == 0 {
788                bail!("local backend configuration value `memory` cannot be zero");
789            }
790
791            let total = SYSTEM.total_memory();
792            if memory > total {
793                bail!(
794                    "local backend configuration value `memory` cannot exceed the total memory of \
795                     the host ({total} bytes)"
796                );
797            }
798        }
799
800        Ok(())
801    }
802}
803
804/// Gets the default value for the docker `cleanup` field.
805const fn cleanup_default() -> bool {
806    true
807}
808
809/// Represents configuration for the Docker backend.
810#[derive(Debug, Clone, Serialize, Deserialize)]
811#[serde(rename_all = "snake_case", deny_unknown_fields)]
812pub struct DockerBackendConfig {
813    /// Whether or not to remove a task's container after the task completes.
814    ///
815    /// Defaults to `true`.
816    #[serde(default = "cleanup_default")]
817    pub cleanup: bool,
818}
819
820impl DockerBackendConfig {
821    /// Validates the Docker backend configuration.
822    pub fn validate(&self) -> Result<()> {
823        Ok(())
824    }
825}
826
827impl Default for DockerBackendConfig {
828    fn default() -> Self {
829        Self { cleanup: true }
830    }
831}
832
833/// Represents HTTP basic authentication configuration.
834#[derive(Debug, Default, Clone, Serialize, Deserialize)]
835#[serde(rename_all = "snake_case", deny_unknown_fields)]
836pub struct BasicAuthConfig {
837    /// The HTTP basic authentication username.
838    #[serde(default)]
839    pub username: String,
840    /// The HTTP basic authentication password.
841    #[serde(default)]
842    pub password: SecretString,
843}
844
845impl BasicAuthConfig {
846    /// Validates the HTTP basic auth configuration.
847    pub fn validate(&self) -> Result<()> {
848        Ok(())
849    }
850
851    /// Redacts the secrets contained in the HTTP basic auth configuration.
852    pub fn redact(&mut self) {
853        self.password.redact();
854    }
855
856    /// Unredacts the secrets contained in the HTTP basic auth configuration.
857    pub fn unredact(&mut self) {
858        self.password.unredact();
859    }
860}
861
862/// Represents HTTP bearer token authentication configuration.
863#[derive(Debug, Default, Clone, Serialize, Deserialize)]
864#[serde(rename_all = "snake_case", deny_unknown_fields)]
865pub struct BearerAuthConfig {
866    /// The HTTP bearer authentication token.
867    #[serde(default)]
868    pub token: SecretString,
869}
870
871impl BearerAuthConfig {
872    /// Validates the HTTP bearer auth configuration.
873    pub fn validate(&self) -> Result<()> {
874        Ok(())
875    }
876
877    /// Redacts the secrets contained in the HTTP bearer auth configuration.
878    pub fn redact(&mut self) {
879        self.token.redact();
880    }
881
882    /// Unredacts the secrets contained in the HTTP bearer auth configuration.
883    pub fn unredact(&mut self) {
884        self.token.unredact();
885    }
886}
887
888/// Represents the kind of authentication for a TES backend.
889#[derive(Debug, Clone, Serialize, Deserialize)]
890#[serde(rename_all = "snake_case", tag = "type")]
891pub enum TesBackendAuthConfig {
892    /// Use basic authentication for the TES backend.
893    Basic(BasicAuthConfig),
894    /// Use bearer token authentication for the TES backend.
895    Bearer(BearerAuthConfig),
896}
897
898impl TesBackendAuthConfig {
899    /// Validates the TES backend authentication configuration.
900    pub fn validate(&self) -> Result<()> {
901        match self {
902            Self::Basic(config) => config.validate(),
903            Self::Bearer(config) => config.validate(),
904        }
905    }
906
907    /// Redacts the secrets contained in the TES backend authentication
908    /// configuration.
909    pub fn redact(&mut self) {
910        match self {
911            Self::Basic(auth) => auth.redact(),
912            Self::Bearer(auth) => auth.redact(),
913        }
914    }
915
916    /// Unredacts the secrets contained in the TES backend authentication
917    /// configuration.
918    pub fn unredact(&mut self) {
919        match self {
920            Self::Basic(auth) => auth.unredact(),
921            Self::Bearer(auth) => auth.unredact(),
922        }
923    }
924}
925
926/// Represents configuration for the Task Execution Service (TES) backend.
927#[derive(Debug, Default, Clone, Serialize, Deserialize)]
928#[serde(rename_all = "snake_case", deny_unknown_fields)]
929pub struct TesBackendConfig {
930    /// The URL of the Task Execution Service.
931    #[serde(default, skip_serializing_if = "Option::is_none")]
932    pub url: Option<Url>,
933
934    /// The authentication configuration for the TES backend.
935    #[serde(default, skip_serializing_if = "Option::is_none")]
936    pub auth: Option<TesBackendAuthConfig>,
937
938    /// The root cloud storage URL for storing inputs.
939    #[serde(default, skip_serializing_if = "Option::is_none")]
940    pub inputs: Option<Url>,
941
942    /// The root cloud storage URL for storing outputs.
943    #[serde(default, skip_serializing_if = "Option::is_none")]
944    pub outputs: Option<Url>,
945
946    /// The polling interval, in seconds, for checking task status.
947    ///
948    /// Defaults to 60 second.
949    #[serde(default, skip_serializing_if = "Option::is_none")]
950    pub interval: Option<u64>,
951
952    /// The maximum task concurrency for the backend.
953    ///
954    /// Defaults to unlimited.
955    #[serde(default, skip_serializing_if = "Option::is_none")]
956    pub max_concurrency: Option<u64>,
957
958    /// Whether or not the TES server URL may use an insecure protocol like
959    /// HTTP.
960    #[serde(default)]
961    pub insecure: bool,
962}
963
964impl TesBackendConfig {
965    /// Validates the TES backend configuration.
966    pub fn validate(&self) -> Result<()> {
967        match &self.url {
968            Some(url) => {
969                if !self.insecure && url.scheme() != "https" {
970                    bail!(
971                        "TES backend configuration value `url` has invalid value `{url}`: URL \
972                         must use a HTTPS scheme"
973                    );
974                }
975            }
976            None => bail!("TES backend configuration value `url` is required"),
977        }
978
979        if let Some(auth) = &self.auth {
980            auth.validate()?;
981        }
982
983        match &self.inputs {
984            Some(url) => {
985                if !is_url(url.as_str()) {
986                    bail!(
987                        "TES backend storage configuration value `inputs` has invalid value \
988                         `{url}`: URL scheme is not supported"
989                    );
990                }
991
992                if !url.path().ends_with('/') {
993                    bail!(
994                        "TES backend storage configuration value `inputs` has invalid value \
995                         `{url}`: URL path must end with a slash"
996                    );
997                }
998            }
999            None => bail!("TES backend configuration value `inputs` is required"),
1000        }
1001
1002        match &self.outputs {
1003            Some(url) => {
1004                if !is_url(url.as_str()) {
1005                    bail!(
1006                        "TES backend storage configuration value `outputs` has invalid value \
1007                         `{url}`: URL scheme is not supported"
1008                    );
1009                }
1010
1011                if !url.path().ends_with('/') {
1012                    bail!(
1013                        "TES backend storage configuration value `outputs` has invalid value \
1014                         `{url}`: URL path must end with a slash"
1015                    );
1016                }
1017            }
1018            None => bail!("TES backend storage configuration value `outputs` is required"),
1019        }
1020
1021        Ok(())
1022    }
1023
1024    /// Redacts the secrets contained in the TES backend configuration.
1025    pub fn redact(&mut self) {
1026        if let Some(auth) = &mut self.auth {
1027            auth.redact();
1028        }
1029    }
1030
1031    /// Unredacts the secrets contained in the TES backend configuration.
1032    pub fn unredact(&mut self) {
1033        if let Some(auth) = &mut self.auth {
1034            auth.unredact();
1035        }
1036    }
1037}
1038
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;

    #[test]
    fn redacted_secret() {
        let mut secret: SecretString = "secret".into();

        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );

        secret.unredact();
        assert_eq!(serde_json::to_string(&secret).unwrap(), r#""secret""#);

        secret.redact();
        assert_eq!(
            serde_json::to_string(&secret).unwrap(),
            format!(r#""{REDACTED}""#)
        );
    }

    #[test]
    fn redacted_config() {
        // Use a distinctive secret value. Field names such as
        // `secret_access_key` and `secret` always appear in the serialized
        // output, so searching for the plain word "secret" proves nothing.
        const SECRET: &str = "hunter2-must-not-leak";

        let config = Config {
            backends: [
                (
                    "first".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Basic(BasicAuthConfig {
                                username: "foo".into(),
                                password: SECRET.into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
                (
                    "second".to_string(),
                    BackendConfig::Tes(
                        TesBackendConfig {
                            auth: Some(TesBackendAuthConfig::Bearer(BearerAuthConfig {
                                token: SECRET.into(),
                            })),
                            ..Default::default()
                        }
                        .into(),
                    ),
                ),
            ]
            .into(),
            storage: StorageConfig {
                azure: AzureStorageConfig {
                    auth: [("foo".into(), [("bar".into(), SECRET.into())].into())].into(),
                },
                s3: S3StorageConfig {
                    auth: Some(S3StorageAuthConfig {
                        access_key_id: "foo".into(),
                        secret_access_key: SECRET.into(),
                    }),
                    ..Default::default()
                },
                google: GoogleStorageConfig {
                    auth: Some(GoogleStorageAuthConfig {
                        access_key: "foo".into(),
                        secret: SECRET.into(),
                    }),
                },
            },
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        assert!(!json.contains(SECRET), "`{json}` contains a secret");
        assert!(
            json.contains(REDACTED),
            "`{json}` does not contain a redacted value"
        );
    }

    #[test]
    fn test_config_validate() {
        // Test invalid task config
        let mut config = Config::default();
        config.task.retries = Some(1000000);
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "configuration value `task.retries` cannot exceed 100"
        );

        // Test invalid scatter concurrency config
        let mut config = Config::default();
        config.workflow.scatter.concurrency = Some(0);
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "configuration value `workflow.scatter.concurrency` cannot be zero"
        );

        // Test invalid backend name
        let config = Config {
            backend: Some("foo".into()),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "a backend named `foo` is not present in the configuration"
        );
        let config = Config {
            backend: Some("bar".into()),
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "a backend named `bar` is not present in the configuration"
        );

        // Test a singular backend
        let config = Config {
            backends: [("foo".to_string(), BackendConfig::default())].into(),
            ..Default::default()
        };
        config.validate().expect("config should validate");

        // Test invalid local backend cpu config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(0),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "local backend configuration value `cpu` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    cpu: Some(10000000),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(config.validate().unwrap_err().to_string().starts_with(
            "local backend configuration value `cpu` cannot exceed the virtual CPUs available to \
             the host"
        ));

        // Test invalid local backend memory config
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("0 GiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "local backend configuration value `memory` cannot be zero"
        );
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("100 meows".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "local backend configuration value `memory` has invalid value `100 meows`"
        );

        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Local(LocalBackendConfig {
                    memory: Some("1000 TiB".to_string()),
                    ..Default::default()
                }),
            )]
            .into(),
            ..Default::default()
        };
        assert!(config.validate().unwrap_err().to_string().starts_with(
            "local backend configuration value `memory` cannot exceed the total memory of the host"
        ));

        // Test missing TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(Default::default()),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "TES backend configuration value `url` is required"
        );

        // Insecure TES URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "TES backend configuration value `url` has invalid value `http://example.com/`: URL \
             must use a HTTPS scheme"
        );

        // Allow insecure URL
        let config = Config {
            backends: [(
                "default".to_string(),
                BackendConfig::Tes(
                    TesBackendConfig {
                        url: Some("http://example.com".parse().unwrap()),
                        inputs: Some("http://example.com".parse().unwrap()),
                        outputs: Some("http://example.com".parse().unwrap()),
                        insecure: true,
                        ..Default::default()
                    }
                    .into(),
                ),
            )]
            .into(),
            ..Default::default()
        };
        config.validate().expect("configuration should validate");

        let mut config = Config::default();
        config.http.parallelism = Some(0);
        assert_eq!(
            config.validate().unwrap_err().to_string(),
            "configuration value `http.parallelism` cannot be zero"
        );

        let mut config = Config::default();
        config.http.parallelism = Some(5);
        assert!(
            config.validate().is_ok(),
            "should pass for valid configuration"
        );

        let mut config = Config::default();
        config.http.parallelism = None;
        assert!(config.validate().is_ok(), "should pass for default (None)");
    }
}