wdl_engine/eval/v1/
task.rs

1//! Implementation of evaluation for V1 tasks.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::future::Future;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::anyhow;
16use anyhow::bail;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio_util::sync::CancellationToken;
20use tracing::Level;
21use tracing::debug;
22use tracing::enabled;
23use tracing::info;
24use tracing::warn;
25use wdl_analysis::Document;
26use wdl_analysis::diagnostics::multiple_type_mismatch;
27use wdl_analysis::diagnostics::unknown_name;
28use wdl_analysis::document::TASK_VAR_NAME;
29use wdl_analysis::document::Task;
30use wdl_analysis::eval::v1::TaskGraphBuilder;
31use wdl_analysis::eval::v1::TaskGraphNode;
32use wdl_analysis::types::Optional;
33use wdl_analysis::types::PrimitiveType;
34use wdl_analysis::types::Type;
35use wdl_analysis::types::v1::task_hint_types;
36use wdl_analysis::types::v1::task_requirement_types;
37use wdl_ast::Ast;
38use wdl_ast::AstNode;
39use wdl_ast::AstToken;
40use wdl_ast::Diagnostic;
41use wdl_ast::Span;
42use wdl_ast::SupportedVersion;
43use wdl_ast::v1::CommandPart;
44use wdl_ast::v1::CommandSection;
45use wdl_ast::v1::Decl;
46use wdl_ast::v1::RequirementsSection;
47use wdl_ast::v1::RuntimeSection;
48use wdl_ast::v1::StrippedCommandPart;
49use wdl_ast::v1::TASK_HINT_DISKS;
50use wdl_ast::v1::TASK_HINT_MAX_CPU;
51use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
52use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
53use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
54use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
55use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CPU;
57use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
58use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
59use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
60use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
61use wdl_ast::v1::TaskDefinition;
62use wdl_ast::v1::TaskHintsSection;
63use wdl_ast::version::V1;
64
65use super::ProgressKind;
66use crate::Coercible;
67use crate::EvaluationContext;
68use crate::EvaluationError;
69use crate::EvaluationResult;
70use crate::Input;
71use crate::InputKind;
72use crate::ONE_GIBIBYTE;
73use crate::Outputs;
74use crate::PrimitiveValue;
75use crate::Scope;
76use crate::ScopeIndex;
77use crate::ScopeRef;
78use crate::StorageUnit;
79use crate::TaskExecutionBackend;
80use crate::TaskInputs;
81use crate::TaskSpawnInfo;
82use crate::TaskSpawnRequest;
83use crate::TaskValue;
84use crate::Value;
85use crate::config::Config;
86use crate::config::MAX_RETRIES;
87use crate::convert_unit_string;
88use crate::diagnostics::output_evaluation_failed;
89use crate::diagnostics::runtime_type_mismatch;
90use crate::diagnostics::task_execution_failed;
91use crate::diagnostics::task_localization_failed;
92use crate::eval::EvaluatedTask;
93use crate::http::Downloader;
94use crate::http::HttpDownloader;
95use crate::path;
96use crate::path::EvaluationPath;
97use crate::tree::SyntaxNode;
98use crate::v1::ExprEvaluator;
99use crate::v1::INPUTS_FILE;
100use crate::v1::OUTPUTS_FILE;
101use crate::v1::write_json_file;
102
/// The default container requirement.
///
/// Used by [`container`] as the final fallback when neither the task nor the
/// caller-supplied default names a container image.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The default value for the `cpu` requirement.
///
/// Returned by [`cpu`] when the task does not specify a `cpu` requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default value for the `memory` requirement.
///
/// This is twice `ONE_GIBIBYTE`; it is returned by [`memory`] when the task
/// does not specify a `memory` requirement.
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default value for the `max_retries` requirement.
///
/// A value of zero means a task is not retried unless the task or the
/// configuration says otherwise.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default value for the `disks` requirement (in GiB).
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;
113
/// The index of a task's root scope.
///
/// This is the first element of the scopes array built by `State::new`.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of a task's output scope.
///
/// This is the second element of the scopes array built by `State::new`; it
/// is created with the root scope as its parent.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the evaluation scope where the WDL 1.2 `task` variable is
/// visible.
///
/// This is the third element of the scopes array built by `State::new`; it
/// is created with the output scope as its parent.
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
121
122/// Gets the `container` requirement from a requirements map.
123pub(crate) fn container<'a>(
124    requirements: &'a HashMap<String, Value>,
125    default: Option<&'a str>,
126) -> Cow<'a, str> {
127    requirements
128        .get(TASK_REQUIREMENT_CONTAINER)
129        .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
130        .and_then(|v| -> Option<Cow<'_, str>> {
131            // If the value is an array, use the first element or the default
132            // Note: in the future we should be resolving which element in the array is
133            // usable; this will require some work in Crankshaft to enable
134            if let Some(array) = v.as_array() {
135                return array.as_slice().first().map(|v| {
136                    v.as_string()
137                        .expect("type should be string")
138                        .as_ref()
139                        .into()
140                });
141            }
142
143            Some(
144                v.coerce(&PrimitiveType::String.into())
145                    .expect("type should coerce")
146                    .unwrap_string()
147                    .as_ref()
148                    .clone()
149                    .into(),
150            )
151        })
152        .and_then(|v| {
153            // Treat star as the default
154            if v == "*" { None } else { Some(v) }
155        })
156        .unwrap_or_else(|| {
157            default
158                .map(Into::into)
159                .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
160        })
161}
162
163/// Gets the `cpu` requirement from a requirements map.
164pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
165    requirements
166        .get(TASK_REQUIREMENT_CPU)
167        .map(|v| {
168            v.coerce(&PrimitiveType::Float.into())
169                .expect("type should coerce")
170                .unwrap_float()
171        })
172        .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
173}
174
175/// Gets the `max_cpu` hint from a hints map.
176pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
177    hints
178        .get(TASK_HINT_MAX_CPU)
179        .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
180        .map(|v| {
181            v.coerce(&PrimitiveType::Float.into())
182                .expect("type should coerce")
183                .unwrap_float()
184        })
185}
186
187/// Gets the `memory` requirement from a requirements map.
188pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
189    Ok(requirements
190        .get(TASK_REQUIREMENT_MEMORY)
191        .map(|v| {
192            if let Some(v) = v.as_integer() {
193                return Ok(v);
194            }
195
196            if let Some(s) = v.as_string() {
197                return convert_unit_string(s)
198                    .and_then(|v| v.try_into().ok())
199                    .with_context(|| {
200                        format!("task specifies an invalid `memory` requirement `{s}`")
201                    });
202            }
203
204            unreachable!("value should be an integer or string");
205        })
206        .transpose()?
207        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
208}
209
210/// Gets the `max_memory` hint from a hints map.
211pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
212    hints
213        .get(TASK_HINT_MAX_MEMORY)
214        .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
215        .map(|v| {
216            if let Some(v) = v.as_integer() {
217                return Ok(v);
218            }
219
220            if let Some(s) = v.as_string() {
221                return convert_unit_string(s)
222                    .and_then(|v| v.try_into().ok())
223                    .with_context(|| {
224                        format!("task specifies an invalid `memory` requirement `{s}`")
225                    });
226            }
227
228            unreachable!("value should be an integer or string");
229        })
230        .transpose()
231}
232
/// Represents the type of a disk.
///
/// Disk types are specified via hints.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// The disk type is a solid state drive.
    SSD,
    /// The disk type is a hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type from its exact, case-sensitive name (`SSD` or
    /// `HDD`); any other string is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // The recognized names paired with the disk type each one denotes
        const NAMES: [(&str, DiskType); 2] = [("SSD", DiskType::SSD), ("HDD", DiskType::HDD)];

        NAMES
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, ty)| ty)
            .ok_or(())
    }
}
255
256/// Represents a task disk requirement.
257pub struct DiskRequirement {
258    /// The size of the disk, in GiB.
259    pub size: i64,
260
261    /// The disk type as specified by a corresponding task hint.
262    pub ty: Option<DiskType>,
263}
264
265/// Gets the `disks` requirement.
266///
267/// Upon success, returns a mapping of mount point to disk requirement.
268pub(crate) fn disks<'a>(
269    requirements: &'a HashMap<String, Value>,
270    hints: &HashMap<String, Value>,
271) -> Result<HashMap<&'a str, DiskRequirement>> {
272    /// Helper for looking up a disk type from the hints.
273    ///
274    /// If we don't recognize the specification, we ignore it.
275    fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
276        hints.get(TASK_HINT_DISKS).and_then(|v| {
277            if let Some(ty) = v.as_string() {
278                return ty.parse().ok();
279            }
280
281            if let Some(map) = v.as_map() {
282                // Find the corresponding key; we have to scan the keys because the map is
283                // storing primitive values
284                if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
285                    (None, None) => true,
286                    (None, Some(_)) | (Some(_), None) => false,
287                    (Some(k), Some(mount_point)) => k
288                        .as_string()
289                        .map(|k| k.as_str() == mount_point)
290                        .unwrap_or(false),
291                }) {
292                    return v.as_string().and_then(|ty| ty.parse().ok());
293                }
294            }
295
296            None
297        })
298    }
299
300    /// Parses a disk specification into a size (in GiB) and optional mount
301    /// point.
302    fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
303        let iter = spec.split_whitespace();
304        let mut first = None;
305        let mut second = None;
306        let mut third = None;
307
308        for part in iter {
309            if first.is_none() {
310                first = Some(part);
311                continue;
312            }
313
314            if second.is_none() {
315                second = Some(part);
316                continue;
317            }
318
319            if third.is_none() {
320                third = Some(part);
321                continue;
322            }
323
324            return None;
325        }
326
327        match (first, second, third) {
328            (None, None, None) => None,
329            (Some(size), None, None) => {
330                // Specification is `<size>` (in GiB)
331                Some((size.parse().ok()?, None))
332            }
333            (Some(first), Some(second), None) => {
334                // Check for `<size> <unit>`; convert from the specified unit to GiB
335                if let Ok(size) = first.parse() {
336                    let unit: StorageUnit = second.parse().ok()?;
337                    let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
338                    return Some((size.try_into().ok()?, None));
339                }
340
341                // Specification is `<mount-point> <size>` (where size is already in GiB)
342                // The mount point must be absolute, i.e. start with `/`
343                if !first.starts_with('/') {
344                    return None;
345                }
346
347                Some((second.parse().ok()?, Some(first)))
348            }
349            (Some(mount_point), Some(size), Some(unit)) => {
350                // Specification is `<mount-point> <size> <units>`
351                let unit: StorageUnit = unit.parse().ok()?;
352                let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
353
354                // Mount point must be absolute
355                if !mount_point.starts_with('/') {
356                    return None;
357                }
358
359                Some((size.try_into().ok()?, Some(mount_point)))
360            }
361            _ => unreachable!("should have one, two, or three values"),
362        }
363    }
364
365    /// Inserts a disk into the disks map.
366    fn insert_disk<'a>(
367        spec: &'a str,
368        hints: &HashMap<String, Value>,
369        disks: &mut HashMap<&'a str, DiskRequirement>,
370    ) -> Result<()> {
371        let (size, mount_point) =
372            parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
373
374        let prev = disks.insert(
375            mount_point.unwrap_or("/"),
376            DiskRequirement {
377                size,
378                ty: lookup_type(mount_point, hints),
379            },
380        );
381
382        if prev.is_some() {
383            bail!(
384                "duplicate mount point `{mp}` specified in `disks` requirement",
385                mp = mount_point.unwrap_or("/")
386            );
387        }
388
389        Ok(())
390    }
391
392    let mut disks = HashMap::new();
393    if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
394        if let Some(size) = v.as_integer() {
395            // Disk spec is just the size (in GiB)
396            if size < 0 {
397                bail!("task requirement `disks` cannot be less than zero");
398            }
399
400            disks.insert(
401                "/",
402                DiskRequirement {
403                    size,
404                    ty: lookup_type(None, hints),
405                },
406            );
407        } else if let Some(spec) = v.as_string() {
408            insert_disk(spec, hints, &mut disks)?;
409        } else if let Some(v) = v.as_array() {
410            for spec in v.as_slice() {
411                insert_disk(
412                    spec.as_string().expect("spec should be a string"),
413                    hints,
414                    &mut disks,
415                )?;
416            }
417        } else {
418            unreachable!("value should be an integer, string, or array");
419        }
420    }
421
422    Ok(disks)
423}
424
425/// Gets the `preemptible` hint from a hints map.
426///
427/// This hint is not part of the WDL standard but is used for compatibility with
428/// Cromwell where backends can support preemptible retries before using
429/// dedicated instances.
430pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
431    const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
432    const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
433
434    hints
435        .get(TASK_HINT_PREEMPTIBLE)
436        .and_then(|v| {
437            Some(
438                v.coerce(&PrimitiveType::Integer.into())
439                    .ok()?
440                    .unwrap_integer(),
441            )
442        })
443        .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
444}
445
/// Used to evaluate expressions in tasks.
///
/// A context borrows the task evaluation state and a downloader; the optional
/// fields default to unset in `new` and are filled in through the `with_*`
/// builder methods.
struct TaskEvaluationContext<'a, 'b> {
    /// The associated evaluation state.
    state: &'a State<'b>,
    /// The downloader to use for expression evaluation.
    downloader: &'a HttpDownloader,
    /// The current evaluation scope.
    ///
    /// Names are resolved starting from this scope.
    scope: ScopeIndex,
    /// The work directory.
    ///
    /// This field is only available after task execution.
    work_dir: Option<&'a EvaluationPath>,
    /// The standard out value to use.
    ///
    /// This field is only available after task execution.
    stdout: Option<&'a Value>,
    /// The standard error value to use.
    ///
    /// This field is only available after task execution.
    stderr: Option<&'a Value>,
    /// The inputs for the evaluation.
    ///
    /// When present, the inputs are used to translate host paths and URLs
    /// into guest paths.
    inputs: Option<&'a [Input]>,
    /// Whether or not the evaluation has associated task information.
    ///
    /// This is `true` when evaluating hints sections.
    task: bool,
}
473
474impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
475    /// Constructs a new expression evaluation context.
476    pub fn new(state: &'a State<'b>, downloader: &'a HttpDownloader, scope: ScopeIndex) -> Self {
477        Self {
478            state,
479            downloader,
480            scope,
481            work_dir: None,
482            stdout: None,
483            stderr: None,
484            inputs: None,
485            task: false,
486        }
487    }
488
489    /// Sets the working directory to use for the evaluation context.
490    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
491        self.work_dir = Some(work_dir);
492        self
493    }
494
495    /// Sets the stdout value to use for the evaluation context.
496    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
497        self.stdout = Some(stdout);
498        self
499    }
500
501    /// Sets the stderr value to use for the evaluation context.
502    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
503        self.stderr = Some(stderr);
504        self
505    }
506
507    /// Sets the inputs associated with the evaluation context.
508    pub fn with_inputs(mut self, inputs: &'a [Input]) -> Self {
509        self.inputs = Some(inputs);
510        self
511    }
512
513    /// Marks the evaluation as having associated task information.
514    ///
515    /// This is used in evaluating hints sections.
516    pub fn with_task(mut self) -> Self {
517        self.task = true;
518        self
519    }
520}
521
522impl EvaluationContext for TaskEvaluationContext<'_, '_> {
523    fn version(&self) -> SupportedVersion {
524        self.state
525            .document
526            .version()
527            .expect("document should have a version")
528    }
529
530    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
531        ScopeRef::new(&self.state.scopes, self.scope)
532            .lookup(name)
533            .cloned()
534            .ok_or_else(|| unknown_name(name, span))
535    }
536
537    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
538        crate::resolve_type_name(self.state.document, name, span)
539    }
540
541    fn work_dir(&self) -> Option<&EvaluationPath> {
542        self.work_dir
543    }
544
545    fn temp_dir(&self) -> &Path {
546        self.state.temp_dir
547    }
548
549    fn stdout(&self) -> Option<&Value> {
550        self.stdout
551    }
552
553    fn stderr(&self) -> Option<&Value> {
554        self.stderr
555    }
556
557    fn task(&self) -> Option<&Task> {
558        if self.task {
559            Some(self.state.task)
560        } else {
561            None
562        }
563    }
564
565    fn translate_path(&self, path: &str) -> Option<Cow<'_, Path>> {
566        let inputs = self.inputs?;
567        let is_url = path::is_url(path);
568
569        // We cannot translate a relative path
570        if !is_url && Path::new(path).is_relative() {
571            return None;
572        }
573
574        // The most specific (i.e. shortest stripped path) wins
575        let (guest_path, stripped) = inputs
576            .iter()
577            .filter_map(|i| {
578                match i.path() {
579                    EvaluationPath::Local(base) if !is_url => {
580                        let stripped = Path::new(path).strip_prefix(base).ok()?;
581                        Some((i.guest_path()?, stripped.to_str()?))
582                    }
583                    EvaluationPath::Remote(url) if is_url => {
584                        let url = url.as_str();
585                        let stripped = path.strip_prefix(url.strip_suffix('/').unwrap_or(url))?;
586
587                        // Strip off the query string or fragment
588                        let stripped = if let Some(pos) = stripped.find('?') {
589                            &stripped[..pos]
590                        } else if let Some(pos) = stripped.find('#') {
591                            &stripped[..pos]
592                        } else {
593                            stripped.strip_prefix('/').unwrap_or(stripped)
594                        };
595
596                        Some((i.guest_path()?, stripped))
597                    }
598                    _ => None,
599                }
600            })
601            .min_by(|(_, a), (_, b)| a.len().cmp(&b.len()))?;
602
603        if stripped.is_empty() {
604            return Some(Path::new(guest_path).into());
605        }
606
607        Some(Path::new(guest_path).join(stripped).into())
608    }
609
610    fn downloader(&self) -> &dyn Downloader {
611        self.downloader
612    }
613}
614
/// Represents task evaluation state.
struct State<'a> {
    /// The temp directory.
    temp_dir: &'a Path,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The scopes of the task being evaluated.
    ///
    /// The first scope is the root scope, the second is the output scope, and
    /// the third is the scope where the "task" variable is visible in 1.2+
    /// evaluations.
    scopes: [Scope; 3],
    /// The environment variables of the task.
    ///
    /// Environment variables do not change between retries.
    env: IndexMap<String, String>,
}
634
635impl<'a> State<'a> {
636    /// Constructs a new task evaluation state.
637    fn new(temp_dir: &'a Path, document: &'a Document, task: &'a Task) -> Result<Self> {
638        // Tasks have a root scope (index 0), an output scope (index 1), and a `task`
639        // variable scope (index 2). The output scope inherits from the root scope and
640        // the task scope inherits from the output scope. Inputs and private
641        // declarations are evaluated into the root scope. Outputs are evaluated into
642        // the output scope. The task scope is used for evaluating expressions in both
643        // the command and output sections. Only the `task` variable in WDL 1.2 is
644        // introduced into the task scope; in previous WDL versions, the task scope will
645        // not have any local names.
646        let scopes = [
647            Scope::default(),
648            Scope::new(ROOT_SCOPE_INDEX),
649            Scope::new(OUTPUT_SCOPE_INDEX),
650        ];
651
652        Ok(Self {
653            temp_dir,
654            document,
655            task,
656            scopes,
657            env: Default::default(),
658        })
659    }
660}
661
/// Represents the result of evaluating task sections before execution.
///
/// A fresh value is produced for each attempt of the task's retry loop.
struct EvaluatedSections {
    /// The evaluated command.
    command: String,
    /// The evaluated requirements.
    ///
    /// Keyed by requirement name.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints.
    ///
    /// Keyed by hint name.
    hints: Arc<HashMap<String, Value>>,
    /// The inputs to the task.
    inputs: Vec<Input>,
}
673
/// Represents a WDL V1 task evaluator.
pub struct TaskEvaluator {
    /// The associated evaluation configuration.
    config: Arc<Config>,
    /// The associated task execution backend.
    ///
    /// Tasks are spawned on this backend.
    backend: Arc<dyn TaskExecutionBackend>,
    /// The cancellation token for cancelling task evaluation.
    ///
    /// A clone of the token is handed to the backend when spawning a task.
    token: CancellationToken,
    /// The downloader to use for expression evaluation.
    downloader: HttpDownloader,
}
685
686impl TaskEvaluator {
    /// Constructs a new task evaluator with the given evaluation
    /// configuration and cancellation token.
    ///
    /// This method creates a default task execution backend by calling
    /// `config.create_backend()` and then delegates to
    /// [`Self::new_with_backend`].
    ///
    /// Returns an error if the backend cannot be created or if the
    /// configuration isn't valid.
    pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
        // The backend is created first; the configuration itself is validated
        // by `new_with_backend`
        let backend = config.create_backend().await?;
        Self::new_with_backend(config, backend, token)
    }
