wdl_engine/eval/v1/
task.rs

1//! Implementation of evaluation for V1 tasks.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::mem;
7use std::path::Path;
8use std::path::absolute;
9use std::str::FromStr;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::anyhow;
15use anyhow::bail;
16use bimap::BiHashMap;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio::task::JoinSet;
20use tokio_util::sync::CancellationToken;
21use tracing::Level;
22use tracing::debug;
23use tracing::enabled;
24use tracing::info;
25use tracing::warn;
26use wdl_analysis::Document;
27use wdl_analysis::diagnostics::Io;
28use wdl_analysis::diagnostics::multiple_type_mismatch;
29use wdl_analysis::diagnostics::unknown_name;
30use wdl_analysis::document::TASK_VAR_NAME;
31use wdl_analysis::document::Task;
32use wdl_analysis::eval::v1::TaskGraphBuilder;
33use wdl_analysis::eval::v1::TaskGraphNode;
34use wdl_analysis::types::Optional;
35use wdl_analysis::types::PrimitiveType;
36use wdl_analysis::types::Type;
37use wdl_analysis::types::v1::task_hint_types;
38use wdl_analysis::types::v1::task_requirement_types;
39use wdl_ast::Ast;
40use wdl_ast::AstNode;
41use wdl_ast::AstToken;
42use wdl_ast::Diagnostic;
43use wdl_ast::Span;
44use wdl_ast::SupportedVersion;
45use wdl_ast::v1::CommandPart;
46use wdl_ast::v1::CommandSection;
47use wdl_ast::v1::Decl;
48use wdl_ast::v1::RequirementsSection;
49use wdl_ast::v1::RuntimeSection;
50use wdl_ast::v1::StrippedCommandPart;
51use wdl_ast::v1::TASK_HINT_DISKS;
52use wdl_ast::v1::TASK_HINT_MAX_CPU;
53use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
54use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
55use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
57use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
58use wdl_ast::v1::TASK_REQUIREMENT_CPU;
59use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
60use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
61use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
62use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
63use wdl_ast::v1::TaskDefinition;
64use wdl_ast::v1::TaskHintsSection;
65use wdl_ast::version::V1;
66
67use crate::Coercible;
68use crate::EvaluationContext;
69use crate::EvaluationError;
70use crate::EvaluationResult;
71use crate::Events;
72use crate::GuestPath;
73use crate::HostPath;
74use crate::Input;
75use crate::InputKind;
76use crate::ONE_GIBIBYTE;
77use crate::Outputs;
78use crate::Scope;
79use crate::ScopeIndex;
80use crate::ScopeRef;
81use crate::StorageUnit;
82use crate::TaskExecutionBackend;
83use crate::TaskInputs;
84use crate::TaskSpawnInfo;
85use crate::TaskSpawnRequest;
86use crate::TaskValue;
87use crate::Value;
88use crate::config::Config;
89use crate::config::MAX_RETRIES;
90use crate::convert_unit_string;
91use crate::diagnostics::decl_evaluation_failed;
92use crate::diagnostics::runtime_type_mismatch;
93use crate::diagnostics::task_execution_failed;
94use crate::diagnostics::task_localization_failed;
95use crate::eval::EvaluatedTask;
96use crate::eval::trie::InputTrie;
97use crate::http::HttpTransferer;
98use crate::http::Transferer;
99use crate::path::EvaluationPath;
100use crate::path::is_file_url;
101use crate::path::is_url;
102use crate::tree::SyntaxNode;
103use crate::v1::ExprEvaluator;
104use crate::v1::INPUTS_FILE;
105use crate::v1::OUTPUTS_FILE;
106use crate::v1::write_json_file;
107
/// The default container requirement.
///
/// This is the final fallback used by `container` when neither the task nor
/// the caller supplies a usable container.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The default value for the `cpu` requirement.
///
/// Returned by `cpu` when the task does not specify the requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default value for the `memory` requirement.
///
/// This is two times `ONE_GIBIBYTE`; it is returned by `memory` when the task
/// does not specify the requirement.
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default value for the `max_retries` requirement.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default value for the `disks` requirement (in GiB).
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;

