wdl_engine/eval/v1/
task.rs

1//! Implementation of evaluation for V1 tasks.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::mem;
7use std::path::Path;
8use std::path::absolute;
9use std::str::FromStr;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::anyhow;
15use anyhow::bail;
16use bimap::BiHashMap;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio::task::JoinSet;
20use tokio_util::sync::CancellationToken;
21use tracing::Level;
22use tracing::debug;
23use tracing::enabled;
24use tracing::info;
25use tracing::warn;
26use wdl_analysis::Document;
27use wdl_analysis::diagnostics::Io;
28use wdl_analysis::diagnostics::multiple_type_mismatch;
29use wdl_analysis::diagnostics::unknown_name;
30use wdl_analysis::document::TASK_VAR_NAME;
31use wdl_analysis::document::Task;
32use wdl_analysis::eval::v1::TaskGraphBuilder;
33use wdl_analysis::eval::v1::TaskGraphNode;
34use wdl_analysis::types::Optional;
35use wdl_analysis::types::PrimitiveType;
36use wdl_analysis::types::Type;
37use wdl_analysis::types::v1::task_hint_types;
38use wdl_analysis::types::v1::task_requirement_types;
39use wdl_ast::Ast;
40use wdl_ast::AstNode;
41use wdl_ast::AstToken;
42use wdl_ast::Diagnostic;
43use wdl_ast::Span;
44use wdl_ast::SupportedVersion;
45use wdl_ast::v1::CommandPart;
46use wdl_ast::v1::CommandSection;
47use wdl_ast::v1::Decl;
48use wdl_ast::v1::RequirementsSection;
49use wdl_ast::v1::RuntimeSection;
50use wdl_ast::v1::StrippedCommandPart;
51use wdl_ast::v1::TASK_HINT_DISKS;
52use wdl_ast::v1::TASK_HINT_MAX_CPU;
53use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
54use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
55use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
57use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
58use wdl_ast::v1::TASK_REQUIREMENT_CPU;
59use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
60use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
61use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
62use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
63use wdl_ast::v1::TaskDefinition;
64use wdl_ast::v1::TaskHintsSection;
65use wdl_ast::version::V1;
66
67use crate::Coercible;
68use crate::EvaluationContext;
69use crate::EvaluationError;
70use crate::EvaluationResult;
71use crate::Events;
72use crate::GuestPath;
73use crate::HostPath;
74use crate::Input;
75use crate::InputKind;
76use crate::ONE_GIBIBYTE;
77use crate::Outputs;
78use crate::PrimitiveValue;
79use crate::Scope;
80use crate::ScopeIndex;
81use crate::ScopeRef;
82use crate::StorageUnit;
83use crate::TaskExecutionBackend;
84use crate::TaskInputs;
85use crate::TaskSpawnInfo;
86use crate::TaskSpawnRequest;
87use crate::TaskValue;
88use crate::Value;
89use crate::config::Config;
90use crate::config::MAX_RETRIES;
91use crate::convert_unit_string;
92use crate::diagnostics::decl_evaluation_failed;
93use crate::diagnostics::runtime_type_mismatch;
94use crate::diagnostics::task_execution_failed;
95use crate::diagnostics::task_localization_failed;
96use crate::eval::EvaluatedTask;
97use crate::eval::trie::InputTrie;
98use crate::http::HttpTransferer;
99use crate::http::Transferer;
100use crate::path::EvaluationPath;
101use crate::path::is_file_url;
102use crate::path::is_url;
103use crate::tree::SyntaxNode;
104use crate::v1::ExprEvaluator;
105use crate::v1::INPUTS_FILE;
106use crate::v1::OUTPUTS_FILE;
107use crate::v1::write_json_file;
108
/// The default container requirement.
///
/// Used by `container` when the task does not specify a container (or
/// specifies `*`) and the caller supplies no other default.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The default value for the `cpu` requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default value for the `memory` requirement.
///
/// Computed as `2 * ONE_GIBIBYTE`.
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default value for the `max_retries` requirement.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default value for the `disks` requirement (in GiB).
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;