697
698    /// Constructs a new task evaluator with the given evaluation
699    /// configuration, task execution backend, and cancellation token.
700    ///
701    /// Returns an error if the configuration isn't valid.
702    pub fn new_with_backend(
703        config: Config,
704        backend: Arc<dyn TaskExecutionBackend>,
705        token: CancellationToken,
706    ) -> Result<Self> {
707        config.validate()?;
708
709        let config = Arc::new(config);
710        let downloader = HttpDownloader::new(config.clone())?;
711
712        Ok(Self {
713            config,
714            backend,
715            token,
716            downloader,
717        })
718    }
719
    /// Creates a new task evaluator with the given configuration, backend,
    /// cancellation token, and downloader.
    ///
    /// This method does not validate the configuration; every argument is
    /// stored as given.
    pub(crate) fn new_unchecked(
        config: Arc<Config>,
        backend: Arc<dyn TaskExecutionBackend>,
        token: CancellationToken,
        downloader: HttpDownloader,
    ) -> Self {
        Self {
            config,
            backend,
            token,
            downloader,
        }
    }
737
738    /// Evaluates the given task.
739    ///
740    /// Upon success, returns the evaluated task.
741    pub async fn evaluate<P, R>(
742        &self,
743        document: &Document,
744        task: &Task,
745        inputs: &TaskInputs,
746        root: impl AsRef<Path>,
747        progress: P,
748    ) -> EvaluationResult<EvaluatedTask>
749    where
750        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
751        R: Future<Output = ()> + Send,
752    {
753        self.evaluate_with_progress(
754            document,
755            task,
756            inputs,
757            root.as_ref(),
758            task.name(),
759            Arc::new(progress),
760        )
761        .await
762    }
763
764    /// Evaluates the given task with the given shared progress callback.
765    pub(crate) async fn evaluate_with_progress<P, R>(
766        &self,
767        document: &Document,
768        task: &Task,
769        inputs: &TaskInputs,
770        root: &Path,
771        id: &str,
772        progress: Arc<P>,
773    ) -> EvaluationResult<EvaluatedTask>
774    where
775        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
776        R: Future<Output = ()> + Send,
777    {
778        // We cannot evaluate a document with errors
779        if document.has_errors() {
780            return Err(anyhow!("cannot evaluate a document with errors").into());
781        }
782
783        progress(ProgressKind::TaskStarted { id }).await;
784
785        let result = self
786            .perform_evaluation(document, task, inputs, root, id, progress.clone())
787            .await;
788
789        progress(ProgressKind::TaskCompleted {
790            id,
791            result: &result,
792        })
793        .await;
794
795        result
796    }
797
798    /// Performs the actual evaluation of the task.
799    async fn perform_evaluation<P, R>(
800        &self,
801        document: &Document,
802        task: &Task,
803        inputs: &TaskInputs,
804        root: &Path,
805        id: &str,
806        progress: Arc<P>,
807    ) -> EvaluationResult<EvaluatedTask>
808    where
809        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
810        R: Future<Output = ()> + Send,
811    {
812        inputs.validate(document, task, None).with_context(|| {
813            format!(
814                "failed to validate the inputs to task `{task}`",
815                task = task.name()
816            )
817        })?;
818
819        let ast = match document.root().morph().ast() {
820            Ast::V1(ast) => ast,
821            _ => {
822                return Err(
823                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
824                );
825            }
826        };
827
828        // Find the task in the AST
829        let definition = ast
830            .tasks()
831            .find(|t| t.name().text() == task.name())
832            .expect("task should exist in the AST");
833
834        let version = document.version().expect("document should have version");
835
836        // Build an evaluation graph for the task
837        let mut diagnostics = Vec::new();
838        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
839        assert!(
840            diagnostics.is_empty(),
841            "task evaluation graph should have no diagnostics"
842        );
843
844        debug!(
845            task_id = id,
846            task_name = task.name(),
847            document = document.uri().as_str(),
848            "evaluating task"
849        );
850
851        let root_dir = absolute(root).with_context(|| {
852            format!(
853                "failed to determine absolute path of `{path}`",
854                path = root.display()
855            )
856        })?;
857
858        // Create the temp directory now as it may be needed for task evaluation
859        let temp_dir = root_dir.join("tmp");
860        fs::create_dir_all(&temp_dir).with_context(|| {
861            format!(
862                "failed to create directory `{path}`",
863                path = temp_dir.display()
864            )
865        })?;
866
867        // Write the inputs to the task's root directory
868        write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
869
870        let mut state = State::new(&temp_dir, document, task)?;
871        let nodes = toposort(&graph, None).expect("graph should be acyclic");
872        let mut current = 0;
873        while current < nodes.len() {
874            match &graph[nodes[current]] {
875                TaskGraphNode::Input(decl) => {
876                    self.evaluate_input(id, &mut state, decl, inputs)
877                        .await
878                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
879                }
880                TaskGraphNode::Decl(decl) => {
881                    self.evaluate_decl(id, &mut state, decl)
882                        .await
883                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
884                }
885                TaskGraphNode::Output(_) => {
886                    // Stop at the first output
887                    break;
888                }
889                TaskGraphNode::Command(_)
890                | TaskGraphNode::Runtime(_)
891                | TaskGraphNode::Requirements(_)
892                | TaskGraphNode::Hints(_) => {
893                    // Skip these sections for now; they'll evaluate in the
894                    // retry loop
895                }
896            }
897
898            current += 1;
899        }
900
901        // TODO: check call cache for a hit. if so, skip task execution and use cache
902        // paths for output evaluation
903
904        let env = Arc::new(mem::take(&mut state.env));
905
906        // Spawn the task in a retry loop
907        let mut attempt = 0;
908        let mut evaluated = loop {
909            let EvaluatedSections {
910                command,
911                requirements,
912                hints,
913                inputs,
914            } = self
915                .evaluate_sections(id, &mut state, &definition, inputs, attempt)
916                .await?;
917
918            // Get the maximum number of retries, either from the task's requirements or
919            // from configuration
920            let max_retries = requirements
921                .get(TASK_REQUIREMENT_MAX_RETRIES)
922                .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
923                .cloned()
924                .map(|v| v.unwrap_integer() as u64)
925                .or_else(|| self.config.task.retries)
926                .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
927
928            if max_retries > MAX_RETRIES {
929                return Err(anyhow!(
930                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
931                )
932                .into());
933            }
934
935            let mut attempt_dir = root_dir.clone();
936            attempt_dir.push("attempts");
937            attempt_dir.push(attempt.to_string());
938
939            let request = TaskSpawnRequest::new(
940                id.to_string(),
941                TaskSpawnInfo::new(
942                    command,
943                    inputs,
944                    requirements.clone(),
945                    hints.clone(),
946                    env.clone(),
947                ),
948                attempt,
949                attempt_dir.clone(),
950            );
951
952            let events = self
953                .backend
954                .spawn(request, self.token.clone())
955                .with_context(|| {
956                    format!(
957                        "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
958                        name = task.name(),
959                        path = document.path(),
960                    )
961                })?;
962
963            if attempt > 0 {
964                progress(ProgressKind::TaskRetried {
965                    id,
966                    retry: attempt - 1,
967                });
968            }
969
970            // Await the spawned notification first
971            events.spawned.await.ok();
972
973            progress(ProgressKind::TaskExecutionStarted { id });
974
975            let result = events
976                .completed
977                .await
978                .expect("failed to receive response from spawned task");
979
980            progress(ProgressKind::TaskExecutionCompleted {
981                id,
982                result: &result,
983            });
984
985            let result = result.map_err(|e| {
986                EvaluationError::new(
987                    state.document.clone(),
988                    task_execution_failed(e, task.name(), id, task.name_span()),
989                )
990            })?;
991
992            // Update the task variable
993            let evaluated = EvaluatedTask::new(attempt_dir, result)?;
994            if version >= SupportedVersion::V1(V1::Two) {
995                let task = state.scopes[TASK_SCOPE_INDEX.0]
996                    .get_mut(TASK_VAR_NAME)
997                    .unwrap()
998                    .as_task_mut()
999                    .unwrap();
1000
1001                task.set_attempt(attempt.try_into().with_context(|| {
1002                    format!(
1003                        "too many attempts were made to run task `{task}`",
1004                        task = state.task.name()
1005                    )
1006                })?);
1007                task.set_return_code(evaluated.result.exit_code);
1008            }
1009
1010            if let Err(e) = evaluated.handle_exit(&requirements, &self.downloader).await {
1011                if attempt >= max_retries {
1012                    return Err(EvaluationError::new(
1013                        state.document.clone(),
1014                        task_execution_failed(e, task.name(), id, task.name_span()),
1015                    ));
1016                }
1017
1018                attempt += 1;
1019
1020                info!(
1021                    "retrying execution of task `{name}` (retry {attempt})",
1022                    name = state.task.name()
1023                );
1024                continue;
1025            }
1026
1027            break evaluated;
1028        };
1029
1030        // Evaluate the remaining inputs (unused), and decls, and outputs
1031        for index in &nodes[current..] {
1032            match &graph[*index] {
1033                TaskGraphNode::Decl(decl) => {
1034                    self.evaluate_decl(id, &mut state, decl)
1035                        .await
1036                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1037                }
1038                TaskGraphNode::Output(decl) => {
1039                    self.evaluate_output(id, &mut state, decl, &evaluated)
1040                        .await
1041                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1042                }
1043                _ => {
1044                    unreachable!(
1045                        "only declarations and outputs should be evaluated after the command"
1046                    )
1047                }
1048            }
1049        }
1050
1051        // Take the output scope and return it
1052        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1053        drop(state);
1054        if let Some(section) = definition.output() {
1055            let indexes: HashMap<_, _> = section
1056                .declarations()
1057                .enumerate()
1058                .map(|(i, d)| (d.name().hashable(), i))
1059                .collect();
1060            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1061        }
1062
1063        // Write the outputs to the task's root directory
1064        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
1065
1066        evaluated.outputs = Ok(outputs);
1067        Ok(evaluated)
1068    }
1069
1070    /// Evaluates a task input.
1071    async fn evaluate_input(
1072        &self,
1073        id: &str,
1074        state: &mut State<'_>,
1075        decl: &Decl<SyntaxNode>,
1076        inputs: &TaskInputs,
1077    ) -> Result<(), Diagnostic> {
1078        let name = decl.name();
1079        let decl_ty = decl.ty();
1080        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1081
1082        let (value, span) = match inputs.get(name.text()) {
1083            Some(input) => (input.clone(), name.span()),
1084            None => match decl.expr() {
1085                Some(expr) => {
1086                    debug!(
1087                        task_id = id,
1088                        task_name = state.task.name(),
1089                        document = state.document.uri().as_str(),
1090                        input_name = name.text(),
1091                        "evaluating input"
1092                    );
1093
1094                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1095                        state,
1096                        &self.downloader,
1097                        ROOT_SCOPE_INDEX,
1098                    ));
1099                    let value = evaluator.evaluate_expr(&expr).await?;
1100                    (value, expr.span())
1101                }
1102                _ => {
1103                    assert!(decl.ty().is_optional(), "type should be optional");
1104                    (Value::None, name.span())
1105                }
1106            },
1107        };
1108
1109        let value = value
1110            .coerce(&ty)
1111            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
1112        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1113
1114        // Insert an environment variable, if it is one
1115        if decl.env().is_some() {
1116            state.env.insert(
1117                name.text().to_string(),
1118                value
1119                    .as_primitive()
1120                    .expect("value should be primitive")
1121                    .raw(None)
1122                    .to_string(),
1123            );
1124        }
1125
1126        Ok(())
1127    }
1128
1129    /// Evaluates a task private declaration.
1130    async fn evaluate_decl(
1131        &self,
1132        id: &str,
1133        state: &mut State<'_>,
1134        decl: &Decl<SyntaxNode>,
1135    ) -> Result<(), Diagnostic> {
1136        let name = decl.name();
1137        debug!(
1138            task_id = id,
1139            task_name = state.task.name(),
1140            document = state.document.uri().as_str(),
1141            decl_name = name.text(),
1142            "evaluating private declaration",
1143        );
1144
1145        let decl_ty = decl.ty();
1146        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1147
1148        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1149            state,
1150            &self.downloader,
1151            ROOT_SCOPE_INDEX,
1152        ));
1153
1154        let expr = decl.expr().expect("private decls should have expressions");
1155        let value = evaluator.evaluate_expr(&expr).await?;
1156        let value = value
1157            .coerce(&ty)
1158            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1159        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1160
1161        // Insert an environment variable, if it is one
1162        if decl.env().is_some() {
1163            state.env.insert(
1164                name.text().to_string(),
1165                value
1166                    .as_primitive()
1167                    .expect("value should be primitive")
1168                    .raw(None)
1169                    .to_string(),
1170            );
1171        }
1172
1173        Ok(())
1174    }
1175
1176    /// Evaluates the runtime section.
1177    ///
1178    /// Returns both the task's hints and requirements.
1179    async fn evaluate_runtime_section(
1180        &self,
1181        id: &str,
1182        state: &State<'_>,
1183        section: &RuntimeSection<SyntaxNode>,
1184        inputs: &TaskInputs,
1185    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1186        debug!(
1187            task_id = id,
1188            task_name = state.task.name(),
1189            document = state.document.uri().as_str(),
1190            "evaluating runtimes section",
1191        );
1192
1193        let mut requirements = HashMap::new();
1194        let mut hints = HashMap::new();
1195
1196        let version = state
1197            .document
1198            .version()
1199            .expect("document should have version");
1200        for item in section.items() {
1201            let name = item.name();
1202            match inputs.requirement(name.text()) {
1203                Some(value) => {
1204                    requirements.insert(name.text().to_string(), value.clone());
1205                    continue;
1206                }
1207                _ => {
1208                    if let Some(value) = inputs.hint(name.text()) {
1209                        hints.insert(name.text().to_string(), value.clone());
1210                        continue;
1211                    }
1212                }
1213            }
1214
1215            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1216                state,
1217                &self.downloader,
1218                ROOT_SCOPE_INDEX,
1219            ));
1220
1221            let (types, requirement) = match task_requirement_types(version, name.text()) {
1222                Some(types) => (Some(types), true),
1223                None => match task_hint_types(version, name.text(), false) {
1224                    Some(types) => (Some(types), false),
1225                    None => (None, false),
1226                },
1227            };
1228
1229            // Evaluate and coerce to the expected type
1230            let expr = item.expr();
1231            let mut value = evaluator.evaluate_expr(&expr).await?;
1232            if let Some(types) = types {
1233                value = types
1234                    .iter()
1235                    .find_map(|ty| value.coerce(ty).ok())
1236                    .ok_or_else(|| {
1237                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1238                    })?;
1239            }
1240
1241            if requirement {
1242                requirements.insert(name.text().to_string(), value);
1243            } else {
1244                hints.insert(name.text().to_string(), value);
1245            }
1246        }
1247
1248        Ok((requirements, hints))
1249    }
1250
1251    /// Evaluates the requirements section.
1252    async fn evaluate_requirements_section(
1253        &self,
1254        id: &str,
1255        state: &State<'_>,
1256        section: &RequirementsSection<SyntaxNode>,
1257        inputs: &TaskInputs,
1258    ) -> Result<HashMap<String, Value>, Diagnostic> {
1259        debug!(
1260            task_id = id,
1261            task_name = state.task.name(),
1262            document = state.document.uri().as_str(),
1263            "evaluating requirements",
1264        );
1265
1266        let mut requirements = HashMap::new();
1267
1268        let version = state
1269            .document
1270            .version()
1271            .expect("document should have version");
1272        for item in section.items() {
1273            let name = item.name();
1274            if let Some(value) = inputs.requirement(name.text()) {
1275                requirements.insert(name.text().to_string(), value.clone());
1276                continue;
1277            }
1278
1279            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1280                state,
1281                &self.downloader,
1282                ROOT_SCOPE_INDEX,
1283            ));
1284
1285            let types =
1286                task_requirement_types(version, name.text()).expect("requirement should be known");
1287
1288            // Evaluate and coerce to the expected type
1289            let expr = item.expr();
1290            let value = evaluator.evaluate_expr(&expr).await?;
1291            let value = types
1292                .iter()
1293                .find_map(|ty| value.coerce(ty).ok())
1294                .ok_or_else(|| {
1295                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1296                })?;
1297
1298            requirements.insert(name.text().to_string(), value);
1299        }
1300
1301        Ok(requirements)
1302    }
1303
1304    /// Evaluates the hints section.
1305    async fn evaluate_hints_section(
1306        &self,
1307        id: &str,
1308        state: &State<'_>,
1309        section: &TaskHintsSection<SyntaxNode>,
1310        inputs: &TaskInputs,
1311    ) -> Result<HashMap<String, Value>, Diagnostic> {
1312        debug!(
1313            task_id = id,
1314            task_name = state.task.name(),
1315            document = state.document.uri().as_str(),
1316            "evaluating hints section",
1317        );
1318
1319        let mut hints = HashMap::new();
1320
1321        for item in section.items() {
1322            let name = item.name();
1323            if let Some(value) = inputs.hint(name.text()) {
1324                hints.insert(name.text().to_string(), value.clone());
1325                continue;
1326            }
1327
1328            let mut evaluator = ExprEvaluator::new(
1329                TaskEvaluationContext::new(state, &self.downloader, ROOT_SCOPE_INDEX).with_task(),
1330            );
1331
1332            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1333            hints.insert(name.text().to_string(), value);
1334        }
1335
1336        Ok(hints)
1337    }
1338
1339    /// Evaluates the command of a task.
1340    ///
1341    /// Returns the evaluated command and the mounts to use for spawning the
1342    /// task.
1343    async fn evaluate_command(
1344        &self,
1345        id: &str,
1346        state: &State<'_>,
1347        section: &CommandSection<SyntaxNode>,
1348    ) -> EvaluationResult<(String, Vec<Input>)> {
1349        debug!(
1350            task_id = id,
1351            task_name = state.task.name(),
1352            document = state.document.uri().as_str(),
1353            "evaluating command section",
1354        );
1355
1356        // Determine the inputs to the task
1357        let mut inputs = Vec::new();
1358
1359        // Discover every input that's visible to the scope
1360        ScopeRef::new(&state.scopes, TASK_SCOPE_INDEX.0).for_each(|_, v| {
1361            v.visit_paths(false, &mut |_, value| {
1362                inputs.push(Input::from_primitive(value)?);
1363                Ok(())
1364            })
1365        })?;
1366
1367        // The temp directory should always be an input
1368        inputs.push(Input::new(
1369            InputKind::Directory,
1370            EvaluationPath::Local(state.temp_dir.to_path_buf()),
1371        ));
1372
1373        // Localize the inputs
1374        self.backend
1375            .localize_inputs(&self.downloader, &mut inputs)
1376            .await
1377            .map_err(|e| {
1378                EvaluationError::new(
1379                    state.document.clone(),
1380                    task_localization_failed(e, state.task.name(), state.task.name_span()),
1381                )
1382            })?;
1383
1384        if enabled!(Level::DEBUG) {
1385            for input in inputs.iter() {
1386                if let Some(location) = input.location() {
1387                    debug!(
1388                        task_id = id,
1389                        task_name = state.task.name(),
1390                        document = state.document.uri().as_str(),
1391                        "task input `{path}` (downloaded to `{location}`) mapped to `{guest_path}`",
1392                        path = input.path().display(),
1393                        location = location.display(),
1394                        guest_path = input.guest_path().unwrap_or(""),
1395                    );
1396                } else {
1397                    debug!(
1398                        task_id = id,
1399                        task_name = state.task.name(),
1400                        document = state.document.uri().as_str(),
1401                        "task input `{path}` mapped to `{guest_path}`",
1402                        path = input.path().display(),
1403                        guest_path = input.guest_path().unwrap_or(""),
1404                    );
1405                }
1406            }
1407        }
1408
1409        let mut command = String::new();
1410        match section.strip_whitespace() {
1411            Some(parts) => {
1412                let mut evaluator = ExprEvaluator::new(
1413                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1414                        .with_inputs(&inputs),
1415                );
1416
1417                for part in parts {
1418                    match part {
1419                        StrippedCommandPart::Text(t) => {
1420                            command.push_str(t.as_str());
1421                        }
1422                        StrippedCommandPart::Placeholder(placeholder) => {
1423                            evaluator
1424                                .evaluate_placeholder(&placeholder, &mut command)
1425                                .await
1426                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1427                        }
1428                    }
1429                }
1430            }
1431            _ => {
1432                warn!(
1433                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1434                     stripping was skipped",
1435                    task = state.task.name(),
1436                    uri = state.document.uri(),
1437                );
1438
1439                let mut evaluator = ExprEvaluator::new(
1440                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1441                        .with_inputs(&inputs),
1442                );
1443
1444                let heredoc = section.is_heredoc();
1445                for part in section.parts() {
1446                    match part {
1447                        CommandPart::Text(t) => {
1448                            t.unescape_to(heredoc, &mut command);
1449                        }
1450                        CommandPart::Placeholder(placeholder) => {
1451                            evaluator
1452                                .evaluate_placeholder(&placeholder, &mut command)
1453                                .await
1454                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1455                        }
1456                    }
1457                }
1458            }
1459        }
1460
1461        Ok((command, inputs))
1462    }
1463
1464    /// Evaluates sections prior to spawning the command.
1465    ///
1466    /// This method evaluates the following sections:
1467    ///   * runtime
1468    ///   * requirements
1469    ///   * hints
1470    ///   * command
1471    async fn evaluate_sections(
1472        &self,
1473        id: &str,
1474        state: &mut State<'_>,
1475        definition: &TaskDefinition<SyntaxNode>,
1476        inputs: &TaskInputs,
1477        attempt: u64,
1478    ) -> EvaluationResult<EvaluatedSections> {
1479        // Start by evaluating requirements and hints
1480        let (requirements, hints) = match definition.runtime() {
1481            Some(section) => self
1482                .evaluate_runtime_section(id, state, &section, inputs)
1483                .await
1484                .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1485            _ => (
1486                match definition.requirements() {
1487                    Some(section) => self
1488                        .evaluate_requirements_section(id, state, &section, inputs)
1489                        .await
1490                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1491                    None => Default::default(),
1492                },
1493                match definition.hints() {
1494                    Some(section) => self
1495                        .evaluate_hints_section(id, state, &section, inputs)
1496                        .await
1497                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1498                    None => Default::default(),
1499                },
1500            ),
1501        };
1502
1503        // Update or insert the `task` variable in the task scope
1504        // TODO: if task variables become visible in `requirements` or `hints` section,
1505        // this needs to be relocated to before we evaluate those sections
1506        if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1507            // Get the execution constraints
1508            let constraints = self
1509                .backend
1510                .constraints(&requirements, &hints)
1511                .with_context(|| {
1512                    format!(
1513                        "failed to get constraints for task `{task}`",
1514                        task = state.task.name()
1515                    )
1516                })?;
1517
1518            let task = TaskValue::new_v1(
1519                state.task.name(),
1520                id,
1521                definition,
1522                constraints,
1523                attempt.try_into().with_context(|| {
1524                    format!(
1525                        "too many attempts were made to run task `{task}`",
1526                        task = state.task.name()
1527                    )
1528                })?,
1529            );
1530
1531            let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1532            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1533                *v = Value::Task(task);
1534            } else {
1535                scope.insert(TASK_VAR_NAME, Value::Task(task));
1536            }
1537        }
1538
1539        let (command, inputs) = self
1540            .evaluate_command(
1541                id,
1542                state,
1543                &definition.command().expect("must have command section"),
1544            )
1545            .await?;
1546
1547        Ok(EvaluatedSections {
1548            command,
1549            requirements: Arc::new(requirements),
1550            hints: Arc::new(hints),
1551            inputs,
1552        })
1553    }
1554
1555    /// Evaluates a task output.
1556    async fn evaluate_output(
1557        &self,
1558        id: &str,
1559        state: &mut State<'_>,
1560        decl: &Decl<SyntaxNode>,
1561        evaluated: &EvaluatedTask,
1562    ) -> Result<(), Diagnostic> {
1563        let name = decl.name();
1564        debug!(
1565            task_id = id,
1566            task_name = state.task.name(),
1567            document = state.document.uri().as_str(),
1568            output_name = name.text(),
1569            "evaluating output",
1570        );
1571
1572        let decl_ty = decl.ty();
1573        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1574        let mut evaluator = ExprEvaluator::new(
1575            TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1576                .with_work_dir(&evaluated.result.work_dir)
1577                .with_stdout(&evaluated.result.stdout)
1578                .with_stderr(&evaluated.result.stderr),
1579        );
1580
1581        let expr = decl.expr().expect("outputs should have expressions");
1582        let value = evaluator.evaluate_expr(&expr).await?;
1583
1584        // First coerce the output value to the expected type
1585        let mut value = value
1586            .coerce(&ty)
1587            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1588
1589        let result = if let Some(guest_work_dir) = self.backend.guest_work_dir() {
1590            // Perform guest to host path translation and check for existence
1591            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
1592                let path = match value {
1593                    PrimitiveValue::File(path) => path,
1594                    PrimitiveValue::Directory(path) => path,
1595                    _ => unreachable!("only file and directory values should be visited"),
1596                };
1597
1598                // If the path isn't in the temp directory or the attempt directory, perform
1599                // translation
1600                if !Path::new(path.as_str()).starts_with(state.temp_dir)
1601                    && !Path::new(path.as_str()).starts_with(evaluated.attempt_dir())
1602                {
1603                    // It's a file scheme'd URL, treat it as an absolute guest path
1604                    let guest = if path::is_file_url(path) {
1605                        path::parse_url(path)
1606                            .and_then(|u| u.to_file_path().ok())
1607                            .ok_or_else(|| anyhow!("guest path `{path}` is not a valid file URI"))?
1608                    } else if path::is_url(path) {
1609                        // Treat other URLs as if they exist
1610                        // TODO: should probably issue a HEAD request to verify
1611                        return Ok(true);
1612                    } else {
1613                        // Otherwise, treat as relative to the guest working directory
1614                        guest_work_dir.join(path.as_str())
1615                    };
1616
1617                    // If the path is inside of the working directory, join with the task's working
1618                    // directory
1619                    let host = if let Ok(stripped) = guest.strip_prefix(guest_work_dir) {
1620                        Cow::Owned(
1621                            evaluated.result.work_dir.join(
1622                                stripped.to_str().with_context(|| {
1623                                    format!("output path `{path}` is not UTF-8")
1624                                })?,
1625                            )?,
1626                        )
1627                    } else {
1628                        evaluated
1629                            .inputs()
1630                            .iter()
1631                            .filter_map(|i| {
1632                                Some((i.path(), guest.strip_prefix(i.guest_path()?).ok()?))
1633                            })
1634                            .min_by(|(_, a), (_, b)| a.as_os_str().len().cmp(&b.as_os_str().len()))
1635                            .and_then(|(path, stripped)| {
1636                                if stripped.as_os_str().is_empty() {
1637                                    return Some(Cow::Borrowed(path));
1638                                }
1639
1640                                Some(Cow::Owned(path.join(stripped.to_str()?).ok()?))
1641                            })
1642                            .ok_or_else(|| {
1643                                anyhow!("guest path `{path}` is not within a container mount")
1644                            })?
1645                    };
1646
1647                    // Update the value to the host path
1648                    *Arc::make_mut(path) = host.into_owned().try_into()?;
1649                }
1650
1651                // Finally, ensure the value exists
1652                value.ensure_path_exists(optional)
1653            })
1654        } else {
1655            // Backend isn't containerized, just join host paths and check for existence
1656            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
1657                if let Some(work_dir) = evaluated.result.work_dir.as_local() {
1658                    value.join_path_to(work_dir);
1659                }
1660
1661                value.ensure_path_exists(optional)
1662            })
1663        };
1664
1665        result.map_err(|e| {
1666            output_evaluation_failed(e, state.task.name(), true, name.text(), name.span())
1667        })?;
1668
1669        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
1670        Ok(())
1671    }
1672}