/// The index of a task's root scope.
///
/// Inputs and private declarations are evaluated into this scope.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of a task's output scope.
///
/// The output scope is created with the root scope as its parent.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the evaluation scope where the WDL 1.2 `task` variable is
/// visible.
///
/// The task scope is created with the output scope as its parent.
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
126
127/// Gets the `container` requirement from a requirements map.
128pub(crate) fn container<'a>(
129    requirements: &'a HashMap<String, Value>,
130    default: Option<&'a str>,
131) -> Cow<'a, str> {
132    requirements
133        .get(TASK_REQUIREMENT_CONTAINER)
134        .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
135        .and_then(|v| -> Option<Cow<'_, str>> {
136            // If the value is an array, use the first element or the default
137            // Note: in the future we should be resolving which element in the array is
138            // usable; this will require some work in Crankshaft to enable
139            if let Some(array) = v.as_array() {
140                return array.as_slice().first().map(|v| {
141                    v.as_string()
142                        .expect("type should be string")
143                        .as_ref()
144                        .into()
145                });
146            }
147
148            Some(
149                v.coerce(None, &PrimitiveType::String.into())
150                    .expect("type should coerce")
151                    .unwrap_string()
152                    .as_ref()
153                    .clone()
154                    .into(),
155            )
156        })
157        .and_then(|v| {
158            // Treat star as the default
159            if v == "*" { None } else { Some(v) }
160        })
161        .unwrap_or_else(|| {
162            default
163                .map(Into::into)
164                .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
165        })
166}
167
168/// Gets the `cpu` requirement from a requirements map.
169pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
170    requirements
171        .get(TASK_REQUIREMENT_CPU)
172        .map(|v| {
173            v.coerce(None, &PrimitiveType::Float.into())
174                .expect("type should coerce")
175                .unwrap_float()
176        })
177        .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
178}
179
180/// Gets the `max_cpu` hint from a hints map.
181pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
182    hints
183        .get(TASK_HINT_MAX_CPU)
184        .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
185        .map(|v| {
186            v.coerce(None, &PrimitiveType::Float.into())
187                .expect("type should coerce")
188                .unwrap_float()
189        })
190}
191
192/// Gets the `memory` requirement from a requirements map.
193pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
194    Ok(requirements
195        .get(TASK_REQUIREMENT_MEMORY)
196        .map(|v| {
197            if let Some(v) = v.as_integer() {
198                return Ok(v);
199            }
200
201            if let Some(s) = v.as_string() {
202                return convert_unit_string(s)
203                    .and_then(|v| v.try_into().ok())
204                    .with_context(|| {
205                        format!("task specifies an invalid `memory` requirement `{s}`")
206                    });
207            }
208
209            unreachable!("value should be an integer or string");
210        })
211        .transpose()?
212        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
213}
214
215/// Gets the `max_memory` hint from a hints map.
216pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
217    hints
218        .get(TASK_HINT_MAX_MEMORY)
219        .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
220        .map(|v| {
221            if let Some(v) = v.as_integer() {
222                return Ok(v);
223            }
224
225            if let Some(s) = v.as_string() {
226                return convert_unit_string(s)
227                    .and_then(|v| v.try_into().ok())
228                    .with_context(|| {
229                        format!("task specifies an invalid `memory` requirement `{s}`")
230                    });
231            }
232
233            unreachable!("value should be an integer or string");
234        })
235        .transpose()
236}
237
/// Represents the type of a disk.
///
/// Disk types are specified via hints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// The disk type is a solid state drive.
    SSD,
    /// The disk type is a hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type from its exact, case-sensitive name (`SSD` or
    /// `HDD`); any other string is rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        [("SSD", Self::SSD), ("HDD", Self::HDD)]
            .into_iter()
            .find_map(|(name, ty)| (name == s).then_some(ty))
            .ok_or(())
    }
}
260
261/// Represents a task disk requirement.
262pub struct DiskRequirement {
263    /// The size of the disk, in GiB.
264    pub size: i64,
265
266    /// The disk type as specified by a corresponding task hint.
267    pub ty: Option<DiskType>,
268}
269
270/// Gets the `disks` requirement.
271///
272/// Upon success, returns a mapping of mount point to disk requirement.
273pub(crate) fn disks<'a>(
274    requirements: &'a HashMap<String, Value>,
275    hints: &HashMap<String, Value>,
276) -> Result<HashMap<&'a str, DiskRequirement>> {
277    /// Helper for looking up a disk type from the hints.
278    ///
279    /// If we don't recognize the specification, we ignore it.
280    fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
281        hints.get(TASK_HINT_DISKS).and_then(|v| {
282            if let Some(ty) = v.as_string() {
283                return ty.parse().ok();
284            }
285
286            if let Some(map) = v.as_map() {
287                // Find the corresponding key; we have to scan the keys because the map is
288                // storing primitive values
289                if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
290                    (None, None) => true,
291                    (None, Some(_)) | (Some(_), None) => false,
292                    (Some(k), Some(mount_point)) => k
293                        .as_string()
294                        .map(|k| k.as_str() == mount_point)
295                        .unwrap_or(false),
296                }) {
297                    return v.as_string().and_then(|ty| ty.parse().ok());
298                }
299            }
300
301            None
302        })
303    }
304
305    /// Parses a disk specification into a size (in GiB) and optional mount
306    /// point.
307    fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
308        let iter = spec.split_whitespace();
309        let mut first = None;
310        let mut second = None;
311        let mut third = None;
312
313        for part in iter {
314            if first.is_none() {
315                first = Some(part);
316                continue;
317            }
318
319            if second.is_none() {
320                second = Some(part);
321                continue;
322            }
323
324            if third.is_none() {
325                third = Some(part);
326                continue;
327            }
328
329            return None;
330        }
331
332        match (first, second, third) {
333            (None, None, None) => None,
334            (Some(size), None, None) => {
335                // Specification is `<size>` (in GiB)
336                Some((size.parse().ok()?, None))
337            }
338            (Some(first), Some(second), None) => {
339                // Check for `<size> <unit>`; convert from the specified unit to GiB
340                if let Ok(size) = first.parse() {
341                    let unit: StorageUnit = second.parse().ok()?;
342                    let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
343                    return Some((size.try_into().ok()?, None));
344                }
345
346                // Specification is `<mount-point> <size>` (where size is already in GiB)
347                // The mount point must be absolute, i.e. start with `/`
348                if !first.starts_with('/') {
349                    return None;
350                }
351
352                Some((second.parse().ok()?, Some(first)))
353            }
354            (Some(mount_point), Some(size), Some(unit)) => {
355                // Specification is `<mount-point> <size> <units>`
356                let unit: StorageUnit = unit.parse().ok()?;
357                let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
358
359                // Mount point must be absolute
360                if !mount_point.starts_with('/') {
361                    return None;
362                }
363
364                Some((size.try_into().ok()?, Some(mount_point)))
365            }
366            _ => unreachable!("should have one, two, or three values"),
367        }
368    }
369
370    /// Inserts a disk into the disks map.
371    fn insert_disk<'a>(
372        spec: &'a str,
373        hints: &HashMap<String, Value>,
374        disks: &mut HashMap<&'a str, DiskRequirement>,
375    ) -> Result<()> {
376        let (size, mount_point) =
377            parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
378
379        let prev = disks.insert(
380            mount_point.unwrap_or("/"),
381            DiskRequirement {
382                size,
383                ty: lookup_type(mount_point, hints),
384            },
385        );
386
387        if prev.is_some() {
388            bail!(
389                "duplicate mount point `{mp}` specified in `disks` requirement",
390                mp = mount_point.unwrap_or("/")
391            );
392        }
393
394        Ok(())
395    }
396
397    let mut disks = HashMap::new();
398    if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
399        if let Some(size) = v.as_integer() {
400            // Disk spec is just the size (in GiB)
401            if size < 0 {
402                bail!("task requirement `disks` cannot be less than zero");
403            }
404
405            disks.insert(
406                "/",
407                DiskRequirement {
408                    size,
409                    ty: lookup_type(None, hints),
410                },
411            );
412        } else if let Some(spec) = v.as_string() {
413            insert_disk(spec, hints, &mut disks)?;
414        } else if let Some(v) = v.as_array() {
415            for spec in v.as_slice() {
416                insert_disk(
417                    spec.as_string().expect("spec should be a string"),
418                    hints,
419                    &mut disks,
420                )?;
421            }
422        } else {
423            unreachable!("value should be an integer, string, or array");
424        }
425    }
426
427    Ok(disks)
428}
429
430/// Gets the `preemptible` hint from a hints map.
431///
432/// This hint is not part of the WDL standard but is used for compatibility with
433/// Cromwell where backends can support preemptible retries before using
434/// dedicated instances.
435pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
436    const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
437    const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
438
439    hints
440        .get(TASK_HINT_PREEMPTIBLE)
441        .and_then(|v| {
442            Some(
443                v.coerce(None, &PrimitiveType::Integer.into())
444                    .ok()?
445                    .unwrap_integer(),
446            )
447        })
448        .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
449}
450
451/// Used to evaluate expressions in tasks.
452struct TaskEvaluationContext<'a, 'b> {
453    /// The associated evaluation state.
454    state: &'a mut State<'b>,
455    /// The transferer to use for expression evaluation.
456    transferer: &'a dyn Transferer,
457    /// The current evaluation scope.
458    scope: ScopeIndex,
459    /// The task work directory.
460    ///
461    /// This is `None` unless the output section is being evaluated.
462    work_dir: Option<&'a EvaluationPath>,
463    /// The standard out value to use.
464    ///
465    /// This field is only available after task execution.
466    stdout: Option<&'a Value>,
467    /// The standard error value to use.
468    ///
469    /// This field is only available after task execution.
470    stderr: Option<&'a Value>,
471    /// Whether or not the evaluation has associated task information.
472    ///
473    /// This is `true` when evaluating hints sections.
474    task: bool,
475}
476
477impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
478    /// Constructs a new expression evaluation context.
479    pub fn new(
480        state: &'a mut State<'b>,
481        transferer: &'a dyn Transferer,
482        scope: ScopeIndex,
483    ) -> Self {
484        Self {
485            state,
486            transferer,
487            scope,
488            work_dir: None,
489            stdout: None,
490            stderr: None,
491            task: false,
492        }
493    }
494
495    /// Sets the task's work directory to use for the evaluation context.
496    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
497        self.work_dir = Some(work_dir);
498        self
499    }
500
501    /// Sets the stdout value to use for the evaluation context.
502    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
503        self.stdout = Some(stdout);
504        self
505    }
506
507    /// Sets the stderr value to use for the evaluation context.
508    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
509        self.stderr = Some(stderr);
510        self
511    }
512
513    /// Marks the evaluation as having associated task information.
514    ///
515    /// This is used in evaluating hints sections.
516    pub fn with_task(mut self) -> Self {
517        self.task = true;
518        self
519    }
520}
521
522impl EvaluationContext for TaskEvaluationContext<'_, '_> {
523    fn version(&self) -> SupportedVersion {
524        self.state
525            .document
526            .version()
527            .expect("document should have a version")
528    }
529
530    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
531        ScopeRef::new(&self.state.scopes, self.scope)
532            .lookup(name)
533            .cloned()
534            .ok_or_else(|| unknown_name(name, span))
535    }
536
537    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
538        crate::resolve_type_name(self.state.document, name, span)
539    }
540
541    fn base_dir(&self) -> &EvaluationPath {
542        self.work_dir.unwrap_or(&self.state.base_dir)
543    }
544
545    fn temp_dir(&self) -> &Path {
546        self.state.temp_dir
547    }
548
549    fn stdout(&self) -> Option<&Value> {
550        self.stdout
551    }
552
553    fn stderr(&self) -> Option<&Value> {
554        self.stderr
555    }
556
557    fn task(&self) -> Option<&Task> {
558        if self.task {
559            Some(self.state.task)
560        } else {
561            None
562        }
563    }
564
565    fn transferer(&self) -> &dyn Transferer {
566        self.transferer
567    }
568
569    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
570        self.state.path_map.get_by_right(path).cloned()
571    }
572
573    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
574        self.state.path_map.get_by_left(path).cloned()
575    }
576
577    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
578        self.state.insert_backend_input(InputKind::File, path)?;
579        Ok(())
580    }
581}
582
/// Represents task evaluation state.
///
/// The state is created once per task evaluation and is borrowed mutably by
/// each `TaskEvaluationContext`.
struct State<'a> {
    /// The temp directory.
    temp_dir: &'a Path,
    /// The base directory for evaluation.
    ///
    /// This is the document's directory.
    ///
    /// When outputs are evaluated, the task's work directory is used as the
    /// base directory.
    base_dir: EvaluationPath,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The scopes of the task being evaluated.
    ///
    /// The first scope is the root scope, the second is the output scope, and
    /// the third is the scope where the "task" variable is visible in 1.2+
    /// evaluations.
    scopes: [Scope; 3],
    /// The environment variables of the task.
    ///
    /// Environment variables do not change between retries.
    env: IndexMap<String, String>,
    /// The trie for mapping backend inputs.
    backend_inputs: InputTrie,
    /// A bi-map of host paths and guest paths.
    ///
    /// Host paths are the left values and guest paths are the right values.
    path_map: BiHashMap<HostPath, GuestPath>,
}
613
614impl<'a> State<'a> {
615    /// Constructs a new task evaluation state.
616    fn new(
617        document: &'a Document,
618        task: &'a Task,
619        temp_dir: &'a Path,
620        guest_inputs_dir: Option<&'static str>,
621    ) -> Result<Self> {
622        // Tasks have a root scope (index 0), an output scope (index 1), and a `task`
623        // variable scope (index 2). The output scope inherits from the root scope and
624        // the task scope inherits from the output scope. Inputs and private
625        // declarations are evaluated into the root scope. Outputs are evaluated into
626        // the output scope. The task scope is used for evaluating expressions in both
627        // the command and output sections. Only the `task` variable in WDL 1.2 is
628        // introduced into the task scope; in previous WDL versions, the task scope will
629        // not have any local names.
630        let scopes = [
631            Scope::default(),
632            Scope::new(ROOT_SCOPE_INDEX),
633            Scope::new(OUTPUT_SCOPE_INDEX),
634        ];
635
636        let backend_inputs = if let Some(guest_inputs_dir) = guest_inputs_dir {
637            InputTrie::new_with_guest_dir(guest_inputs_dir)
638        } else {
639            InputTrie::new()
640        };
641
642        let document_path = document.path();
643        let mut base_dir = EvaluationPath::parent_of(&document_path).with_context(|| {
644            format!("document `{document_path}` does not have a parent directory")
645        })?;
646
647        base_dir.make_absolute();
648
649        Ok(Self {
650            temp_dir,
651            base_dir,
652            document,
653            task,
654            scopes,
655            env: Default::default(),
656            backend_inputs,
657            path_map: Default::default(),
658        })
659    }
660
661    /// Adds backend inputs to the state for any `File` or `Directory` values
662    /// referenced by the given value.
663    ///
664    /// If the backend doesn't use containers, remote inputs are immediately
665    /// localized.
666    ///
667    /// If the backend does use containers, remote inputs are localized during
668    /// the call to `localize_inputs`.
669    ///
670    /// This method also ensures that a `File` or `Directory` paths exist for
671    /// WDL 1.2+.
672    async fn add_backend_inputs(
673        &mut self,
674        is_optional: bool,
675        value: &mut Value,
676        transferer: &Arc<dyn Transferer>,
677        needs_local_inputs: bool,
678    ) -> Result<()> {
679        // For WDL 1.2 documents, start by ensuring paths exist.
680        // This will replace any non-existent optional paths with `None`
681        if self
682            .document
683            .version()
684            .expect("document should have a version")
685            >= SupportedVersion::V1(V1::Two)
686        {
687            value
688                .ensure_paths_exist(
689                    is_optional,
690                    self.base_dir.as_local(),
691                    Some(transferer.as_ref()),
692                    &|_| Ok(()),
693                )
694                .await?;
695        }
696
697        // Add inputs to the backend
698        let mut urls = Vec::new();
699        value.visit_paths(&mut |is_file, path| {
700            // Insert a backend input for the path
701            if let Some(index) = self.insert_backend_input(
702                if is_file {
703                    InputKind::File
704                } else {
705                    InputKind::Directory
706                },
707                path,
708            )? {
709                // Check to see if there's no guest path for a remote URL that needs to be
710                // localized; if so, we must localize it now
711                if needs_local_inputs
712                    && self.backend_inputs.as_slice()[index].guest_path.is_none()
713                    && is_url(path.as_str())
714                    && !is_file_url(path.as_str())
715                {
716                    urls.push((path.clone(), index));
717                }
718            }
719
720            Ok(())
721        })?;
722
723        if urls.is_empty() {
724            return Ok(());
725        }
726
727        // Download any necessary files
728        let mut downloads = JoinSet::new();
729        for (url, index) in urls {
730            let transferer = transferer.clone();
731            downloads.spawn(async move {
732                transferer
733                    .download(
734                        &url.as_str()
735                            .parse()
736                            .with_context(|| format!("invalid URL `{url}`"))?,
737                    )
738                    .await
739                    .with_context(|| anyhow!("failed to localize `{url}`"))
740                    .map(|l| (url, l, index))
741            });
742        }
743
744        // Wait for the downloads to complete
745        while let Some(result) = downloads.join_next().await {
746            let (url, location, index) =
747                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;
748
749            let guest_path = GuestPath::new(location.to_str().with_context(|| {
750                format!(
751                    "download location `{location}` is not UTF-8",
752                    location = location.display()
753                )
754            })?);
755
756            // Map the URL to the guest path
757            self.path_map.insert(url, guest_path);
758
759            // Finally, set the location of the input
760            self.backend_inputs.as_slice_mut()[index].set_location(location);
761        }
762
763        Ok(())
764    }
765
766    /// Inserts a backend input into the state.
767    ///
768    /// Responsible for mapping host and guest paths.
769    fn insert_backend_input(&mut self, kind: InputKind, path: &HostPath) -> Result<Option<usize>> {
770        // Insert an input for the path
771        if let Some(index) = self
772            .backend_inputs
773            .insert(kind, path.as_str(), &self.base_dir)?
774        {
775            // If the input has a guest path, map it
776            let input = &self.backend_inputs.as_slice()[index];
777            if let Some(guest_path) = &input.guest_path {
778                self.path_map.insert(path.clone(), guest_path.clone());
779            }
780
781            return Ok(Some(index));
782        }
783
784        Ok(None)
785    }
786}
787
/// Represents the result of evaluating task sections before execution.
struct EvaluatedSections {
    /// The evaluated command.
    command: String,
    /// The evaluated requirements, keyed by requirement name.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints, keyed by hint name.
    hints: Arc<HashMap<String, Value>>,
}
797
/// Represents a WDL V1 task evaluator.
///
/// An evaluator is created with `TaskEvaluator::new` and tasks are evaluated
/// with `TaskEvaluator::evaluate`.
pub struct TaskEvaluator {
    /// The associated evaluation configuration.
    config: Arc<Config>,
    /// The associated task execution backend.
    backend: Arc<dyn TaskExecutionBackend>,
    /// The cancellation token for cancelling task evaluation.
    token: CancellationToken,
    /// The transferer to use for expression evaluation.
    transferer: Arc<dyn Transferer>,
}
809
810impl TaskEvaluator {
811    /// Constructs a new task evaluator with the given evaluation
812    /// configuration, cancellation token, and events sender.
813    ///
814    /// Returns an error if the configuration isn't valid.
815    pub async fn new(config: Config, token: CancellationToken, events: Events) -> Result<Self> {
816        config.validate().await?;
817
818        let config = Arc::new(config);
819        let backend = config.create_backend(events.crankshaft().clone()).await?;
820        let transferer =
821            HttpTransferer::new(config.clone(), token.clone(), events.transfer().clone())?;
822
823        Ok(Self {
824            config,
825            backend,
826            token,
827            transferer: Arc::new(transferer),
828        })
829    }
830
    /// Creates a new task evaluator with the given configuration, backend,
    /// cancellation token, and transferer.
    ///
    /// The arguments are stored as-is.
    ///
    /// This method does not validate the configuration.
    pub(crate) fn new_unchecked(
        config: Arc<Config>,
        backend: Arc<dyn TaskExecutionBackend>,
        token: CancellationToken,
        transferer: Arc<dyn Transferer>,
    ) -> Self {
        Self {
            config,
            backend,
            token,
            transferer,
        }
    }
848
849    /// Evaluates the given task.
850    ///
851    /// Upon success, returns the evaluated task.
852    pub async fn evaluate(
853        &self,
854        document: &Document,
855        task: &Task,
856        inputs: &TaskInputs,
857        root: impl AsRef<Path>,
858    ) -> EvaluationResult<EvaluatedTask> {
859        // We cannot evaluate a document with errors
860        if document.has_errors() {
861            return Err(anyhow!("cannot evaluate a document with errors").into());
862        }
863
864        self.perform_evaluation(document, task, inputs, root.as_ref(), task.name())
865            .await
866    }
867
868    /// Performs the evaluation of the given task.
869    ///
870    /// This method skips checking the document (and its transitive imports) for
871    /// analysis errors as the check occurs at the `evaluate` entrypoint.
872    pub(crate) async fn perform_evaluation(
873        &self,
874        document: &Document,
875        task: &Task,
876        inputs: &TaskInputs,
877        root: &Path,
878        id: &str,
879    ) -> EvaluationResult<EvaluatedTask> {
880        inputs.validate(document, task, None).with_context(|| {
881            format!(
882                "failed to validate the inputs to task `{task}`",
883                task = task.name()
884            )
885        })?;
886
887        let ast = match document.root().morph().ast() {
888            Ast::V1(ast) => ast,
889            _ => {
890                return Err(
891                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
892                );
893            }
894        };
895
896        // Find the task in the AST
897        let definition = ast
898            .tasks()
899            .find(|t| t.name().text() == task.name())
900            .expect("task should exist in the AST");
901
902        let version = document.version().expect("document should have version");
903
904        // Build an evaluation graph for the task
905        let mut diagnostics = Vec::new();
906        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
907        assert!(
908            diagnostics.is_empty(),
909            "task evaluation graph should have no diagnostics"
910        );
911
912        debug!(
913            task_id = id,
914            task_name = task.name(),
915            document = document.uri().as_str(),
916            "evaluating task"
917        );
918
919        let root_dir = absolute(root).with_context(|| {
920            format!(
921                "failed to determine absolute path of `{path}`",
922                path = root.display()
923            )
924        })?;
925
926        // Create the temp directory now as it may be needed for task evaluation
927        let temp_dir = root_dir.join("tmp");
928        fs::create_dir_all(&temp_dir).with_context(|| {
929            format!(
930                "failed to create directory `{path}`",
931                path = temp_dir.display()
932            )
933        })?;
934
935        // Write the inputs to the task's root directory
936        write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
937
938        let mut state = State::new(document, task, &temp_dir, self.backend.guest_inputs_dir())?;
939        let nodes = toposort(&graph, None).expect("graph should be acyclic");
940        let mut current = 0;
941        while current < nodes.len() {
942            match &graph[nodes[current]] {
943                TaskGraphNode::Input(decl) => {
944                    self.evaluate_input(id, &mut state, decl, inputs)
945                        .await
946                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
947                }
948                TaskGraphNode::Decl(decl) => {
949                    self.evaluate_decl(id, &mut state, decl)
950                        .await
951                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
952                }
953                TaskGraphNode::Output(_) => {
954                    // Stop at the first output
955                    break;
956                }
957                TaskGraphNode::Command(_)
958                | TaskGraphNode::Runtime(_)
959                | TaskGraphNode::Requirements(_)
960                | TaskGraphNode::Hints(_) => {
961                    // Skip these sections for now; they'll evaluate in the
962                    // retry loop
963                }
964            }
965
966            current += 1;
967        }
968
969        let env = Arc::new(mem::take(&mut state.env));
970        // Spawn the task in a retry loop
971        let mut attempt = 0;
972        let mut evaluated = loop {
973            let EvaluatedSections {
974                command,
975                requirements,
976                hints,
977            } = self
978                .evaluate_sections(id, &mut state, &definition, inputs, attempt)
979                .await?;
980
981            // Get the maximum number of retries, either from the task's requirements or
982            // from configuration
983            let max_retries = requirements
984                .get(TASK_REQUIREMENT_MAX_RETRIES)
985                .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
986                .cloned()
987                .map(|v| v.unwrap_integer() as u64)
988                .or_else(|| self.config.task.retries)
989                .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
990
991            if max_retries > MAX_RETRIES {
992                return Err(anyhow!(
993                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
994                )
995                .into());
996            }
997
998            let mut attempt_dir = root_dir.clone();
999            attempt_dir.push("attempts");
1000            attempt_dir.push(attempt.to_string());
1001
1002            let request = TaskSpawnRequest::new(
1003                id.to_string(),
1004                TaskSpawnInfo::new(
1005                    command,
1006                    self.localize_inputs(id, &mut state).await?,
1007                    requirements.clone(),
1008                    hints.clone(),
1009                    env.clone(),
1010                    self.transferer.clone(),
1011                ),
1012                attempt,
1013                attempt_dir.clone(),
1014                root_dir.clone(),
1015                temp_dir.clone(),
1016            );
1017
1018            let result = self
1019                .backend
1020                .spawn(request, self.token.clone())
1021                .with_context(|| {
1022                    format!(
1023                        "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
1024                        name = task.name(),
1025                        path = document.path(),
1026                    )
1027                })?
1028                .await
1029                .expect("failed to receive response from spawned task")
1030                .map_err(|e| {
1031                    EvaluationError::new(
1032                        state.document.clone(),
1033                        task_execution_failed(e, task.name(), id, task.name_span()),
1034                    )
1035                })?;
1036
1037            // Update the task variable
1038            let evaluated = EvaluatedTask::new(attempt_dir, result)?;
1039            if version >= SupportedVersion::V1(V1::Two) {
1040                let task = state.scopes[TASK_SCOPE_INDEX.0]
1041                    .get_mut(TASK_VAR_NAME)
1042                    .unwrap()
1043                    .as_task_mut()
1044                    .unwrap();
1045
1046                task.set_attempt(attempt.try_into().with_context(|| {
1047                    format!(
1048                        "too many attempts were made to run task `{task}`",
1049                        task = state.task.name()
1050                    )
1051                })?);
1052                task.set_return_code(evaluated.result.exit_code);
1053            }
1054
1055            if let Err(e) = evaluated
1056                .handle_exit(&requirements, self.transferer.as_ref())
1057                .await
1058            {
1059                if attempt >= max_retries {
1060                    return Err(EvaluationError::new(
1061                        state.document.clone(),
1062                        task_execution_failed(e, task.name(), id, task.name_span()),
1063                    ));
1064                }
1065
1066                attempt += 1;
1067
1068                info!(
1069                    "retrying execution of task `{name}` (retry {attempt})",
1070                    name = state.task.name()
1071                );
1072                continue;
1073            }
1074
1075            break evaluated;
1076        };
1077
1078        // Perform backend cleanup before output evaluation
1079        if let Some(cleanup) = self
1080            .backend
1081            .cleanup(&evaluated.result.work_dir, self.token.clone())
1082        {
1083            cleanup.await;
1084        }
1085
1086        // Evaluate the remaining inputs (unused), and decls, and outputs
1087        for index in &nodes[current..] {
1088            match &graph[*index] {
1089                TaskGraphNode::Decl(decl) => {
1090                    self.evaluate_decl(id, &mut state, decl)
1091                        .await
1092                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1093                }
1094                TaskGraphNode::Output(decl) => {
1095                    self.evaluate_output(id, &mut state, decl, &evaluated)
1096                        .await
1097                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1098                }
1099                _ => {
1100                    unreachable!(
1101                        "only declarations and outputs should be evaluated after the command"
1102                    )
1103                }
1104            }
1105        }
1106
1107        // Take the output scope and return it in declaration sort order
1108        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1109        if let Some(section) = definition.output() {
1110            let indexes: HashMap<_, _> = section
1111                .declarations()
1112                .enumerate()
1113                .map(|(i, d)| (d.name().hashable(), i))
1114                .collect();
1115            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1116        }
1117
1118        // Write the outputs to the task's root directory
1119        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
1120
1121        evaluated.outputs = Ok(outputs);
1122        Ok(evaluated)
1123    }
1124
1125    /// Evaluates a task input.
1126    async fn evaluate_input(
1127        &self,
1128        id: &str,
1129        state: &mut State<'_>,
1130        decl: &Decl<SyntaxNode>,
1131        inputs: &TaskInputs,
1132    ) -> Result<(), Diagnostic> {
1133        let name = decl.name();
1134        let decl_ty = decl.ty();
1135        let expected_ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1136
1137        // Evaluate the input if not provided one
1138        let (value, span) = match inputs.get(name.text()) {
1139            Some(input) => {
1140                // For WDL 1.2 evaluation, a `None` value when the expected type is non-optional
1141                // will invoke the default expression
1142                if input.is_none()
1143                    && !expected_ty.is_optional()
1144                    && state
1145                        .document
1146                        .version()
1147                        .map(|v| v >= SupportedVersion::V1(V1::Two))
1148                        .unwrap_or(false)
1149                    && let Some(expr) = decl.expr()
1150                {
1151                    debug!(
1152                        task_id = id,
1153                        task_name = state.task.name(),
1154                        document = state.document.uri().as_str(),
1155                        input_name = name.text(),
1156                        "evaluating input default expression"
1157                    );
1158
1159                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1160                        state,
1161                        self.transferer.as_ref(),
1162                        ROOT_SCOPE_INDEX,
1163                    ));
1164                    (evaluator.evaluate_expr(&expr).await?, expr.span())
1165                } else {
1166                    (input.clone(), name.span())
1167                }
1168            }
1169            None => match decl.expr() {
1170                Some(expr) => {
1171                    debug!(
1172                        task_id = id,
1173                        task_name = state.task.name(),
1174                        document = state.document.uri().as_str(),
1175                        input_name = name.text(),
1176                        "evaluating input default expression"
1177                    );
1178
1179                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1180                        state,
1181                        self.transferer.as_ref(),
1182                        ROOT_SCOPE_INDEX,
1183                    ));
1184                    (evaluator.evaluate_expr(&expr).await?, expr.span())
1185                }
1186                _ => {
1187                    assert!(expected_ty.is_optional(), "type should be optional");
1188                    (Value::new_none(expected_ty.clone()), name.span())
1189                }
1190            },
1191        };
1192
1193        // Coerce the value to the expected type
1194        let mut value = value
1195            .coerce(
1196                Some(&TaskEvaluationContext::new(
1197                    state,
1198                    self.transferer.as_ref(),
1199                    ROOT_SCOPE_INDEX,
1200                )),
1201                &expected_ty,
1202            )
1203            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;
1204
1205        // Add any file or directory backend inputs
1206        state
1207            .add_backend_inputs(
1208                decl_ty.is_optional(),
1209                &mut value,
1210                &self.transferer,
1211                self.backend.needs_local_inputs(),
1212            )
1213            .await
1214            .map_err(|e| {
1215                decl_evaluation_failed(
1216                    e,
1217                    state.task.name(),
1218                    true,
1219                    name.text(),
1220                    Some(Io::Input),
1221                    name.span(),
1222                )
1223            })?;
1224
1225        // Insert the name into the scope
1226        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1227
1228        // Insert an environment variable, if it is one
1229        if decl.env().is_some() {
1230            let value = value
1231                .as_primitive()
1232                .expect("value should be primitive")
1233                .raw(Some(&TaskEvaluationContext::new(
1234                    state,
1235                    self.transferer.as_ref(),
1236                    ROOT_SCOPE_INDEX,
1237                )))
1238                .to_string();
1239            state.env.insert(name.text().to_string(), value);
1240        }
1241
1242        Ok(())
1243    }
1244
1245    /// Evaluates a task private declaration.
1246    async fn evaluate_decl(
1247        &self,
1248        id: &str,
1249        state: &mut State<'_>,
1250        decl: &Decl<SyntaxNode>,
1251    ) -> Result<(), Diagnostic> {
1252        let name = decl.name();
1253        debug!(
1254            task_id = id,
1255            task_name = state.task.name(),
1256            document = state.document.uri().as_str(),
1257            decl_name = name.text(),
1258            "evaluating private declaration",
1259        );
1260
1261        let decl_ty = decl.ty();
1262        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1263
1264        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1265            state,
1266            self.transferer.as_ref(),
1267            ROOT_SCOPE_INDEX,
1268        ));
1269
1270        let expr = decl.expr().expect("private decls should have expressions");
1271        let value = evaluator.evaluate_expr(&expr).await?;
1272        let mut value = value
1273            .coerce(
1274                Some(&TaskEvaluationContext::new(
1275                    state,
1276                    self.transferer.as_ref(),
1277                    ROOT_SCOPE_INDEX,
1278                )),
1279                &ty,
1280            )
1281            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1282
1283        // Add any file or directory backend inputs
1284        state
1285            .add_backend_inputs(
1286                decl_ty.is_optional(),
1287                &mut value,
1288                &self.transferer,
1289                self.backend.needs_local_inputs(),
1290            )
1291            .await
1292            .map_err(|e| {
1293                decl_evaluation_failed(e, state.task.name(), true, name.text(), None, name.span())
1294            })?;
1295
1296        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1297
1298        // Insert an environment variable, if it is one
1299        if decl.env().is_some() {
1300            let value = value
1301                .as_primitive()
1302                .expect("value should be primitive")
1303                .raw(Some(&TaskEvaluationContext::new(
1304                    state,
1305                    self.transferer.as_ref(),
1306                    ROOT_SCOPE_INDEX,
1307                )))
1308                .to_string();
1309            state.env.insert(name.text().to_string(), value);
1310        }
1311
1312        Ok(())
1313    }
1314
1315    /// Evaluates the runtime section.
1316    ///
1317    /// Returns both the task's hints and requirements.
1318    async fn evaluate_runtime_section(
1319        &self,
1320        id: &str,
1321        state: &mut State<'_>,
1322        section: &RuntimeSection<SyntaxNode>,
1323        inputs: &TaskInputs,
1324    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1325        debug!(
1326            task_id = id,
1327            task_name = state.task.name(),
1328            document = state.document.uri().as_str(),
1329            "evaluating runtimes section",
1330        );
1331
1332        let mut requirements = HashMap::new();
1333        let mut hints = HashMap::new();
1334
1335        let version = state
1336            .document
1337            .version()
1338            .expect("document should have version");
1339        for item in section.items() {
1340            let name = item.name();
1341            match inputs.requirement(name.text()) {
1342                Some(value) => {
1343                    requirements.insert(name.text().to_string(), value.clone());
1344                    continue;
1345                }
1346                _ => {
1347                    if let Some(value) = inputs.hint(name.text()) {
1348                        hints.insert(name.text().to_string(), value.clone());
1349                        continue;
1350                    }
1351                }
1352            }
1353
1354            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1355                state,
1356                self.transferer.as_ref(),
1357                ROOT_SCOPE_INDEX,
1358            ));
1359
1360            let (types, requirement) = match task_requirement_types(version, name.text()) {
1361                Some(types) => (Some(types), true),
1362                None => match task_hint_types(version, name.text(), false) {
1363                    Some(types) => (Some(types), false),
1364                    None => (None, false),
1365                },
1366            };
1367
1368            // Evaluate and coerce to the expected type
1369            let expr = item.expr();
1370            let mut value = evaluator.evaluate_expr(&expr).await?;
1371            if let Some(types) = types {
1372                value = types
1373                    .iter()
1374                    .find_map(|ty| {
1375                        value
1376                            .coerce(
1377                                Some(&TaskEvaluationContext::new(
1378                                    state,
1379                                    self.transferer.as_ref(),
1380                                    ROOT_SCOPE_INDEX,
1381                                )),
1382                                ty,
1383                            )
1384                            .ok()
1385                    })
1386                    .ok_or_else(|| {
1387                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1388                    })?;
1389            }
1390
1391            if requirement {
1392                requirements.insert(name.text().to_string(), value);
1393            } else {
1394                hints.insert(name.text().to_string(), value);
1395            }
1396        }
1397
1398        Ok((requirements, hints))
1399    }
1400
1401    /// Evaluates the requirements section.
1402    async fn evaluate_requirements_section(
1403        &self,
1404        id: &str,
1405        state: &mut State<'_>,
1406        section: &RequirementsSection<SyntaxNode>,
1407        inputs: &TaskInputs,
1408    ) -> Result<HashMap<String, Value>, Diagnostic> {
1409        debug!(
1410            task_id = id,
1411            task_name = state.task.name(),
1412            document = state.document.uri().as_str(),
1413            "evaluating requirements",
1414        );
1415
1416        let mut requirements = HashMap::new();
1417
1418        let version = state
1419            .document
1420            .version()
1421            .expect("document should have version");
1422        for item in section.items() {
1423            let name = item.name();
1424            if let Some(value) = inputs.requirement(name.text()) {
1425                requirements.insert(name.text().to_string(), value.clone());
1426                continue;
1427            }
1428
1429            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1430                state,
1431                self.transferer.as_ref(),
1432                ROOT_SCOPE_INDEX,
1433            ));
1434
1435            let types =
1436                task_requirement_types(version, name.text()).expect("requirement should be known");
1437
1438            // Evaluate and coerce to the expected type
1439            let expr = item.expr();
1440            let value = evaluator.evaluate_expr(&expr).await?;
1441            let value = types
1442                .iter()
1443                .find_map(|ty| {
1444                    value
1445                        .coerce(
1446                            Some(&TaskEvaluationContext::new(
1447                                state,
1448                                self.transferer.as_ref(),
1449                                ROOT_SCOPE_INDEX,
1450                            )),
1451                            ty,
1452                        )
1453                        .ok()
1454                })
1455                .ok_or_else(|| {
1456                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1457                })?;
1458
1459            requirements.insert(name.text().to_string(), value);
1460        }
1461
1462        Ok(requirements)
1463    }
1464
1465    /// Evaluates the hints section.
1466    async fn evaluate_hints_section(
1467        &self,
1468        id: &str,
1469        state: &mut State<'_>,
1470        section: &TaskHintsSection<SyntaxNode>,
1471        inputs: &TaskInputs,
1472    ) -> Result<HashMap<String, Value>, Diagnostic> {
1473        debug!(
1474            task_id = id,
1475            task_name = state.task.name(),
1476            document = state.document.uri().as_str(),
1477            "evaluating hints section",
1478        );
1479
1480        let mut hints = HashMap::new();
1481
1482        for item in section.items() {
1483            let name = item.name();
1484            if let Some(value) = inputs.hint(name.text()) {
1485                hints.insert(name.text().to_string(), value.clone());
1486                continue;
1487            }
1488
1489            let mut evaluator = ExprEvaluator::new(
1490                TaskEvaluationContext::new(state, self.transferer.as_ref(), ROOT_SCOPE_INDEX)
1491                    .with_task(),
1492            );
1493
1494            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1495            hints.insert(name.text().to_string(), value);
1496        }
1497
1498        Ok(hints)
1499    }
1500
1501    /// Evaluates the command of a task.
1502    ///
1503    /// Returns the evaluated command as a string.
1504    async fn evaluate_command(
1505        &self,
1506        id: &str,
1507        state: &mut State<'_>,
1508        section: &CommandSection<SyntaxNode>,
1509    ) -> EvaluationResult<String> {
1510        debug!(
1511            task_id = id,
1512            task_name = state.task.name(),
1513            document = state.document.uri().as_str(),
1514            "evaluating command section",
1515        );
1516
1517        let document = state.document.clone();
1518        let mut command = String::new();
1519        match section.strip_whitespace() {
1520            Some(parts) => {
1521                let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1522                    state,
1523                    self.transferer.as_ref(),
1524                    TASK_SCOPE_INDEX,
1525                ));
1526
1527                for part in parts {
1528                    match part {
1529                        StrippedCommandPart::Text(t) => {
1530                            command.push_str(t.as_str());
1531                        }
1532                        StrippedCommandPart::Placeholder(placeholder) => {
1533                            evaluator
1534                                .evaluate_placeholder(&placeholder, &mut command)
1535                                .await
1536                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1537                        }
1538                    }
1539                }
1540            }
1541            _ => {
1542                warn!(
1543                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1544                     stripping was skipped",
1545                    task = state.task.name(),
1546                    uri = state.document.uri(),
1547                );
1548
1549                let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1550                    state,
1551                    self.transferer.as_ref(),
1552                    TASK_SCOPE_INDEX,
1553                ));
1554
1555                let heredoc = section.is_heredoc();
1556                for part in section.parts() {
1557                    match part {
1558                        CommandPart::Text(t) => {
1559                            t.unescape_to(heredoc, &mut command);
1560                        }
1561                        CommandPart::Placeholder(placeholder) => {
1562                            evaluator
1563                                .evaluate_placeholder(&placeholder, &mut command)
1564                                .await
1565                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1566                        }
1567                    }
1568                }
1569            }
1570        }
1571
1572        Ok(command)
1573    }
1574
1575    /// Evaluates sections prior to spawning the command.
1576    ///
1577    /// This method evaluates the following sections:
1578    ///   * runtime
1579    ///   * requirements
1580    ///   * hints
1581    ///   * command
1582    async fn evaluate_sections(
1583        &self,
1584        id: &str,
1585        state: &mut State<'_>,
1586        definition: &TaskDefinition<SyntaxNode>,
1587        inputs: &TaskInputs,
1588        attempt: u64,
1589    ) -> EvaluationResult<EvaluatedSections> {
1590        // Start by evaluating requirements and hints
1591        let (requirements, hints) = match definition.runtime() {
1592            Some(section) => self
1593                .evaluate_runtime_section(id, state, &section, inputs)
1594                .await
1595                .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1596            _ => (
1597                match definition.requirements() {
1598                    Some(section) => self
1599                        .evaluate_requirements_section(id, state, &section, inputs)
1600                        .await
1601                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1602                    None => Default::default(),
1603                },
1604                match definition.hints() {
1605                    Some(section) => self
1606                        .evaluate_hints_section(id, state, &section, inputs)
1607                        .await
1608                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1609                    None => Default::default(),
1610                },
1611            ),
1612        };
1613
1614        // Update or insert the `task` variable in the task scope
1615        // TODO: if task variables become visible in `requirements` or `hints` section,
1616        // this needs to be relocated to before we evaluate those sections
1617        if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1618            // Get the execution constraints
1619            let constraints = self
1620                .backend
1621                .constraints(&requirements, &hints)
1622                .with_context(|| {
1623                    format!(
1624                        "failed to get constraints for task `{task}`",
1625                        task = state.task.name()
1626                    )
1627                })?;
1628
1629            let task = TaskValue::new_v1(
1630                state.task.name(),
1631                id,
1632                definition,
1633                constraints,
1634                attempt.try_into().with_context(|| {
1635                    format!(
1636                        "too many attempts were made to run task `{task}`",
1637                        task = state.task.name()
1638                    )
1639                })?,
1640            );
1641
1642            let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1643            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1644                *v = Value::Task(task);
1645            } else {
1646                scope.insert(TASK_VAR_NAME, Value::Task(task));
1647            }
1648        }
1649
1650        let command = self
1651            .evaluate_command(
1652                id,
1653                state,
1654                &definition.command().expect("must have command section"),
1655            )
1656            .await?;
1657
1658        Ok(EvaluatedSections {
1659            command,
1660            requirements: Arc::new(requirements),
1661            hints: Arc::new(hints),
1662        })
1663    }
1664
1665    /// Evaluates a task output.
1666    async fn evaluate_output(
1667        &self,
1668        id: &str,
1669        state: &mut State<'_>,
1670        decl: &Decl<SyntaxNode>,
1671        evaluated: &EvaluatedTask,
1672    ) -> Result<(), Diagnostic> {
1673        let name = decl.name();
1674        debug!(
1675            task_id = id,
1676            task_name = state.task.name(),
1677            document = state.document.uri().as_str(),
1678            output_name = name.text(),
1679            "evaluating output",
1680        );
1681
1682        let decl_ty = decl.ty();
1683        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1684        let mut evaluator = ExprEvaluator::new(
1685            TaskEvaluationContext::new(state, self.transferer.as_ref(), TASK_SCOPE_INDEX)
1686                .with_work_dir(&evaluated.result.work_dir)
1687                .with_stdout(&evaluated.result.stdout)
1688                .with_stderr(&evaluated.result.stderr),
1689        );
1690
1691        let expr = decl.expr().expect("outputs should have expressions");
1692        let value = evaluator.evaluate_expr(&expr).await?;
1693
1694        // Coerce the output value to the expected type
1695        let mut value = value
1696            .coerce(Some(evaluator.context()), &ty)
1697            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1698
1699        value
1700            .ensure_paths_exist(
1701                ty.is_optional(),
1702                state.base_dir.as_local(),
1703                Some(self.transferer.as_ref()),
1704                &|path| {
1705                    // Join the path with the work directory.
1706                    let mut output_path = evaluated.result.work_dir.join(path.as_str())?;
1707
1708                    // Ensure the output's path is valid
1709                    let output_path = match (&mut output_path, &evaluated.result.work_dir) {
1710                        (EvaluationPath::Local(joined), EvaluationPath::Local(base))
1711                            if joined.starts_with(base)
1712                                || joined.starts_with(&evaluated.attempt_dir) =>
1713                        {
1714                            // The joined path is contained within the work directory or attempt
1715                            // directory
1716                            HostPath::new(String::try_from(output_path)?)
1717                        }
1718                        (EvaluationPath::Local(_), EvaluationPath::Local(_)) => {
1719                            // The joined path is not within the work or attempt directory;
1720                            // therefore, it is required to be an input
1721                            state
1722                                .path_map
1723                                .get_by_left(path)
1724                                .ok_or_else(|| {
1725                                    anyhow!(
1726                                        "guest path `{path}` is not an input or within the task's \
1727                                         working directory"
1728                                    )
1729                                })?
1730                                .0
1731                                .clone()
1732                                .into()
1733                        }
1734                        (EvaluationPath::Local(_), EvaluationPath::Remote(_)) => {
1735                            // Path is local (and absolute) and the working directory is remote
1736                            bail!(
1737                                "cannot access guest path `{path}` from a remotely executing task"
1738                            )
1739                        }
1740                        (EvaluationPath::Remote(_), _) => {
1741                            HostPath::new(String::try_from(output_path)?)
1742                        }
1743                    };
1744
1745                    *path = output_path;
1746                    Ok(())
1747                },
1748            )
1749            .await
1750            .map_err(|e| {
1751                decl_evaluation_failed(
1752                    e,
1753                    state.task.name(),
1754                    true,
1755                    name.text(),
1756                    Some(Io::Output),
1757                    name.span(),
1758                )
1759            })?;
1760
1761        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
1762        Ok(())
1763    }
1764
1765    /// Localizes inputs for execution.
1766    ///
1767    /// Returns the inputs to pass to the backend.
1768    async fn localize_inputs(
1769        &self,
1770        task_id: &str,
1771        state: &mut State<'_>,
1772    ) -> EvaluationResult<Vec<Input>> {
1773        // If the backend needs local inputs, download them now
1774        if self.backend.needs_local_inputs() {
1775            let mut downloads = JoinSet::new();
1776
1777            // Download any necessary files
1778            for (idx, input) in state.backend_inputs.as_slice_mut().iter_mut().enumerate() {
1779                if input.local_path().is_some() {
1780                    continue;
1781                }
1782
1783                if let EvaluationPath::Remote(url) = input.path() {
1784                    let transferer = self.transferer.clone();
1785                    let url = url.clone();
1786                    downloads.spawn(async move {
1787                        transferer
1788                            .download(&url)
1789                            .await
1790                            .map(|l| (idx, l))
1791                            .with_context(|| anyhow!("failed to localize `{url}`"))
1792                    });
1793                }
1794            }
1795
1796            // Wait for the downloads to complete
1797            while let Some(result) = downloads.join_next().await {
1798                match result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}"))) {
1799                    Ok((idx, location)) => {
1800                        state.backend_inputs.as_slice_mut()[idx].set_location(location);
1801                    }
1802                    Err(e) => {
1803                        return Err(EvaluationError::new(
1804                            state.document.clone(),
1805                            task_localization_failed(e, state.task.name(), state.task.name_span()),
1806                        ));
1807                    }
1808                }
1809            }
1810        }
1811
1812        if enabled!(Level::DEBUG) {
1813            for input in state.backend_inputs.as_slice() {
1814                match (
1815                    input.path().as_local().is_some(),
1816                    input.local_path(),
1817                    input.guest_path(),
1818                ) {
1819                    // Input is unmapped and either local or remote and not downloaded
1820                    (true, _, None) | (false, None, None) => {}
1821                    // Input is local and was mapped to a guest path
1822                    (true, _, Some(guest_path)) => {
1823                        debug!(
1824                            task_id,
1825                            task_name = state.task.name(),
1826                            document = state.document.uri().as_str(),
1827                            "task input `{path}` mapped to `{guest_path}`",
1828                            path = input.path().display(),
1829                        );
1830                    }
1831                    // Input is remote and was downloaded to a local path
1832                    (false, Some(local_path), None) => {
1833                        debug!(
1834                            task_id,
1835                            task_name = state.task.name(),
1836                            document = state.document.uri().as_str(),
1837                            "task input `{path}` downloaded to `{local_path}`",
1838                            path = input.path().display(),
1839                            local_path = local_path.display()
1840                        );
1841                    }
1842                    // Input is remote and was not downloaded, but mapped to a guest path
1843                    (false, None, Some(guest_path)) => {
1844                        debug!(
1845                            task_id,
1846                            task_name = state.task.name(),
1847                            document = state.document.uri().as_str(),
1848                            "task input `{path}` mapped to `{guest_path}`",
1849                            path = input.path().display(),
1850                        );
1851                    }
1852                    // Input is remote and was both downloaded and mapped to a guest path
1853                    (false, Some(local_path), Some(guest_path)) => {
1854                        debug!(
1855                            task_id,
1856                            task_name = state.task.name(),
1857                            document = state.document.uri().as_str(),
1858                            "task input `{path}` downloaded to `{local_path}` and mapped to \
1859                             `{guest_path}`",
1860                            path = input.path().display(),
1861                            local_path = local_path.display(),
1862                        );
1863                    }
1864                }
1865            }
1866        }
1867
1868        Ok(state.backend_inputs.as_slice().into())
1869    }
1870}