/// The index of a task's root scope.
///
/// Inputs and private declarations are evaluated into this scope.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of a task's output scope.
///
/// Outputs are evaluated into this scope; it is created with the root scope
/// as its parent.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the evaluation scope where the WDL 1.2 `task` variable is
/// visible.
///
/// This scope is created with the output scope as its parent.
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
127
128/// Gets the `container` requirement from a requirements map.
129pub(crate) fn container<'a>(
130    requirements: &'a HashMap<String, Value>,
131    default: Option<&'a str>,
132) -> Cow<'a, str> {
133    requirements
134        .get(TASK_REQUIREMENT_CONTAINER)
135        .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
136        .and_then(|v| -> Option<Cow<'_, str>> {
137            // If the value is an array, use the first element or the default
138            // Note: in the future we should be resolving which element in the array is
139            // usable; this will require some work in Crankshaft to enable
140            if let Some(array) = v.as_array() {
141                return array.as_slice().first().map(|v| {
142                    v.as_string()
143                        .expect("type should be string")
144                        .as_ref()
145                        .into()
146                });
147            }
148
149            Some(
150                v.coerce(None, &PrimitiveType::String.into())
151                    .expect("type should coerce")
152                    .unwrap_string()
153                    .as_ref()
154                    .clone()
155                    .into(),
156            )
157        })
158        .and_then(|v| {
159            // Treat star as the default
160            if v == "*" { None } else { Some(v) }
161        })
162        .unwrap_or_else(|| {
163            default
164                .map(Into::into)
165                .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
166        })
167}
168
169/// Gets the `cpu` requirement from a requirements map.
170pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
171    requirements
172        .get(TASK_REQUIREMENT_CPU)
173        .map(|v| {
174            v.coerce(None, &PrimitiveType::Float.into())
175                .expect("type should coerce")
176                .unwrap_float()
177        })
178        .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
179}
180
181/// Gets the `max_cpu` hint from a hints map.
182pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
183    hints
184        .get(TASK_HINT_MAX_CPU)
185        .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
186        .map(|v| {
187            v.coerce(None, &PrimitiveType::Float.into())
188                .expect("type should coerce")
189                .unwrap_float()
190        })
191}
192
193/// Gets the `memory` requirement from a requirements map.
194pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
195    Ok(requirements
196        .get(TASK_REQUIREMENT_MEMORY)
197        .map(|v| {
198            if let Some(v) = v.as_integer() {
199                return Ok(v);
200            }
201
202            if let Some(s) = v.as_string() {
203                return convert_unit_string(s)
204                    .and_then(|v| v.try_into().ok())
205                    .with_context(|| {
206                        format!("task specifies an invalid `memory` requirement `{s}`")
207                    });
208            }
209
210            unreachable!("value should be an integer or string");
211        })
212        .transpose()?
213        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
214}
215
216/// Gets the `max_memory` hint from a hints map.
217pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
218    hints
219        .get(TASK_HINT_MAX_MEMORY)
220        .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
221        .map(|v| {
222            if let Some(v) = v.as_integer() {
223                return Ok(v);
224            }
225
226            if let Some(s) = v.as_string() {
227                return convert_unit_string(s)
228                    .and_then(|v| v.try_into().ok())
229                    .with_context(|| {
230                        format!("task specifies an invalid `memory` requirement `{s}`")
231                    });
232            }
233
234            unreachable!("value should be an integer or string");
235        })
236        .transpose()
237}
238
/// The kind of storage backing a task disk.
///
/// Disk types are specified via hints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// A solid state drive.
    SSD,
    /// A hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type from its exact, case-sensitive name (`SSD` or
    /// `HDD`); any other input is rejected with `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        [("SSD", Self::SSD), ("HDD", Self::HDD)]
            .into_iter()
            .find_map(|(name, ty)| (name == s).then_some(ty))
            .ok_or(())
    }
}
261
262/// Represents a task disk requirement.
263pub struct DiskRequirement {
264    /// The size of the disk, in GiB.
265    pub size: i64,
266
267    /// The disk type as specified by a corresponding task hint.
268    pub ty: Option<DiskType>,
269}
270
271/// Gets the `disks` requirement.
272///
273/// Upon success, returns a mapping of mount point to disk requirement.
274pub(crate) fn disks<'a>(
275    requirements: &'a HashMap<String, Value>,
276    hints: &HashMap<String, Value>,
277) -> Result<HashMap<&'a str, DiskRequirement>> {
278    /// Helper for looking up a disk type from the hints.
279    ///
280    /// If we don't recognize the specification, we ignore it.
281    fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
282        hints.get(TASK_HINT_DISKS).and_then(|v| {
283            if let Some(ty) = v.as_string() {
284                return ty.parse().ok();
285            }
286
287            if let Some(map) = v.as_map() {
288                // Find the corresponding key; we have to scan the keys because the map is
289                // storing primitive values
290                if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
291                    (None, None) => true,
292                    (None, Some(_)) | (Some(_), None) => false,
293                    (Some(k), Some(mount_point)) => k
294                        .as_string()
295                        .map(|k| k.as_str() == mount_point)
296                        .unwrap_or(false),
297                }) {
298                    return v.as_string().and_then(|ty| ty.parse().ok());
299                }
300            }
301
302            None
303        })
304    }
305
306    /// Parses a disk specification into a size (in GiB) and optional mount
307    /// point.
308    fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
309        let iter = spec.split_whitespace();
310        let mut first = None;
311        let mut second = None;
312        let mut third = None;
313
314        for part in iter {
315            if first.is_none() {
316                first = Some(part);
317                continue;
318            }
319
320            if second.is_none() {
321                second = Some(part);
322                continue;
323            }
324
325            if third.is_none() {
326                third = Some(part);
327                continue;
328            }
329
330            return None;
331        }
332
333        match (first, second, third) {
334            (None, None, None) => None,
335            (Some(size), None, None) => {
336                // Specification is `<size>` (in GiB)
337                Some((size.parse().ok()?, None))
338            }
339            (Some(first), Some(second), None) => {
340                // Check for `<size> <unit>`; convert from the specified unit to GiB
341                if let Ok(size) = first.parse() {
342                    let unit: StorageUnit = second.parse().ok()?;
343                    let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
344                    return Some((size.try_into().ok()?, None));
345                }
346
347                // Specification is `<mount-point> <size>` (where size is already in GiB)
348                // The mount point must be absolute, i.e. start with `/`
349                if !first.starts_with('/') {
350                    return None;
351                }
352
353                Some((second.parse().ok()?, Some(first)))
354            }
355            (Some(mount_point), Some(size), Some(unit)) => {
356                // Specification is `<mount-point> <size> <units>`
357                let unit: StorageUnit = unit.parse().ok()?;
358                let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
359
360                // Mount point must be absolute
361                if !mount_point.starts_with('/') {
362                    return None;
363                }
364
365                Some((size.try_into().ok()?, Some(mount_point)))
366            }
367            _ => unreachable!("should have one, two, or three values"),
368        }
369    }
370
371    /// Inserts a disk into the disks map.
372    fn insert_disk<'a>(
373        spec: &'a str,
374        hints: &HashMap<String, Value>,
375        disks: &mut HashMap<&'a str, DiskRequirement>,
376    ) -> Result<()> {
377        let (size, mount_point) =
378            parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
379
380        let prev = disks.insert(
381            mount_point.unwrap_or("/"),
382            DiskRequirement {
383                size,
384                ty: lookup_type(mount_point, hints),
385            },
386        );
387
388        if prev.is_some() {
389            bail!(
390                "duplicate mount point `{mp}` specified in `disks` requirement",
391                mp = mount_point.unwrap_or("/")
392            );
393        }
394
395        Ok(())
396    }
397
398    let mut disks = HashMap::new();
399    if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
400        if let Some(size) = v.as_integer() {
401            // Disk spec is just the size (in GiB)
402            if size < 0 {
403                bail!("task requirement `disks` cannot be less than zero");
404            }
405
406            disks.insert(
407                "/",
408                DiskRequirement {
409                    size,
410                    ty: lookup_type(None, hints),
411                },
412            );
413        } else if let Some(spec) = v.as_string() {
414            insert_disk(spec, hints, &mut disks)?;
415        } else if let Some(v) = v.as_array() {
416            for spec in v.as_slice() {
417                insert_disk(
418                    spec.as_string().expect("spec should be a string"),
419                    hints,
420                    &mut disks,
421                )?;
422            }
423        } else {
424            unreachable!("value should be an integer, string, or array");
425        }
426    }
427
428    Ok(disks)
429}
430
431/// Gets the `preemptible` hint from a hints map.
432///
433/// This hint is not part of the WDL standard but is used for compatibility with
434/// Cromwell where backends can support preemptible retries before using
435/// dedicated instances.
436pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
437    const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
438    const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
439
440    hints
441        .get(TASK_HINT_PREEMPTIBLE)
442        .and_then(|v| {
443            Some(
444                v.coerce(None, &PrimitiveType::Integer.into())
445                    .ok()?
446                    .unwrap_integer(),
447            )
448        })
449        .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
450}
451
/// Used to evaluate expressions in tasks.
///
/// The context borrows the task evaluation state for the duration of an
/// expression evaluation and carries the optional, phase-specific values
/// (work directory, stdout, stderr) that are exposed to the evaluator.
struct TaskEvaluationContext<'a, 'b> {
    /// The associated evaluation state.
    state: &'a mut State<'b>,
    /// The transferer to use for expression evaluation.
    transferer: &'a dyn Transferer,
    /// The current evaluation scope.
    ///
    /// Names are resolved starting from this scope.
    scope: ScopeIndex,
    /// The task work directory.
    ///
    /// This is `None` unless the output section is being evaluated.
    ///
    /// When set, it is used as the base directory in place of the state's
    /// base directory.
    work_dir: Option<&'a EvaluationPath>,
    /// The standard out value to use.
    ///
    /// This field is only available after task execution.
    stdout: Option<&'a Value>,
    /// The standard error value to use.
    ///
    /// This field is only available after task execution.
    stderr: Option<&'a Value>,
    /// Whether or not the evaluation has associated task information.
    ///
    /// This is `true` when evaluating hints sections.
    task: bool,
}
477
478impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
479    /// Constructs a new expression evaluation context.
480    pub fn new(
481        state: &'a mut State<'b>,
482        transferer: &'a dyn Transferer,
483        scope: ScopeIndex,
484    ) -> Self {
485        Self {
486            state,
487            transferer,
488            scope,
489            work_dir: None,
490            stdout: None,
491            stderr: None,
492            task: false,
493        }
494    }
495
496    /// Sets the task's work directory to use for the evaluation context.
497    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
498        self.work_dir = Some(work_dir);
499        self
500    }
501
502    /// Sets the stdout value to use for the evaluation context.
503    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
504        self.stdout = Some(stdout);
505        self
506    }
507
508    /// Sets the stderr value to use for the evaluation context.
509    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
510        self.stderr = Some(stderr);
511        self
512    }
513
514    /// Marks the evaluation as having associated task information.
515    ///
516    /// This is used in evaluating hints sections.
517    pub fn with_task(mut self) -> Self {
518        self.task = true;
519        self
520    }
521}
522
523impl EvaluationContext for TaskEvaluationContext<'_, '_> {
524    fn version(&self) -> SupportedVersion {
525        self.state
526            .document
527            .version()
528            .expect("document should have a version")
529    }
530
531    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
532        ScopeRef::new(&self.state.scopes, self.scope)
533            .lookup(name)
534            .cloned()
535            .ok_or_else(|| unknown_name(name, span))
536    }
537
538    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
539        crate::resolve_type_name(self.state.document, name, span)
540    }
541
542    fn base_dir(&self) -> &EvaluationPath {
543        self.work_dir.unwrap_or(&self.state.base_dir)
544    }
545
546    fn temp_dir(&self) -> &Path {
547        self.state.temp_dir
548    }
549
550    fn stdout(&self) -> Option<&Value> {
551        self.stdout
552    }
553
554    fn stderr(&self) -> Option<&Value> {
555        self.stderr
556    }
557
558    fn task(&self) -> Option<&Task> {
559        if self.task {
560            Some(self.state.task)
561        } else {
562            None
563        }
564    }
565
566    fn transferer(&self) -> &dyn Transferer {
567        self.transferer
568    }
569
570    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
571        self.state.path_map.get_by_right(path).cloned()
572    }
573
574    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
575        self.state.path_map.get_by_left(path).cloned()
576    }
577
578    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
579        self.state.insert_backend_input(InputKind::File, path)?;
580        Ok(())
581    }
582}
583
/// Represents task evaluation state.
struct State<'a> {
    /// The temp directory.
    temp_dir: &'a Path,
    /// The base directory for evaluation.
    ///
    /// This is the document's directory.
    ///
    /// When outputs are evaluated, the task's work directory is used as the
    /// base directory.
    base_dir: EvaluationPath,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The scopes of the task being evaluated.
    ///
    /// The first scope is the root scope, the second is the output scope, and
    /// the third is the scope where the "task" variable is visible in 1.2+
    /// evaluations.
    scopes: [Scope; 3],
    /// The environment variables of the task.
    ///
    /// Environment variables do not change between retries.
    env: IndexMap<String, String>,
    /// The trie for mapping backend inputs.
    backend_inputs: InputTrie,
    /// A bi-map of host paths and guest paths.
    ///
    /// Host paths are the left values and guest paths are the right values.
    path_map: BiHashMap<HostPath, GuestPath>,
}
614
impl<'a> State<'a> {
    /// Constructs a new task evaluation state.
    ///
    /// When `guest_inputs_dir` is provided, the backend input trie is created
    /// with that guest directory; otherwise a plain trie is used.
    ///
    /// The base directory is the parent of the document's path, made
    /// absolute.
    ///
    /// # Errors
    ///
    /// Returns an error if the document's path does not have a parent
    /// directory.
    fn new(
        document: &'a Document,
        task: &'a Task,
        temp_dir: &'a Path,
        guest_inputs_dir: Option<&'static str>,
    ) -> Result<Self> {
        // Tasks have a root scope (index 0), an output scope (index 1), and a `task`
        // variable scope (index 2). The output scope inherits from the root scope and
        // the task scope inherits from the output scope. Inputs and private
        // declarations are evaluated into the root scope. Outputs are evaluated into
        // the output scope. The task scope is used for evaluating expressions in both
        // the command and output sections. Only the `task` variable in WDL 1.2 is
        // introduced into the task scope; in previous WDL versions, the task scope will
        // not have any local names.
        let scopes = [
            Scope::default(),
            Scope::new(ROOT_SCOPE_INDEX),
            Scope::new(OUTPUT_SCOPE_INDEX),
        ];

        let backend_inputs = if let Some(guest_inputs_dir) = guest_inputs_dir {
            InputTrie::new_with_guest_dir(guest_inputs_dir)
        } else {
            InputTrie::new()
        };

        let document_path = document.path();
        let mut base_dir = EvaluationPath::parent_of(&document_path).with_context(|| {
            format!("document `{document_path}` does not have a parent directory")
        })?;

        base_dir.make_absolute();

        Ok(Self {
            temp_dir,
            base_dir,
            document,
            task,
            scopes,
            env: Default::default(),
            backend_inputs,
            path_map: Default::default(),
        })
    }

    /// Adds backend inputs to the state for any `File` or `Directory` values
    /// referenced by the given value.
    ///
    /// If the backend doesn't use containers, remote inputs are immediately
    /// localized.
    ///
    /// If the backend does use containers, remote inputs are localized during
    /// the call to `localize_inputs`.
    ///
    /// This method also ensures that `File` and `Directory` paths exist for
    /// WDL 1.2+.
    ///
    /// When `needs_local_inputs` is `true`, any visited path that is a
    /// non-`file` URL and has no guest path is downloaded with `transferer`;
    /// the download location is then recorded as both the guest path of the
    /// URL and the location of the backend input.
    ///
    /// # Errors
    ///
    /// Returns an error if a path fails the existence check, a backend input
    /// cannot be inserted, a URL is invalid, a download fails, or a download
    /// location is not valid UTF-8.
    async fn add_backend_inputs(
        &mut self,
        is_optional: bool,
        value: &mut Value,
        transferer: &Arc<dyn Transferer>,
        needs_local_inputs: bool,
    ) -> Result<()> {
        // Remote URLs (with the index of their backend input) that must be downloaded
        let mut urls = Vec::new();
        value.visit_paths_mut(is_optional, &mut |optional, value| {
            // Ensure the path exists before we translate it (1.2+ behavior)
            if self
                .document
                .version()
                .expect("document should have a version")
                >= SupportedVersion::V1(V1::Two)
                && !value.ensure_path_exists(optional, self.base_dir.as_local())?
            {
                // Return `Ok(false)` to the caller to replace the optional value with `None`
                // We don't need to insert a backend input for a `None` value
                return Ok(false);
            }

            let (kind, path) = match value {
                PrimitiveValue::File(path) => (InputKind::File, path),
                PrimitiveValue::Directory(path) => (InputKind::Directory, path),
                _ => unreachable!("only file and directory values should be visited"),
            };

            // Insert a backend input for the path
            if let Some(index) = self.insert_backend_input(kind, path)? {
                // Check to see if there's no guest path for a remote URL that needs to be
                // localized; if so, we must localize it now
                if needs_local_inputs
                    && self.backend_inputs.as_slice()[index].guest_path.is_none()
                    && is_url(path.as_str())
                    && !is_file_url(path.as_str())
                {
                    urls.push((path.clone(), index));
                }
            }

            Ok(true)
        })?;

        // Nothing to download
        if urls.is_empty() {
            return Ok(());
        }

        // Download any necessary files
        let mut downloads = JoinSet::new();
        for (url, index) in urls {
            let transferer = transferer.clone();
            downloads.spawn(async move {
                transferer
                    .download(
                        &url.as_str()
                            .parse()
                            .with_context(|| format!("invalid URL `{url}`"))?,
                    )
                    .await
                    .with_context(|| anyhow!("failed to localize `{url}`"))
                    .map(|l| (url, l, index))
            });
        }

        // Wait for the downloads to complete
        while let Some(result) = downloads.join_next().await {
            // A join error (e.g. a panicked download task) is reported as a regular error
            let (url, location, index) =
                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;

            let guest_path = GuestPath::new(location.to_str().with_context(|| {
                format!(
                    "download location `{location}` is not UTF-8",
                    location = location.display()
                )
            })?);

            // Map the URL to the guest path
            self.path_map.insert(url, guest_path);

            // Finally, set the location of the input
            self.backend_inputs.as_slice_mut()[index].set_location(location);
        }

        Ok(())
    }

    /// Inserts a backend input into the state.
    ///
    /// Responsible for mapping host and guest paths.
    ///
    /// Returns the index of the input when the trie reports one, or `None`
    /// when the trie does not.
    ///
    /// # Errors
    ///
    /// Returns an error if the input cannot be inserted into the trie.
    fn insert_backend_input(&mut self, kind: InputKind, path: &HostPath) -> Result<Option<usize>> {
        // Insert an input for the path
        if let Some(index) = self
            .backend_inputs
            .insert(kind, path.as_str(), &self.base_dir)?
        {
            // If the input has a guest path, map it
            let input = &self.backend_inputs.as_slice()[index];
            if let Some(guest_path) = &input.guest_path {
                self.path_map.insert(path.clone(), guest_path.clone());
            }

            return Ok(Some(index));
        }

        Ok(None)
    }
}
781
/// Represents the result of evaluating task sections before execution.
struct EvaluatedSections {
    /// The evaluated command.
    command: String,
    /// The evaluated requirements.
    ///
    /// Keyed by requirement name.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints.
    ///
    /// Keyed by hint name.
    hints: Arc<HashMap<String, Value>>,
}
791
/// Represents a WDL V1 task evaluator.
///
/// An evaluator is created with either `new` (which validates the
/// configuration and creates the backend) or `new_unchecked`.
pub struct TaskEvaluator {
    /// The associated evaluation configuration.
    config: Arc<Config>,
    /// The associated task execution backend.
    backend: Arc<dyn TaskExecutionBackend>,
    /// The cancellation token for cancelling task evaluation.
    token: CancellationToken,
    /// The transferer to use for expression evaluation.
    transferer: Arc<dyn Transferer>,
}
803
804impl TaskEvaluator {
805    /// Constructs a new task evaluator with the given evaluation
806    /// configuration, cancellation token, and events sender.
807    ///
808    /// Returns an error if the configuration isn't valid.
809    pub async fn new(config: Config, token: CancellationToken, events: Events) -> Result<Self> {
810        config.validate()?;
811
812        let config = Arc::new(config);
813        let backend = config.create_backend(events.crankshaft().clone()).await?;
814        let transferer =
815            HttpTransferer::new(config.clone(), token.clone(), events.transfer().clone())?;
816
817        Ok(Self {
818            config,
819            backend,
820            token,
821            transferer: Arc::new(transferer),
822        })
823    }
824
825    /// Creates a new task evaluator with the given configuration, backend,
826    /// cancellation token, and transferer.
827    ///
828    /// This method does not validate the configuration.
829    pub(crate) fn new_unchecked(
830        config: Arc<Config>,
831        backend: Arc<dyn TaskExecutionBackend>,
832        token: CancellationToken,
833        transferer: Arc<dyn Transferer>,
834    ) -> Self {
835        Self {
836            config,
837            backend,
838            token,
839            transferer,
840        }
841    }
842
843    /// Evaluates the given task.
844    ///
845    /// Upon success, returns the evaluated task.
846    pub async fn evaluate(
847        &self,
848        document: &Document,
849        task: &Task,
850        inputs: &TaskInputs,
851        root: impl AsRef<Path>,
852    ) -> EvaluationResult<EvaluatedTask> {
853        // We cannot evaluate a document with errors
854        if document.has_errors() {
855            return Err(anyhow!("cannot evaluate a document with errors").into());
856        }
857
858        self.perform_evaluation(document, task, inputs, root.as_ref(), task.name())
859            .await
860    }
861
862    /// Performs the evaluation of the given task.
863    ///
864    /// This method skips checking the document (and its transitive imports) for
865    /// analysis errors as the check occurs at the `evaluate` entrypoint.
866    pub(crate) async fn perform_evaluation(
867        &self,
868        document: &Document,
869        task: &Task,
870        inputs: &TaskInputs,
871        root: &Path,
872        id: &str,
873    ) -> EvaluationResult<EvaluatedTask> {
874        inputs.validate(document, task, None).with_context(|| {
875            format!(
876                "failed to validate the inputs to task `{task}`",
877                task = task.name()
878            )
879        })?;
880
881        let ast = match document.root().morph().ast() {
882            Ast::V1(ast) => ast,
883            _ => {
884                return Err(
885                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
886                );
887            }
888        };
889
890        // Find the task in the AST
891        let definition = ast
892            .tasks()
893            .find(|t| t.name().text() == task.name())
894            .expect("task should exist in the AST");
895
896        let version = document.version().expect("document should have version");
897
898        // Build an evaluation graph for the task
899        let mut diagnostics = Vec::new();
900        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
901        assert!(
902            diagnostics.is_empty(),
903            "task evaluation graph should have no diagnostics"
904        );
905
906        debug!(
907            task_id = id,
908            task_name = task.name(),
909            document = document.uri().as_str(),
910            "evaluating task"
911        );
912
913        let root_dir = absolute(root).with_context(|| {
914            format!(
915                "failed to determine absolute path of `{path}`",
916                path = root.display()
917            )
918        })?;
919
920        // Create the temp directory now as it may be needed for task evaluation
921        let temp_dir = root_dir.join("tmp");
922        fs::create_dir_all(&temp_dir).with_context(|| {
923            format!(
924                "failed to create directory `{path}`",
925                path = temp_dir.display()
926            )
927        })?;
928
929        // Write the inputs to the task's root directory
930        write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
931
932        let mut state = State::new(document, task, &temp_dir, self.backend.guest_inputs_dir())?;
933        let nodes = toposort(&graph, None).expect("graph should be acyclic");
934        let mut current = 0;
935        while current < nodes.len() {
936            match &graph[nodes[current]] {
937                TaskGraphNode::Input(decl) => {
938                    self.evaluate_input(id, &mut state, decl, inputs)
939                        .await
940                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
941                }
942                TaskGraphNode::Decl(decl) => {
943                    self.evaluate_decl(id, &mut state, decl)
944                        .await
945                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
946                }
947                TaskGraphNode::Output(_) => {
948                    // Stop at the first output
949                    break;
950                }
951                TaskGraphNode::Command(_)
952                | TaskGraphNode::Runtime(_)
953                | TaskGraphNode::Requirements(_)
954                | TaskGraphNode::Hints(_) => {
955                    // Skip these sections for now; they'll evaluate in the
956                    // retry loop
957                }
958            }
959
960            current += 1;
961        }
962
963        let env = Arc::new(mem::take(&mut state.env));
964        // Spawn the task in a retry loop
965        let mut attempt = 0;
966        let mut evaluated = loop {
967            let EvaluatedSections {
968                command,
969                requirements,
970                hints,
971            } = self
972                .evaluate_sections(id, &mut state, &definition, inputs, attempt)
973                .await?;
974
975            // Get the maximum number of retries, either from the task's requirements or
976            // from configuration
977            let max_retries = requirements
978                .get(TASK_REQUIREMENT_MAX_RETRIES)
979                .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
980                .cloned()
981                .map(|v| v.unwrap_integer() as u64)
982                .or_else(|| self.config.task.retries)
983                .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
984
985            if max_retries > MAX_RETRIES {
986                return Err(anyhow!(
987                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
988                )
989                .into());
990            }
991
992            let mut attempt_dir = root_dir.clone();
993            attempt_dir.push("attempts");
994            attempt_dir.push(attempt.to_string());
995
996            let request = TaskSpawnRequest::new(
997                id.to_string(),
998                TaskSpawnInfo::new(
999                    command,
1000                    self.localize_inputs(id, &mut state).await?,
1001                    requirements.clone(),
1002                    hints.clone(),
1003                    env.clone(),
1004                    self.transferer.clone(),
1005                ),
1006                attempt,
1007                attempt_dir.clone(),
1008            );
1009
1010            let result = self
1011                .backend
1012                .spawn(request, self.token.clone())
1013                .with_context(|| {
1014                    format!(
1015                        "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
1016                        name = task.name(),
1017                        path = document.path(),
1018                    )
1019                })?
1020                .await
1021                .expect("failed to receive response from spawned task")
1022                .map_err(|e| {
1023                    EvaluationError::new(
1024                        state.document.clone(),
1025                        task_execution_failed(e, task.name(), id, task.name_span()),
1026                    )
1027                })?;
1028
1029            // Update the task variable
1030            let evaluated = EvaluatedTask::new(attempt_dir, result)?;
1031            if version >= SupportedVersion::V1(V1::Two) {
1032                let task = state.scopes[TASK_SCOPE_INDEX.0]
1033                    .get_mut(TASK_VAR_NAME)
1034                    .unwrap()
1035                    .as_task_mut()
1036                    .unwrap();
1037
1038                task.set_attempt(attempt.try_into().with_context(|| {
1039                    format!(
1040                        "too many attempts were made to run task `{task}`",
1041                        task = state.task.name()
1042                    )
1043                })?);
1044                task.set_return_code(evaluated.result.exit_code);
1045            }
1046
1047            if let Err(e) = evaluated
1048                .handle_exit(&requirements, self.transferer.as_ref())
1049                .await
1050            {
1051                if attempt >= max_retries {
1052                    return Err(EvaluationError::new(
1053                        state.document.clone(),
1054                        task_execution_failed(e, task.name(), id, task.name_span()),
1055                    ));
1056                }
1057
1058                attempt += 1;
1059
1060                info!(
1061                    "retrying execution of task `{name}` (retry {attempt})",
1062                    name = state.task.name()
1063                );
1064                continue;
1065            }
1066
1067            break evaluated;
1068        };
1069
1070        // Evaluate the remaining inputs (unused), and decls, and outputs
1071        for index in &nodes[current..] {
1072            match &graph[*index] {
1073                TaskGraphNode::Decl(decl) => {
1074                    self.evaluate_decl(id, &mut state, decl)
1075                        .await
1076                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1077                }
1078                TaskGraphNode::Output(decl) => {
1079                    self.evaluate_output(id, &mut state, decl, &evaluated)
1080                        .await
1081                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1082                }
1083                _ => {
1084                    unreachable!(
1085                        "only declarations and outputs should be evaluated after the command"
1086                    )
1087                }
1088            }
1089        }
1090
1091        // Take the output scope and return it in declaration sort order
1092        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1093        if let Some(section) = definition.output() {
1094            let indexes: HashMap<_, _> = section
1095                .declarations()
1096                .enumerate()
1097                .map(|(i, d)| (d.name().hashable(), i))
1098                .collect();
1099            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1100        }
1101
1102        // Write the outputs to the task's root directory
1103        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
1104
1105        evaluated.outputs = Ok(outputs);
1106        Ok(evaluated)
1107    }
1108
1109    /// Evaluates a task input.
1110    async fn evaluate_input(
1111        &self,
1112        id: &str,
1113        state: &mut State<'_>,
1114        decl: &Decl<SyntaxNode>,
1115        inputs: &TaskInputs,
1116    ) -> Result<(), Diagnostic> {
1117        let name = decl.name();
1118        let decl_ty = decl.ty();
1119        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1120
1121        // Evaluate the input if not provided one
1122        let (value, span) = match inputs.get(name.text()) {
1123            Some(input) => (input.clone(), name.span()),
1124            None => match decl.expr() {
1125                Some(expr) => {
1126                    debug!(
1127                        task_id = id,
1128                        task_name = state.task.name(),
1129                        document = state.document.uri().as_str(),
1130                        input_name = name.text(),
1131                        "evaluating input"
1132                    );
1133
1134                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1135                        state,
1136                        self.transferer.as_ref(),
1137                        ROOT_SCOPE_INDEX,
1138                    ));
1139                    (evaluator.evaluate_expr(&expr).await?, expr.span())
1140                }
1141                _ => {
1142                    assert!(ty.is_optional(), "type should be optional");
1143                    (Value::new_none(ty.clone()), name.span())
1144                }
1145            },
1146        };
1147
1148        // Coerce the value to the expected type
1149        let mut value = value
1150            .coerce(
1151                Some(&TaskEvaluationContext::new(
1152                    state,
1153                    self.transferer.as_ref(),
1154                    ROOT_SCOPE_INDEX,
1155                )),
1156                &ty,
1157            )
1158            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
1159
1160        // Add any file or directory backend inputs
1161        state
1162            .add_backend_inputs(
1163                decl_ty.is_optional(),
1164                &mut value,
1165                &self.transferer,
1166                self.backend.needs_local_inputs(),
1167            )
1168            .await
1169            .map_err(|e| {
1170                decl_evaluation_failed(
1171                    e,
1172                    state.task.name(),
1173                    true,
1174                    name.text(),
1175                    Some(Io::Input),
1176                    name.span(),
1177                )
1178            })?;
1179
1180        // Insert the name into the scope
1181        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1182
1183        // Insert an environment variable, if it is one
1184        if decl.env().is_some() {
1185            let value = value
1186                .as_primitive()
1187                .expect("value should be primitive")
1188                .raw(Some(&TaskEvaluationContext::new(
1189                    state,
1190                    self.transferer.as_ref(),
1191                    ROOT_SCOPE_INDEX,
1192                )))
1193                .to_string();
1194            state.env.insert(name.text().to_string(), value);
1195        }
1196
1197        Ok(())
1198    }
1199
1200    /// Evaluates a task private declaration.
1201    async fn evaluate_decl(
1202        &self,
1203        id: &str,
1204        state: &mut State<'_>,
1205        decl: &Decl<SyntaxNode>,
1206    ) -> Result<(), Diagnostic> {
1207        let name = decl.name();
1208        debug!(
1209            task_id = id,
1210            task_name = state.task.name(),
1211            document = state.document.uri().as_str(),
1212            decl_name = name.text(),
1213            "evaluating private declaration",
1214        );
1215
1216        let decl_ty = decl.ty();
1217        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1218
1219        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1220            state,
1221            self.transferer.as_ref(),
1222            ROOT_SCOPE_INDEX,
1223        ));
1224
1225        let expr = decl.expr().expect("private decls should have expressions");
1226        let value = evaluator.evaluate_expr(&expr).await?;
1227        let mut value = value
1228            .coerce(
1229                Some(&TaskEvaluationContext::new(
1230                    state,
1231                    self.transferer.as_ref(),
1232                    ROOT_SCOPE_INDEX,
1233                )),
1234                &ty,
1235            )
1236            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1237
1238        // Add any file or directory backend inputs
1239        state
1240            .add_backend_inputs(
1241                decl_ty.is_optional(),
1242                &mut value,
1243                &self.transferer,
1244                self.backend.needs_local_inputs(),
1245            )
1246            .await
1247            .map_err(|e| {
1248                decl_evaluation_failed(e, state.task.name(), true, name.text(), None, name.span())
1249            })?;
1250
1251        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1252
1253        // Insert an environment variable, if it is one
1254        if decl.env().is_some() {
1255            let value = value
1256                .as_primitive()
1257                .expect("value should be primitive")
1258                .raw(Some(&TaskEvaluationContext::new(
1259                    state,
1260                    self.transferer.as_ref(),
1261                    ROOT_SCOPE_INDEX,
1262                )))
1263                .to_string();
1264            state.env.insert(name.text().to_string(), value);
1265        }
1266
1267        Ok(())
1268    }
1269
1270    /// Evaluates the runtime section.
1271    ///
1272    /// Returns both the task's hints and requirements.
1273    async fn evaluate_runtime_section(
1274        &self,
1275        id: &str,
1276        state: &mut State<'_>,
1277        section: &RuntimeSection<SyntaxNode>,
1278        inputs: &TaskInputs,
1279    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1280        debug!(
1281            task_id = id,
1282            task_name = state.task.name(),
1283            document = state.document.uri().as_str(),
1284            "evaluating runtimes section",
1285        );
1286
1287        let mut requirements = HashMap::new();
1288        let mut hints = HashMap::new();
1289
1290        let version = state
1291            .document
1292            .version()
1293            .expect("document should have version");
1294        for item in section.items() {
1295            let name = item.name();
1296            match inputs.requirement(name.text()) {
1297                Some(value) => {
1298                    requirements.insert(name.text().to_string(), value.clone());
1299                    continue;
1300                }
1301                _ => {
1302                    if let Some(value) = inputs.hint(name.text()) {
1303                        hints.insert(name.text().to_string(), value.clone());
1304                        continue;
1305                    }
1306                }
1307            }
1308
1309            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1310                state,
1311                self.transferer.as_ref(),
1312                ROOT_SCOPE_INDEX,
1313            ));
1314
1315            let (types, requirement) = match task_requirement_types(version, name.text()) {
1316                Some(types) => (Some(types), true),
1317                None => match task_hint_types(version, name.text(), false) {
1318                    Some(types) => (Some(types), false),
1319                    None => (None, false),
1320                },
1321            };
1322
1323            // Evaluate and coerce to the expected type
1324            let expr = item.expr();
1325            let mut value = evaluator.evaluate_expr(&expr).await?;
1326            if let Some(types) = types {
1327                value = types
1328                    .iter()
1329                    .find_map(|ty| {
1330                        value
1331                            .coerce(
1332                                Some(&TaskEvaluationContext::new(
1333                                    state,
1334                                    self.transferer.as_ref(),
1335                                    ROOT_SCOPE_INDEX,
1336                                )),
1337                                ty,
1338                            )
1339                            .ok()
1340                    })
1341                    .ok_or_else(|| {
1342                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1343                    })?;
1344            }
1345
1346            if requirement {
1347                requirements.insert(name.text().to_string(), value);
1348            } else {
1349                hints.insert(name.text().to_string(), value);
1350            }
1351        }
1352
1353        Ok((requirements, hints))
1354    }
1355
1356    /// Evaluates the requirements section.
1357    async fn evaluate_requirements_section(
1358        &self,
1359        id: &str,
1360        state: &mut State<'_>,
1361        section: &RequirementsSection<SyntaxNode>,
1362        inputs: &TaskInputs,
1363    ) -> Result<HashMap<String, Value>, Diagnostic> {
1364        debug!(
1365            task_id = id,
1366            task_name = state.task.name(),
1367            document = state.document.uri().as_str(),
1368            "evaluating requirements",
1369        );
1370
1371        let mut requirements = HashMap::new();
1372
1373        let version = state
1374            .document
1375            .version()
1376            .expect("document should have version");
1377        for item in section.items() {
1378            let name = item.name();
1379            if let Some(value) = inputs.requirement(name.text()) {
1380                requirements.insert(name.text().to_string(), value.clone());
1381                continue;
1382            }
1383
1384            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1385                state,
1386                self.transferer.as_ref(),
1387                ROOT_SCOPE_INDEX,
1388            ));
1389
1390            let types =
1391                task_requirement_types(version, name.text()).expect("requirement should be known");
1392
1393            // Evaluate and coerce to the expected type
1394            let expr = item.expr();
1395            let value = evaluator.evaluate_expr(&expr).await?;
1396            let value = types
1397                .iter()
1398                .find_map(|ty| {
1399                    value
1400                        .coerce(
1401                            Some(&TaskEvaluationContext::new(
1402                                state,
1403                                self.transferer.as_ref(),
1404                                ROOT_SCOPE_INDEX,
1405                            )),
1406                            ty,
1407                        )
1408                        .ok()
1409                })
1410                .ok_or_else(|| {
1411                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1412                })?;
1413
1414            requirements.insert(name.text().to_string(), value);
1415        }
1416
1417        Ok(requirements)
1418    }
1419
1420    /// Evaluates the hints section.
1421    async fn evaluate_hints_section(
1422        &self,
1423        id: &str,
1424        state: &mut State<'_>,
1425        section: &TaskHintsSection<SyntaxNode>,
1426        inputs: &TaskInputs,
1427    ) -> Result<HashMap<String, Value>, Diagnostic> {
1428        debug!(
1429            task_id = id,
1430            task_name = state.task.name(),
1431            document = state.document.uri().as_str(),
1432            "evaluating hints section",
1433        );
1434
1435        let mut hints = HashMap::new();
1436
1437        for item in section.items() {
1438            let name = item.name();
1439            if let Some(value) = inputs.hint(name.text()) {
1440                hints.insert(name.text().to_string(), value.clone());
1441                continue;
1442            }
1443
1444            let mut evaluator = ExprEvaluator::new(
1445                TaskEvaluationContext::new(state, self.transferer.as_ref(), ROOT_SCOPE_INDEX)
1446                    .with_task(),
1447            );
1448
1449            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1450            hints.insert(name.text().to_string(), value);
1451        }
1452
1453        Ok(hints)
1454    }
1455
1456    /// Evaluates the command of a task.
1457    ///
1458    /// Returns the evaluated command as a string.
1459    async fn evaluate_command(
1460        &self,
1461        id: &str,
1462        state: &mut State<'_>,
1463        section: &CommandSection<SyntaxNode>,
1464    ) -> EvaluationResult<String> {
1465        debug!(
1466            task_id = id,
1467            task_name = state.task.name(),
1468            document = state.document.uri().as_str(),
1469            "evaluating command section",
1470        );
1471
1472        let document = state.document.clone();
1473        let mut command = String::new();
1474        match section.strip_whitespace() {
1475            Some(parts) => {
1476                let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1477                    state,
1478                    self.transferer.as_ref(),
1479                    TASK_SCOPE_INDEX,
1480                ));
1481
1482                for part in parts {
1483                    match part {
1484                        StrippedCommandPart::Text(t) => {
1485                            command.push_str(t.as_str());
1486                        }
1487                        StrippedCommandPart::Placeholder(placeholder) => {
1488                            evaluator
1489                                .evaluate_placeholder(&placeholder, &mut command)
1490                                .await
1491                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1492                        }
1493                    }
1494                }
1495            }
1496            _ => {
1497                warn!(
1498                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1499                     stripping was skipped",
1500                    task = state.task.name(),
1501                    uri = state.document.uri(),
1502                );
1503
1504                let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1505                    state,
1506                    self.transferer.as_ref(),
1507                    TASK_SCOPE_INDEX,
1508                ));
1509
1510                let heredoc = section.is_heredoc();
1511                for part in section.parts() {
1512                    match part {
1513                        CommandPart::Text(t) => {
1514                            t.unescape_to(heredoc, &mut command);
1515                        }
1516                        CommandPart::Placeholder(placeholder) => {
1517                            evaluator
1518                                .evaluate_placeholder(&placeholder, &mut command)
1519                                .await
1520                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1521                        }
1522                    }
1523                }
1524            }
1525        }
1526
1527        Ok(command)
1528    }
1529
1530    /// Evaluates sections prior to spawning the command.
1531    ///
1532    /// This method evaluates the following sections:
1533    ///   * runtime
1534    ///   * requirements
1535    ///   * hints
1536    ///   * command
1537    async fn evaluate_sections(
1538        &self,
1539        id: &str,
1540        state: &mut State<'_>,
1541        definition: &TaskDefinition<SyntaxNode>,
1542        inputs: &TaskInputs,
1543        attempt: u64,
1544    ) -> EvaluationResult<EvaluatedSections> {
1545        // Start by evaluating requirements and hints
1546        let (requirements, hints) = match definition.runtime() {
1547            Some(section) => self
1548                .evaluate_runtime_section(id, state, &section, inputs)
1549                .await
1550                .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1551            _ => (
1552                match definition.requirements() {
1553                    Some(section) => self
1554                        .evaluate_requirements_section(id, state, &section, inputs)
1555                        .await
1556                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1557                    None => Default::default(),
1558                },
1559                match definition.hints() {
1560                    Some(section) => self
1561                        .evaluate_hints_section(id, state, &section, inputs)
1562                        .await
1563                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1564                    None => Default::default(),
1565                },
1566            ),
1567        };
1568
1569        // Update or insert the `task` variable in the task scope
1570        // TODO: if task variables become visible in `requirements` or `hints` section,
1571        // this needs to be relocated to before we evaluate those sections
1572        if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1573            // Get the execution constraints
1574            let constraints = self
1575                .backend
1576                .constraints(&requirements, &hints)
1577                .with_context(|| {
1578                    format!(
1579                        "failed to get constraints for task `{task}`",
1580                        task = state.task.name()
1581                    )
1582                })?;
1583
1584            let task = TaskValue::new_v1(
1585                state.task.name(),
1586                id,
1587                definition,
1588                constraints,
1589                attempt.try_into().with_context(|| {
1590                    format!(
1591                        "too many attempts were made to run task `{task}`",
1592                        task = state.task.name()
1593                    )
1594                })?,
1595            );
1596
1597            let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1598            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1599                *v = Value::Task(task);
1600            } else {
1601                scope.insert(TASK_VAR_NAME, Value::Task(task));
1602            }
1603        }
1604
1605        let command = self
1606            .evaluate_command(
1607                id,
1608                state,
1609                &definition.command().expect("must have command section"),
1610            )
1611            .await?;
1612
1613        Ok(EvaluatedSections {
1614            command,
1615            requirements: Arc::new(requirements),
1616            hints: Arc::new(hints),
1617        })
1618    }
1619
    /// Evaluates a task output.
    ///
    /// The output's expression is evaluated in the task scope with the work
    /// directory, stdout, and stderr of the evaluated task made available to
    /// the evaluation context. The result is coerced to the declared type and
    /// any `File` or `Directory` values it contains are translated to host
    /// paths before the value is inserted into the output scope.
    ///
    /// # Errors
    ///
    /// Returns a diagnostic if the declared type cannot be converted, if the
    /// expression fails to evaluate, if the value cannot be coerced to the
    /// declared type, or if a path in the value cannot be translated or fails
    /// the existence check.
    ///
    /// # Panics
    ///
    /// Panics if the declaration has no expression.
    async fn evaluate_output(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
        // Outputs evaluate in the task scope and may reference the results of
        // the task's execution
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(state, self.transferer.as_ref(), TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        // Coerce the output value to the expected type
        let mut value = value
            .coerce(Some(evaluator.context()), &ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        // Translate `File` and `Directory` values to host paths.
        // For output section evaluation, paths are relative to the task's work
        // directory
        value
            .visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                let path = match value {
                    PrimitiveValue::File(path) => path,
                    PrimitiveValue::Directory(path) => path,
                    _ => unreachable!("only file and directory values should be visited"),
                };

                // Join the path with the work directory.
                // The work directory returned by the backend is already a host path
                let mut output_path = evaluated.result.work_dir.join(path.as_str())?;

                // Ensure the output's path is valid
                // NOTE(review): the containment check below relies on `starts_with`;
                // confirm that `join` normalizes `..` components, as a prefix check
                // alone would not reject them
                let output_path = match (&mut output_path, &evaluated.result.work_dir) {
                    (EvaluationPath::Local(joined), EvaluationPath::Local(base))
                        if joined.starts_with(base)
                            || joined.starts_with(&evaluated.attempt_dir) =>
                    {
                        // The joined path is contained within the work directory or attempt
                        // directory
                        HostPath::new(
                            output_path
                                .into_string()
                                .with_context(|| format!("path `{path}` is not UTF-8"))?,
                        )
                    }
                    (EvaluationPath::Local(_), EvaluationPath::Local(_)) => {
                        // The joined path is not within the work or attempt directory; therefore,
                        // it is required to be an input
                        // The path map is keyed by the original (guest) path on this side
                        state
                            .path_map
                            .get_by_left(path)
                            .ok_or_else(|| {
                                anyhow!(
                                    "guest path `{path}` is not an input or within the task's \
                                     working directory"
                                )
                            })?
                            .0
                            .clone()
                            .into()
                    }
                    (EvaluationPath::Local(_), EvaluationPath::Remote(_)) => {
                        // Path is local (and absolute) and the working directory is remote
                        bail!("cannot access guest path `{path}` from a remotely executing task")
                    }
                    // A remote joined path is used as-is
                    (EvaluationPath::Remote(_), _) => HostPath::new(
                        output_path
                            .into_string()
                            .with_context(|| format!("path `{path}` is not UTF-8"))?,
                    ),
                };

                // Replace the value's path with the translated host path
                *path = output_path;

                // Ensure the path exists; if it does not and it was optional, the value is
                // replaced with `None`
                value.ensure_path_exists(optional, state.base_dir.as_local())
            })
            .map_err(|e| {
                decl_evaluation_failed(
                    e,
                    state.task.name(),
                    true,
                    name.text(),
                    Some(Io::Output),
                    name.span(),
                )
            })?;

        // Record the output so it can be returned once all outputs are evaluated
        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
1730
1731    /// Localizes inputs for execution.
1732    ///
1733    /// Returns the inputs to pass to the backend.
1734    async fn localize_inputs(
1735        &self,
1736        task_id: &str,
1737        state: &mut State<'_>,
1738    ) -> EvaluationResult<Vec<Input>> {
1739        // If the backend needs local inputs, download them now
1740        if self.backend.needs_local_inputs() {
1741            let mut downloads = JoinSet::new();
1742
1743            // Download any necessary files
1744            for (idx, input) in state.backend_inputs.as_slice_mut().iter_mut().enumerate() {
1745                if input.local_path().is_some() {
1746                    continue;
1747                }
1748
1749                if let EvaluationPath::Remote(url) = input.path() {
1750                    let transferer = self.transferer.clone();
1751                    let url = url.clone();
1752                    downloads.spawn(async move {
1753                        transferer
1754                            .download(&url)
1755                            .await
1756                            .map(|l| (idx, l))
1757                            .with_context(|| anyhow!("failed to localize `{url}`"))
1758                    });
1759                }
1760            }
1761
1762            // Wait for the downloads to complete
1763            while let Some(result) = downloads.join_next().await {
1764                match result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}"))) {
1765                    Ok((idx, location)) => {
1766                        state.backend_inputs.as_slice_mut()[idx].set_location(location);
1767                    }
1768                    Err(e) => {
1769                        return Err(EvaluationError::new(
1770                            state.document.clone(),
1771                            task_localization_failed(e, state.task.name(), state.task.name_span()),
1772                        ));
1773                    }
1774                }
1775            }
1776        }
1777
1778        if enabled!(Level::DEBUG) {
1779            for input in state.backend_inputs.as_slice() {
1780                match (input.path().as_local().is_some(), input.guest_path()) {
1781                    (true, None) => {}
1782                    (true, Some(guest_path)) => {
1783                        debug!(
1784                            task_id,
1785                            task_name = state.task.name(),
1786                            document = state.document.uri().as_str(),
1787                            "task input `{path}` mapped to `{guest_path}`",
1788                            path = input.path().display(),
1789                        );
1790                    }
1791                    (false, None) => {
1792                        debug!(
1793                            task_id,
1794                            task_name = state.task.name(),
1795                            document = state.document.uri().as_str(),
1796                            "task input `{path}` downloaded to `{local_path}`",
1797                            path = input.path().display(),
1798                            local_path = input
1799                                .local_path()
1800                                .expect("input should be localized")
1801                                .display()
1802                        );
1803                    }
1804                    (false, Some(guest_path)) => {
1805                        debug!(
1806                            task_id,
1807                            task_name = state.task.name(),
1808                            document = state.document.uri().as_str(),
1809                            "task input `{path}` downloaded to `{local_path}` and mapped to \
1810                             `{guest_path}`",
1811                            path = input.path().display(),
1812                            local_path = input
1813                                .local_path()
1814                                .expect("input should be localized")
1815                                .display(),
1816                        );
1817                    }
1818                }
1819            }
1820        }
1821
1822        Ok(state.backend_inputs.as_slice().into())
1823    }
1824}