wdl_engine/eval/v1/
task.rs

1//! Implementation of evaluation for V1 tasks.
2
3use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fs;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::anyhow;
16use anyhow::bail;
17use bimap::BiHashMap;
18use indexmap::IndexMap;
19use petgraph::algo::toposort;
20use tokio::task::JoinSet;
21use tracing::Level;
22use tracing::debug;
23use tracing::enabled;
24use tracing::error;
25use tracing::info;
26use tracing::warn;
27use wdl_analysis::Document;
28use wdl_analysis::diagnostics::Io;
29use wdl_analysis::diagnostics::multiple_type_mismatch;
30use wdl_analysis::diagnostics::unknown_name;
31use wdl_analysis::document::TASK_VAR_NAME;
32use wdl_analysis::document::Task;
33use wdl_analysis::eval::v1::TaskGraphBuilder;
34use wdl_analysis::eval::v1::TaskGraphNode;
35use wdl_analysis::types::Optional;
36use wdl_analysis::types::PrimitiveType;
37use wdl_analysis::types::Type;
38use wdl_analysis::types::v1::task_hint_types;
39use wdl_analysis::types::v1::task_requirement_types;
40use wdl_ast::Ast;
41use wdl_ast::AstNode;
42use wdl_ast::AstToken;
43use wdl_ast::Diagnostic;
44use wdl_ast::Span;
45use wdl_ast::SupportedVersion;
46use wdl_ast::v1::CommandPart;
47use wdl_ast::v1::CommandSection;
48use wdl_ast::v1::Decl;
49use wdl_ast::v1::RequirementsSection;
50use wdl_ast::v1::RuntimeSection;
51use wdl_ast::v1::StrippedCommandPart;
52use wdl_ast::v1::TASK_HINT_CACHEABLE;
53use wdl_ast::v1::TASK_HINT_DISKS;
54use wdl_ast::v1::TASK_HINT_GPU;
55use wdl_ast::v1::TASK_HINT_MAX_CPU;
56use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
57use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
58use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
59use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
60use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
61use wdl_ast::v1::TASK_REQUIREMENT_CPU;
62use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
63use wdl_ast::v1::TASK_REQUIREMENT_GPU;
64use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
65use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
66use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
67use wdl_ast::v1::TaskDefinition;
68use wdl_ast::v1::TaskHintsSection;
69use wdl_ast::version::V1;
70
71use super::TopLevelEvaluator;
72use crate::CancellationContextState;
73use crate::Coercible;
74use crate::ContentKind;
75use crate::EngineEvent;
76use crate::EvaluationContext;
77use crate::EvaluationError;
78use crate::EvaluationResult;
79use crate::GuestPath;
80use crate::HiddenValue;
81use crate::HostPath;
82use crate::Input;
83use crate::ONE_GIBIBYTE;
84use crate::Object;
85use crate::Outputs;
86use crate::Scope;
87use crate::ScopeIndex;
88use crate::ScopeRef;
89use crate::StorageUnit;
90use crate::TaskInputs;
91use crate::TaskPostEvaluationData;
92use crate::TaskPostEvaluationValue;
93use crate::TaskPreEvaluationValue;
94use crate::Value;
95use crate::backend::TaskSpawnInfo;
96use crate::backend::TaskSpawnRequest;
97use crate::cache::KeyRequest;
98use crate::config::CallCachingMode;
99use crate::config::Config;
100use crate::config::DEFAULT_TASK_SHELL;
101use crate::config::MAX_RETRIES;
102use crate::convert_unit_string;
103use crate::diagnostics::decl_evaluation_failed;
104use crate::diagnostics::runtime_type_mismatch;
105use crate::diagnostics::task_execution_failed;
106use crate::diagnostics::task_localization_failed;
107use crate::eval::EvaluatedTask;
108use crate::eval::trie::InputTrie;
109use crate::http::Transferer;
110use crate::path::EvaluationPath;
111use crate::path::is_file_url;
112use crate::path::is_supported_url;
113use crate::tree::SyntaxNode;
114use crate::v1::INPUTS_FILE;
115use crate::v1::OUTPUTS_FILE;
116use crate::v1::expr::ExprEvaluator;
117use crate::v1::write_json_file;
118
119/// The default container requirement.
120pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
121/// The default value for the `cpu` requirement.
122pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
123/// The default value for the `memory` requirement.
124pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
125/// The default value for the `max_retries` requirement.
126pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
127/// The default value for the `disks` requirement (in GiB).
128pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;
129/// The default GPU count when a GPU is required but no supported hint is
130/// provided.
131pub const DEFAULT_GPU_COUNT: u64 = 1;
132
133/// The index of a task's root scope.
134const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
135/// The index of a task's output scope.
136const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
137/// The index of the evaluation scope where the WDL 1.2 `task` variable is
138/// visible.
139const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
140
141/// Gets the `container` requirement from a requirements map.
142pub(crate) fn container<'a>(
143    requirements: &'a HashMap<String, Value>,
144    default: Option<&'a str>,
145) -> Cow<'a, str> {
146    requirements
147        .get(TASK_REQUIREMENT_CONTAINER)
148        .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
149        .and_then(|v| -> Option<Cow<'_, str>> {
150            // If the value is an array, use the first element or the default
151            // Note: in the future we should be resolving which element in the array is
152            // usable; this will require some work in Crankshaft to enable
153            if let Some(array) = v.as_array() {
154                return array.as_slice().first().map(|v| {
155                    v.as_string()
156                        .expect("type should be string")
157                        .as_ref()
158                        .into()
159                });
160            }
161
162            Some(
163                v.coerce(None, &PrimitiveType::String.into())
164                    .expect("type should coerce")
165                    .unwrap_string()
166                    .as_ref()
167                    .clone()
168                    .into(),
169            )
170        })
171        .and_then(|v| {
172            // Treat star as the default
173            if v == "*" { None } else { Some(v) }
174        })
175        .unwrap_or_else(|| {
176            default
177                .map(Into::into)
178                .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
179        })
180}
181
182/// Gets the `cpu` requirement from a requirements map.
183pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
184    requirements
185        .get(TASK_REQUIREMENT_CPU)
186        .map(|v| {
187            v.coerce(None, &PrimitiveType::Float.into())
188                .expect("type should coerce")
189                .unwrap_float()
190        })
191        .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
192}
193
194/// Gets the `max_cpu` hint from a hints map.
195pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
196    hints
197        .get(TASK_HINT_MAX_CPU)
198        .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
199        .map(|v| {
200            v.coerce(None, &PrimitiveType::Float.into())
201                .expect("type should coerce")
202                .unwrap_float()
203        })
204}
205
206/// Gets the `memory` requirement from a requirements map.
207pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
208    Ok(requirements
209        .get(TASK_REQUIREMENT_MEMORY)
210        .map(|v| {
211            if let Some(v) = v.as_integer() {
212                return Ok(v);
213            }
214
215            if let Some(s) = v.as_string() {
216                return convert_unit_string(s)
217                    .and_then(|v| v.try_into().ok())
218                    .with_context(|| {
219                        format!("task specifies an invalid `memory` requirement `{s}`")
220                    });
221            }
222
223            unreachable!("value should be an integer or string");
224        })
225        .transpose()?
226        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
227}
228
229/// Gets the `max_memory` hint from a hints map.
230pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
231    hints
232        .get(TASK_HINT_MAX_MEMORY)
233        .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
234        .map(|v| {
235            if let Some(v) = v.as_integer() {
236                return Ok(v);
237            }
238
239            if let Some(s) = v.as_string() {
240                return convert_unit_string(s)
241                    .and_then(|v| v.try_into().ok())
242                    .with_context(|| {
243                        format!("task specifies an invalid `memory` requirement `{s}`")
244                    });
245            }
246
247            unreachable!("value should be an integer or string");
248        })
249        .transpose()
250}
251
252/// Gets the number of required GPUs from requirements and hints.
253pub(crate) fn gpu(
254    requirements: &HashMap<String, Value>,
255    hints: &HashMap<String, Value>,
256) -> Option<u64> {
257    // If `requirements { gpu: false }` or there is no `gpu` requirement, return
258    // `None`.
259    let Some(true) = requirements
260        .get(TASK_REQUIREMENT_GPU)
261        .and_then(|v| v.as_boolean())
262    else {
263        return None;
264    };
265
266    // If there is no `gpu` hint giving us more detail on the request, use the
267    // default count.
268    let Some(hint) = hints.get(TASK_HINT_GPU) else {
269        return Some(DEFAULT_GPU_COUNT);
270    };
271
272    // A string `gpu` hint is allowed by the spec, but we do not support them yet.
273    // Fall back to the default count.
274    //
275    // TODO(clay): support string hints for GPU specifications.
276    if let Some(hint) = hint.as_string() {
277        warn!(
278            %hint,
279            "string `gpu` hints are not supported; falling back to {DEFAULT_GPU_COUNT} GPU(s)"
280        );
281        return Some(DEFAULT_GPU_COUNT);
282    }
283
284    match hint.as_integer() {
285        Some(count) if count >= 1 => Some(count as u64),
286        // If the hint is zero or negative, it's not clear what the user intends. Maybe they have
287        // tried to disable GPUs by setting the count to zero, or have made a logic error. Emit a
288        // warning, and continue with no GPU request.
289        Some(count) => {
290            warn!(
291                %count,
292                "`gpu` hint specified {count} GPU(s); no GPUs will be requested for execution"
293            );
294            None
295        }
296        None => {
297            // Typechecking should have already validated that the hint is an integer or
298            // a string.
299            unreachable!("`gpu` hint must be an integer or string")
300        }
301    }
302}
303
304/// Represents the type of a disk.
305///
306/// Disk types are specified via hints.
307#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
308pub enum DiskType {
309    /// The disk type is a solid state drive.
310    SSD,
311    /// The disk type is a hard disk drive.
312    HDD,
313}
314
315impl FromStr for DiskType {
316    type Err = ();
317
318    fn from_str(s: &str) -> Result<Self, Self::Err> {
319        match s {
320            "SSD" => Ok(Self::SSD),
321            "HDD" => Ok(Self::HDD),
322            _ => Err(()),
323        }
324    }
325}
326
327/// Represents a task disk requirement.
328pub struct DiskRequirement {
329    /// The size of the disk, in GiB.
330    pub size: i64,
331
332    /// The disk type as specified by a corresponding task hint.
333    pub ty: Option<DiskType>,
334}
335
336/// Gets the `disks` requirement.
337///
338/// Upon success, returns a mapping of mount point to disk requirement.
339pub(crate) fn disks<'a>(
340    requirements: &'a HashMap<String, Value>,
341    hints: &HashMap<String, Value>,
342) -> Result<HashMap<&'a str, DiskRequirement>> {
343    /// Helper for looking up a disk type from the hints.
344    ///
345    /// If we don't recognize the specification, we ignore it.
346    fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
347        hints.get(TASK_HINT_DISKS).and_then(|v| {
348            if let Some(ty) = v.as_string() {
349                return ty.parse().ok();
350            }
351
352            if let Some(map) = v.as_map() {
353                // Find the corresponding key; we have to scan the keys because the map is
354                // storing primitive values
355                if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
356                    (None, None) => true,
357                    (None, Some(_)) | (Some(_), None) => false,
358                    (Some(k), Some(mount_point)) => k
359                        .as_string()
360                        .map(|k| k.as_str() == mount_point)
361                        .unwrap_or(false),
362                }) {
363                    return v.as_string().and_then(|ty| ty.parse().ok());
364                }
365            }
366
367            None
368        })
369    }
370
371    /// Parses a disk specification into a size (in GiB) and optional mount
372    /// point.
373    fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
374        let iter = spec.split_whitespace();
375        let mut first = None;
376        let mut second = None;
377        let mut third = None;
378
379        for part in iter {
380            if first.is_none() {
381                first = Some(part);
382                continue;
383            }
384
385            if second.is_none() {
386                second = Some(part);
387                continue;
388            }
389
390            if third.is_none() {
391                third = Some(part);
392                continue;
393            }
394
395            return None;
396        }
397
398        match (first, second, third) {
399            (None, None, None) => None,
400            (Some(size), None, None) => {
401                // Specification is `<size>` (in GiB)
402                Some((size.parse().ok()?, None))
403            }
404            (Some(first), Some(second), None) => {
405                // Check for `<size> <unit>`; convert from the specified unit to GiB
406                if let Ok(size) = first.parse() {
407                    let unit: StorageUnit = second.parse().ok()?;
408                    let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
409                    return Some((size.try_into().ok()?, None));
410                }
411
412                // Specification is `<mount-point> <size>` (where size is already in GiB)
413                // The mount point must be absolute, i.e. start with `/`
414                if !first.starts_with('/') {
415                    return None;
416                }
417
418                Some((second.parse().ok()?, Some(first)))
419            }
420            (Some(mount_point), Some(size), Some(unit)) => {
421                // Specification is `<mount-point> <size> <units>`
422                let unit: StorageUnit = unit.parse().ok()?;
423                let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
424
425                // Mount point must be absolute
426                if !mount_point.starts_with('/') {
427                    return None;
428                }
429
430                Some((size.try_into().ok()?, Some(mount_point)))
431            }
432            _ => unreachable!("should have one, two, or three values"),
433        }
434    }
435
436    /// Inserts a disk into the disks map.
437    fn insert_disk<'a>(
438        spec: &'a str,
439        hints: &HashMap<String, Value>,
440        disks: &mut HashMap<&'a str, DiskRequirement>,
441    ) -> Result<()> {
442        let (size, mount_point) =
443            parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
444
445        let prev = disks.insert(
446            mount_point.unwrap_or("/"),
447            DiskRequirement {
448                size,
449                ty: lookup_type(mount_point, hints),
450            },
451        );
452
453        if prev.is_some() {
454            bail!(
455                "duplicate mount point `{mp}` specified in `disks` requirement",
456                mp = mount_point.unwrap_or("/")
457            );
458        }
459
460        Ok(())
461    }
462
463    let mut disks = HashMap::new();
464    if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
465        if let Some(size) = v.as_integer() {
466            // Disk spec is just the size (in GiB)
467            if size < 0 {
468                bail!("task requirement `disks` cannot be less than zero");
469            }
470
471            disks.insert(
472                "/",
473                DiskRequirement {
474                    size,
475                    ty: lookup_type(None, hints),
476                },
477            );
478        } else if let Some(spec) = v.as_string() {
479            insert_disk(spec, hints, &mut disks)?;
480        } else if let Some(v) = v.as_array() {
481            for spec in v.as_slice() {
482                insert_disk(
483                    spec.as_string().expect("spec should be a string"),
484                    hints,
485                    &mut disks,
486                )?;
487            }
488        } else {
489            unreachable!("value should be an integer, string, or array");
490        }
491    }
492
493    Ok(disks)
494}
495
496/// Gets the `preemptible` hint from a hints map.
497///
498/// This hint is not part of the WDL standard but is used for compatibility with
499/// Cromwell where backends can support preemptible retries before using
500/// dedicated instances.
501pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
502    const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
503    const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
504
505    hints
506        .get(TASK_HINT_PREEMPTIBLE)
507        .and_then(|v| {
508            Some(
509                v.coerce(None, &PrimitiveType::Integer.into())
510                    .ok()?
511                    .unwrap_integer(),
512            )
513        })
514        .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
515}
516
517/// Gets the `max_retries` requirement from a requirements map with config
518/// fallback.
519pub(crate) fn max_retries(requirements: &HashMap<String, Value>, config: &Config) -> u64 {
520    requirements
521        .get(TASK_REQUIREMENT_MAX_RETRIES)
522        .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
523        .and_then(|v| v.as_integer())
524        .map(|v| v as u64)
525        .or(config.task.retries)
526        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES)
527}
528
529/// Gets the `cacheable` hint from a hints map with config fallback.
530pub(crate) fn cacheable(hints: &HashMap<String, Value>, config: &Config) -> bool {
531    hints
532        .get(TASK_HINT_CACHEABLE)
533        .and_then(|v| v.as_boolean())
534        .unwrap_or(match config.task.cache {
535            CallCachingMode::Off | CallCachingMode::Explicit => false,
536            CallCachingMode::On => true,
537        })
538}
539
540/// Used to evaluate expressions in tasks.
541struct TaskEvaluationContext<'a, 'b> {
542    /// The associated evaluation state.
543    state: &'a mut State<'b>,
544    /// The current evaluation scope.
545    scope: ScopeIndex,
546    /// The task work directory.
547    ///
548    /// This is `None` unless the output section is being evaluated.
549    work_dir: Option<&'a EvaluationPath>,
550    /// The standard out value to use.
551    ///
552    /// This field is only available after task execution.
553    stdout: Option<&'a Value>,
554    /// The standard error value to use.
555    ///
556    /// This field is only available after task execution.
557    stderr: Option<&'a Value>,
558    /// Whether or not the evaluation has associated task information.
559    ///
560    /// This is `true` when evaluating hints sections.
561    task: bool,
562}
563
564impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
565    /// Constructs a new expression evaluation context.
566    pub fn new(state: &'a mut State<'b>, scope: ScopeIndex) -> Self {
567        Self {
568            state,
569            scope,
570            work_dir: None,
571            stdout: None,
572            stderr: None,
573            task: false,
574        }
575    }
576
577    /// Sets the task's work directory to use for the evaluation context.
578    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
579        self.work_dir = Some(work_dir);
580        self
581    }
582
583    /// Sets the stdout value to use for the evaluation context.
584    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
585        self.stdout = Some(stdout);
586        self
587    }
588
589    /// Sets the stderr value to use for the evaluation context.
590    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
591        self.stderr = Some(stderr);
592        self
593    }
594
595    /// Marks the evaluation as having associated task information.
596    ///
597    /// This is used in evaluating hints sections.
598    pub fn with_task(mut self) -> Self {
599        self.task = true;
600        self
601    }
602}
603
604impl EvaluationContext for TaskEvaluationContext<'_, '_> {
605    fn version(&self) -> SupportedVersion {
606        self.state
607            .document
608            .version()
609            .expect("document should have a version")
610    }
611
612    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
613        ScopeRef::new(&self.state.scopes, self.scope)
614            .lookup(name)
615            .cloned()
616            .ok_or_else(|| unknown_name(name, span))
617    }
618
619    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
620        crate::resolve_type_name(self.state.document, name, span)
621    }
622
623    fn base_dir(&self) -> &EvaluationPath {
624        self.work_dir.unwrap_or(&self.state.base_dir)
625    }
626
627    fn temp_dir(&self) -> &Path {
628        self.state.temp_dir
629    }
630
631    fn stdout(&self) -> Option<&Value> {
632        self.stdout
633    }
634
635    fn stderr(&self) -> Option<&Value> {
636        self.stderr
637    }
638
639    fn task(&self) -> Option<&Task> {
640        if self.task {
641            Some(self.state.task)
642        } else {
643            None
644        }
645    }
646
647    fn transferer(&self) -> &dyn Transferer {
648        self.state.transferer().as_ref()
649    }
650
651    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
652        self.state.path_map.get_by_right(path).cloned()
653    }
654
655    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
656        self.state.path_map.get_by_left(path).cloned()
657    }
658
659    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
660        self.state.insert_backend_input(ContentKind::File, path)?;
661        Ok(())
662    }
663}
664
665/// Represents task evaluation state.
666struct State<'a> {
667    /// The top-level evaluation context.
668    top_level: &'a TopLevelEvaluator,
669    /// The temp directory.
670    temp_dir: &'a Path,
671    /// The base directory for evaluation.
672    ///
673    /// This is the document's directory.
674    ///
675    /// When outputs are evaluated, the task's work directory is used as the
676    /// base directory.
677    base_dir: EvaluationPath,
678    /// The document containing the workflow being evaluated.
679    document: &'a Document,
680    /// The task being evaluated.
681    task: &'a Task,
682    /// The scopes of the task being evaluated.
683    ///
684    /// The first scope is the root scope, the second is the output scope, and
685    /// the third is the scope where the "task" variable is visible in 1.2+
686    /// evaluations.
687    scopes: [Scope; 3],
688    /// The environment variables of the task.
689    ///
690    /// Environment variables do not change between retries.
691    env: IndexMap<String, String>,
692    /// The map of inputs to evaluated values.
693    ///
694    /// This is used for calculating the call cache key for the task.
695    inputs: BTreeMap<String, Value>,
696    /// The trie for mapping backend inputs.
697    backend_inputs: InputTrie,
698    /// A bi-map of host paths and guest paths.
699    path_map: BiHashMap<HostPath, GuestPath>,
700}
701
702impl<'a> State<'a> {
703    /// Get the [`Transferer`] for this evaluation.
704    fn transferer(&self) -> &Arc<dyn Transferer> {
705        &self.top_level.transferer
706    }
707
708    /// Constructs a new task evaluation state.
709    fn new(
710        top_level: &'a TopLevelEvaluator,
711        document: &'a Document,
712        task: &'a Task,
713        temp_dir: &'a Path,
714    ) -> Result<Self> {
715        // Tasks have a root scope (index 0), an output scope (index 1), and a `task`
716        // variable scope (index 2). The output scope inherits from the root scope and
717        // the task scope inherits from the output scope. Inputs and private
718        // declarations are evaluated into the root scope. Outputs are evaluated into
719        // the output scope. The task scope is used for evaluating expressions in both
720        // the command and output sections. Only the `task` variable in WDL 1.2 is
721        // introduced into the task scope; in previous WDL versions, the task scope will
722        // not have any local names.
723        let scopes = [
724            Scope::default(),
725            Scope::new(ROOT_SCOPE_INDEX),
726            Scope::new(OUTPUT_SCOPE_INDEX),
727        ];
728
729        let backend_inputs = if let Some(guest_inputs_dir) = top_level.backend.guest_inputs_dir() {
730            InputTrie::new_with_guest_dir(guest_inputs_dir)
731        } else {
732            InputTrie::new()
733        };
734
735        let document_path = document.uri();
736        let base_dir = EvaluationPath::parent_of(document_path.as_str()).with_context(|| {
737            format!(
738                "document `{path}` does not have a parent directory",
739                path = document.path()
740            )
741        })?;
742
743        Ok(Self {
744            top_level,
745            temp_dir,
746            base_dir,
747            document,
748            task,
749            scopes,
750            env: Default::default(),
751            inputs: Default::default(),
752            backend_inputs,
753            path_map: Default::default(),
754        })
755    }
756
757    /// Adds backend inputs to the state for any `File` or `Directory` values
758    /// referenced by the given value.
759    ///
760    /// If the backend doesn't use containers, remote inputs are immediately
761    /// localized.
762    ///
763    /// If the backend does use containers, remote inputs are localized during
764    /// the call to `localize_inputs`.
765    ///
766    /// This method also ensures that a `File` or `Directory` paths exist for
767    /// WDL 1.2+.
768    async fn add_backend_inputs(
769        &mut self,
770        is_optional: bool,
771        value: &mut Value,
772        transferer: Arc<dyn Transferer>,
773        needs_local_inputs: bool,
774    ) -> Result<()> {
775        // For WDL 1.2 documents, start by ensuring paths exist.
776        // This will replace any non-existent optional paths with `None`
777        if self
778            .document
779            .version()
780            .expect("document should have a version")
781            >= SupportedVersion::V1(V1::Two)
782        {
783            *value = value
784                .resolve_paths(
785                    is_optional,
786                    self.base_dir.as_local(),
787                    Some(transferer.as_ref()),
788                    &|path| Ok(path.clone()),
789                )
790                .await?;
791        }
792
793        // Add inputs to the backend
794        let mut urls = Vec::new();
795        value.visit_paths(&mut |is_file, path| {
796            // Insert a backend input for the path
797            if let Some(index) = self.insert_backend_input(
798                if is_file {
799                    ContentKind::File
800                } else {
801                    ContentKind::Directory
802                },
803                path,
804            )? {
805                // Check to see if there's no guest path for a remote URL that needs to be
806                // localized; if so, we must localize it now
807                if needs_local_inputs
808                    && self.backend_inputs.as_slice()[index].guest_path.is_none()
809                    && is_supported_url(path.as_str())
810                    && !is_file_url(path.as_str())
811                {
812                    urls.push((path.clone(), index));
813                }
814            }
815
816            Ok(())
817        })?;
818
819        if urls.is_empty() {
820            return Ok(());
821        }
822
823        // Download any necessary files
824        let mut downloads = JoinSet::new();
825        for (url, index) in urls {
826            let transferer = transferer.clone();
827            downloads.spawn(async move {
828                transferer
829                    .download(
830                        &url.as_str()
831                            .parse()
832                            .with_context(|| format!("invalid URL `{url}`"))?,
833                    )
834                    .await
835                    .with_context(|| anyhow!("failed to localize `{url}`"))
836                    .map(|l| (url, l, index))
837            });
838        }
839
840        // Wait for the downloads to complete
841        while let Some(result) = downloads.join_next().await {
842            let (url, location, index) =
843                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;
844
845            let guest_path = GuestPath::new(location.to_str().with_context(|| {
846                format!(
847                    "download location `{location}` is not UTF-8",
848                    location = location.display()
849                )
850            })?);
851
852            // Map the URL to the guest path
853            self.path_map.insert(url, guest_path);
854
855            // Finally, set the location of the input
856            self.backend_inputs.as_slice_mut()[index].set_location(location);
857        }
858
859        Ok(())
860    }
861
862    /// Inserts a backend input into the state.
863    ///
864    /// Responsible for mapping host and guest paths.
865    fn insert_backend_input(
866        &mut self,
867        kind: ContentKind,
868        path: &HostPath,
869    ) -> Result<Option<usize>> {
870        // Insert an input for the path
871        if let Some(index) = self
872            .backend_inputs
873            .insert(kind, path.as_str(), &self.base_dir)?
874        {
875            // If the input has a guest path, map it
876            let input = &self.backend_inputs.as_slice()[index];
877            if let Some(guest_path) = &input.guest_path {
878                self.path_map.insert(path.clone(), guest_path.clone());
879            }
880
881            return Ok(Some(index));
882        }
883
884        Ok(None)
885    }
886}
887
888/// Represents the result of evaluating task sections before execution.
889struct EvaluatedSections {
890    /// The evaluated command.
891    command: String,
892    /// The evaluated requirements.
893    requirements: Arc<HashMap<String, Value>>,
894    /// The evaluated hints.
895    hints: Arc<HashMap<String, Value>>,
896}
897
898impl TopLevelEvaluator {
899    /// Evaluates the given task.
900    ///
901    /// Upon success, returns the evaluated task.
902    pub async fn evaluate_task(
903        &self,
904        document: &Document,
905        task: &Task,
906        inputs: &TaskInputs,
907        task_eval_root: impl AsRef<Path>,
908    ) -> EvaluationResult<EvaluatedTask> {
909        // We cannot evaluate a document with errors
910        if document.has_errors() {
911            return Err(anyhow!("cannot evaluate a document with errors").into());
912        }
913
914        let result = self
915            .perform_task_evaluation(document, task, inputs, task_eval_root.as_ref(), task.name())
916            .await;
917
918        if self.cancellation.user_canceled() {
919            return Err(EvaluationError::Canceled);
920        }
921
922        result
923    }
924
925    /// Performs the evaluation of the given task.
926    ///
927    /// This method skips checking the document (and its transitive imports) for
928    /// analysis errors as the check occurs at the `evaluate` entrypoint.
929    pub(crate) async fn perform_task_evaluation(
930        &self,
931        document: &Document,
932        task: &Task,
933        inputs: &TaskInputs,
934        task_eval_root: &Path,
935        id: &str,
936    ) -> EvaluationResult<EvaluatedTask> {
937        inputs.validate(document, task, None).with_context(|| {
938            format!(
939                "failed to validate the inputs to task `{task}`",
940                task = task.name()
941            )
942        })?;
943
944        let ast = match document.root().morph().ast() {
945            Ast::V1(ast) => ast,
946            _ => {
947                return Err(
948                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
949                );
950            }
951        };
952
953        // Find the task in the AST
954        let definition = ast
955            .tasks()
956            .find(|t| t.name().text() == task.name())
957            .expect("task should exist in the AST");
958
959        let version = document.version().expect("document should have version");
960
961        // Build an evaluation graph for the task
962        let mut diagnostics = Vec::new();
963        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
964        assert!(
965            diagnostics.is_empty(),
966            "task evaluation graph should have no diagnostics"
967        );
968
969        debug!(
970            task_id = id,
971            task_name = task.name(),
972            document = document.uri().as_str(),
973            "evaluating task"
974        );
975
976        let task_eval_root = absolute(task_eval_root).with_context(|| {
977            format!(
978                "failed to determine absolute path of `{path}`",
979                path = task_eval_root.display()
980            )
981        })?;
982
983        // Create the temp directory now as it may be needed for task evaluation
984        let temp_dir = task_eval_root.join("tmp");
985        fs::create_dir_all(&temp_dir).with_context(|| {
986            format!(
987                "failed to create directory `{path}`",
988                path = temp_dir.display()
989            )
990        })?;
991
992        // Write the inputs to the task's root directory
993        write_json_file(task_eval_root.join(INPUTS_FILE), inputs)?;
994
995        let mut state = State::new(self, document, task, &temp_dir)?;
996        let nodes = toposort(&graph, None).expect("graph should be acyclic");
997        let mut current = 0;
998        while current < nodes.len() {
999            match &graph[nodes[current]] {
1000                TaskGraphNode::Input(decl) => {
1001                    state
1002                        .evaluate_input(id, decl, inputs)
1003                        .await
1004                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1005                }
1006                TaskGraphNode::Decl(decl) => {
1007                    state
1008                        .evaluate_decl(id, decl)
1009                        .await
1010                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1011                }
1012                TaskGraphNode::Output(_) => {
1013                    // Stop at the first output
1014                    break;
1015                }
1016                TaskGraphNode::Command(_)
1017                | TaskGraphNode::Runtime(_)
1018                | TaskGraphNode::Requirements(_)
1019                | TaskGraphNode::Hints(_) => {
1020                    // Skip these sections for now; they'll evaluate in the
1021                    // retry loop
1022                }
1023            }
1024
1025            current += 1;
1026        }
1027
1028        let mut cached;
1029        let env = Arc::new(mem::take(&mut state.env));
1030        // Spawn the task in a retry loop
1031        let mut attempt = 0;
1032        let mut previous_task_data: Option<Arc<TaskPostEvaluationData>> = None;
1033        let mut evaluated = loop {
1034            if self.cancellation.state() != CancellationContextState::NotCanceled {
1035                return Err(EvaluationError::Canceled);
1036            }
1037
1038            let EvaluatedSections {
1039                command,
1040                requirements,
1041                hints,
1042            } = state
1043                .evaluate_sections(id, &definition, inputs, attempt, previous_task_data.clone())
1044                .await?;
1045
1046            // Get the maximum number of retries, either from the task's requirements or
1047            // from configuration
1048            let max_retries = max_retries(&requirements, &self.config);
1049
1050            if max_retries > MAX_RETRIES {
1051                return Err(anyhow!(
1052                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
1053                )
1054                .into());
1055            }
1056
1057            let backend_inputs = state.localize_inputs(id).await?;
1058
1059            // Calculate the cache key on the first attempt only
1060            let mut key = if attempt == 0
1061                && let Some(cache) = &self.cache
1062            {
1063                if cacheable(&hints, &self.config) {
1064                    let request = KeyRequest {
1065                        document_uri: state.document.uri().as_ref(),
1066                        task_name: task.name(),
1067                        inputs: &state.inputs,
1068                        command: &command,
1069                        requirements: requirements.as_ref(),
1070                        hints: hints.as_ref(),
1071                        container: &container(&requirements, self.config.task.container.as_deref()),
1072                        shell: self
1073                            .config
1074                            .task
1075                            .shell
1076                            .as_deref()
1077                            .unwrap_or(DEFAULT_TASK_SHELL),
1078                        backend_inputs: &backend_inputs,
1079                    };
1080
1081                    match cache.key(request).await {
1082                        Ok(key) => {
1083                            debug!(
1084                                task_id = id,
1085                                task_name = state.task.name(),
1086                                document = state.document.uri().as_str(),
1087                                "task cache key is `{key}`"
1088                            );
1089                            Some(key)
1090                        }
1091                        Err(e) => {
1092                            warn!(
1093                                task_id = id,
1094                                task_name = state.task.name(),
1095                                document = state.document.uri().as_str(),
1096                                "call caching disabled due to cache key calculation failure: {e:#}"
1097                            );
1098                            None
1099                        }
1100                    }
1101                } else {
1102                    // Task wasn't cacheable, explain why.
1103                    match self.config.task.cache {
1104                        CallCachingMode::Off => {
1105                            unreachable!("cache was used despite not being enabled")
1106                        }
1107                        CallCachingMode::On => debug!(
1108                            task_id = id,
1109                            task_name = state.task.name(),
1110                            document = state.document.uri().as_str(),
1111                            "task is not cacheable due to `cacheable` hint being set to `false`"
1112                        ),
1113                        CallCachingMode::Explicit => debug!(
1114                            task_id = id,
1115                            task_name = state.task.name(),
1116                            document = state.document.uri().as_str(),
1117                            "task is not cacheable due to `cacheable` hint not being explicitly \
1118                             set to `true`"
1119                        ),
1120                    }
1121
1122                    None
1123                }
1124            } else {
1125                None
1126            };
1127
1128            // Lookup the results from the cache
1129            cached = false;
1130            let result = if let Some(cache_key) = &key {
1131                match self
1132                    .cache
1133                    .as_ref()
1134                    .expect("should have cache")
1135                    .get(cache_key)
1136                    .await
1137                {
1138                    Ok(Some(results)) => {
1139                        info!(
1140                            task_id = id,
1141                            task_name = state.task.name(),
1142                            document = state.document.uri().as_str(),
1143                            "task execution was skipped due to previous result being present in \
1144                             the call cache"
1145                        );
1146
1147                        // Notify that we've reused a cached execution result.
1148                        cached = true;
1149                        if let Some(sender) = &self.events {
1150                            let _ = sender.send(EngineEvent::ReusedCachedExecutionResult {
1151                                id: id.to_string(),
1152                            });
1153                        }
1154
1155                        // We're serving the results from the call cache; no need to update, so set
1156                        // the key to `None`
1157                        key = None;
1158                        Some(results)
1159                    }
1160                    Ok(None) => {
1161                        debug!(
1162                            task_id = id,
1163                            task_name = state.task.name(),
1164                            document = state.document.uri().as_str(),
1165                            "call cache miss for key `{cache_key}`"
1166                        );
1167                        None
1168                    }
1169                    Err(e) => {
1170                        info!(
1171                            task_id = id,
1172                            task_name = state.task.name(),
1173                            document = state.document.uri().as_str(),
1174                            "ignoring call cache entry: {e:#}"
1175                        );
1176                        None
1177                    }
1178                }
1179            } else {
1180                None
1181            };
1182
1183            let result = match result {
1184                Some(result) => result,
1185                None => {
1186                    let mut attempt_dir = task_eval_root.clone();
1187                    attempt_dir.push("attempts");
1188                    attempt_dir.push(attempt.to_string());
1189                    let request = TaskSpawnRequest::new(
1190                        id.to_string(),
1191                        TaskSpawnInfo::new(
1192                            command,
1193                            backend_inputs,
1194                            requirements.clone(),
1195                            hints.clone(),
1196                            env.clone(),
1197                            self.transferer.clone(),
1198                        ),
1199                        attempt,
1200                        attempt_dir.clone(),
1201                        task_eval_root.clone(),
1202                        temp_dir.clone(),
1203                    );
1204
1205                    self.backend
1206                        .spawn(request, self.cancellation.token())
1207                        .with_context(|| {
1208                            format!(
1209                                "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
1210                                name = task.name(),
1211                                path = document.path(),
1212                            )
1213                        })?
1214                        .await
1215                        .expect("failed to receive response from spawned task")
1216                        .map_err(|e| {
1217                            EvaluationError::new(
1218                                state.document.clone(),
1219                                task_execution_failed(e, task.name(), id, task.name_span()),
1220                            )
1221                        })?
1222                }
1223            };
1224
1225            // Update the task variable
1226            let evaluated = EvaluatedTask::new(cached, result);
1227            if version >= SupportedVersion::V1(V1::Two) {
1228                let task = state.scopes[TASK_SCOPE_INDEX.0]
1229                    .get_mut(TASK_VAR_NAME)
1230                    .expect("task variable should exist in scope for WDL v1.2+")
1231                    .as_task_post_evaluation_mut()
1232                    .expect("task should be a post evaluation task at this point");
1233
1234                task.set_attempt(attempt.try_into().with_context(|| {
1235                    format!(
1236                        "too many attempts were made to run task `{task}`",
1237                        task = state.task.name()
1238                    )
1239                })?);
1240                task.set_return_code(evaluated.result.exit_code);
1241            }
1242
1243            if let Err(e) = evaluated
1244                .handle_exit(&requirements, state.transferer().as_ref())
1245                .await
1246            {
1247                if attempt >= max_retries {
1248                    return Err(EvaluationError::new(
1249                        state.document.clone(),
1250                        task_execution_failed(e, task.name(), id, task.name_span()),
1251                    ));
1252                }
1253
1254                attempt += 1;
1255
1256                if let Some(task) = state.scopes[TASK_SCOPE_INDEX.0].names.get(TASK_VAR_NAME) {
1257                    // SAFETY: task variable should always be TaskPostEvaluation at this point
1258                    let task = task.as_task_post_evaluation().unwrap();
1259                    previous_task_data = Some(task.data().clone());
1260                }
1261
1262                info!(
1263                    "retrying execution of task `{name}` (retry {attempt})",
1264                    name = state.task.name()
1265                );
1266                continue;
1267            }
1268
1269            // Task execution succeeded; update the cache entry if we have a key
1270            if let Some(key) = key {
1271                match self
1272                    .cache
1273                    .as_ref()
1274                    .expect("should have cache")
1275                    .put(key, &evaluated.result)
1276                    .await
1277                {
1278                    Ok(key) => {
1279                        debug!(
1280                            task_id = id,
1281                            task_name = state.task.name(),
1282                            document = state.document.uri().as_str(),
1283                            "updated call cache entry for key `{key}`"
1284                        );
1285                    }
1286                    Err(e) => {
1287                        error!(
1288                            "failed to update call cache entry for task `{name}` (task id \
1289                             `{id}`): cache entry has been discard: {e:#}",
1290                            name = task.name()
1291                        );
1292                    }
1293                }
1294            }
1295
1296            break evaluated;
1297        };
1298
1299        // Perform backend cleanup before output evaluation
1300        if !cached
1301            && let Some(cleanup) = self
1302                .backend
1303                .cleanup(&evaluated.result.work_dir, self.cancellation.token())
1304        {
1305            cleanup.await;
1306        }
1307
1308        // Evaluate the remaining inputs (unused), and decls, and outputs
1309        for index in &nodes[current..] {
1310            match &graph[*index] {
1311                TaskGraphNode::Decl(decl) => {
1312                    state
1313                        .evaluate_decl(id, decl)
1314                        .await
1315                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1316                }
1317                TaskGraphNode::Output(decl) => {
1318                    state
1319                        .evaluate_output(id, decl, &evaluated)
1320                        .await
1321                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1322                }
1323                _ => {
1324                    unreachable!(
1325                        "only declarations and outputs should be evaluated after the command"
1326                    )
1327                }
1328            }
1329        }
1330
1331        // Take the output scope and return it in declaration sort order
1332        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1333        if let Some(section) = definition.output() {
1334            let indexes: HashMap<_, _> = section
1335                .declarations()
1336                .enumerate()
1337                .map(|(i, d)| (d.name().hashable(), i))
1338                .collect();
1339            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1340        }
1341
1342        // Write the outputs to the task's root directory
1343        write_json_file(task_eval_root.join(OUTPUTS_FILE), &outputs)?;
1344
1345        evaluated.outputs = Ok(outputs);
1346        Ok(evaluated)
1347    }
1348}
1349
1350impl<'a> State<'a> {
1351    /// Evaluates a task input.
1352    async fn evaluate_input(
1353        &mut self,
1354        id: &str,
1355        decl: &Decl<SyntaxNode>,
1356        inputs: &TaskInputs,
1357    ) -> Result<(), Diagnostic> {
1358        let name = decl.name();
1359        let decl_ty = decl.ty();
1360        let expected_ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
1361
1362        // Evaluate the input if not provided one
1363        let (value, span) = match inputs.get(name.text()) {
1364            Some(input) => {
1365                // For WDL 1.2 evaluation, a `None` value when the expected type is non-optional
1366                // will invoke the default expression
1367                if input.is_none()
1368                    && !expected_ty.is_optional()
1369                    && self
1370                        .document
1371                        .version()
1372                        .map(|v| v >= SupportedVersion::V1(V1::Two))
1373                        .unwrap_or(false)
1374                    && let Some(expr) = decl.expr()
1375                {
1376                    debug!(
1377                        task_id = id,
1378                        task_name = self.task.name(),
1379                        document = self.document.uri().as_str(),
1380                        input_name = name.text(),
1381                        "evaluating input default expression"
1382                    );
1383
1384                    let mut evaluator =
1385                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
1386                    (evaluator.evaluate_expr(&expr).await?, expr.span())
1387                } else {
1388                    (input.clone(), name.span())
1389                }
1390            }
1391            None => match decl.expr() {
1392                Some(expr) => {
1393                    debug!(
1394                        task_id = id,
1395                        task_name = self.task.name(),
1396                        document = self.document.uri().as_str(),
1397                        input_name = name.text(),
1398                        "evaluating input default expression"
1399                    );
1400
1401                    let mut evaluator =
1402                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
1403                    (evaluator.evaluate_expr(&expr).await?, expr.span())
1404                }
1405                _ => {
1406                    assert!(expected_ty.is_optional(), "type should be optional");
1407                    (Value::new_none(expected_ty.clone()), name.span())
1408                }
1409            },
1410        };
1411
1412        // Coerce the value to the expected type
1413        let mut value = value
1414            .coerce(
1415                Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
1416                &expected_ty,
1417            )
1418            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;
1419
1420        // Add any file or directory backend inputs
1421        self.add_backend_inputs(
1422            decl_ty.is_optional(),
1423            &mut value,
1424            self.transferer().clone(),
1425            self.top_level.backend.needs_local_inputs(),
1426        )
1427        .await
1428        .map_err(|e| {
1429            decl_evaluation_failed(
1430                e,
1431                self.task.name(),
1432                true,
1433                name.text(),
1434                Some(Io::Input),
1435                name.span(),
1436            )
1437        })?;
1438
1439        // Insert the name into the scope
1440        self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1441        self.inputs.insert(name.text().to_string(), value.clone());
1442
1443        // Insert an environment variable, if it is one
1444        if decl.env().is_some() {
1445            let value = value
1446                .as_primitive()
1447                .expect("value should be primitive")
1448                .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
1449                .to_string();
1450            self.env.insert(name.text().to_string(), value);
1451        }
1452
1453        Ok(())
1454    }
1455
1456    /// Evaluates a task private declaration.
1457    async fn evaluate_decl(&mut self, id: &str, decl: &Decl<SyntaxNode>) -> Result<(), Diagnostic> {
1458        let name = decl.name();
1459        debug!(
1460            task_id = id,
1461            task_name = self.task.name(),
1462            document = self.document.uri().as_str(),
1463            decl_name = name.text(),
1464            "evaluating private declaration",
1465        );
1466
1467        let decl_ty = decl.ty();
1468        let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
1469
1470        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
1471
1472        let expr = decl.expr().expect("private decls should have expressions");
1473        let value = evaluator.evaluate_expr(&expr).await?;
1474        let mut value = value
1475            .coerce(
1476                Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
1477                &ty,
1478            )
1479            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1480
1481        // Add any file or directory backend inputs
1482        self.add_backend_inputs(
1483            decl_ty.is_optional(),
1484            &mut value,
1485            self.transferer().clone(),
1486            self.top_level.backend.needs_local_inputs(),
1487        )
1488        .await
1489        .map_err(|e| {
1490            decl_evaluation_failed(e, self.task.name(), true, name.text(), None, name.span())
1491        })?;
1492
1493        self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1494
1495        // Insert an environment variable, if it is one
1496        if decl.env().is_some() {
1497            let value = value
1498                .as_primitive()
1499                .expect("value should be primitive")
1500                .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
1501                .to_string();
1502            self.env.insert(name.text().to_string(), value);
1503        }
1504
1505        Ok(())
1506    }
1507
1508    /// Evaluates the runtime section.
1509    ///
1510    /// Returns both the task's hints and requirements.
1511    async fn evaluate_runtime_section(
1512        &mut self,
1513        id: &str,
1514        section: &RuntimeSection<SyntaxNode>,
1515        inputs: &TaskInputs,
1516    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1517        debug!(
1518            task_id = id,
1519            task_name = self.task.name(),
1520            document = self.document.uri().as_str(),
1521            "evaluating runtimes section",
1522        );
1523
1524        let mut requirements = HashMap::new();
1525        let mut hints = HashMap::new();
1526
1527        let version = self
1528            .document
1529            .version()
1530            .expect("document should have version");
1531
1532        // In WDL 1.3+, use `TASK_SCOPE_INDEX` to access the `task` variable.
1533        let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1534            TASK_SCOPE_INDEX
1535        } else {
1536            ROOT_SCOPE_INDEX
1537        };
1538
1539        for item in section.items() {
1540            let name = item.name();
1541            match inputs.requirement(name.text()) {
1542                Some(value) => {
1543                    requirements.insert(name.text().to_string(), value.clone());
1544                    continue;
1545                }
1546                _ => {
1547                    if let Some(value) = inputs.hint(name.text()) {
1548                        hints.insert(name.text().to_string(), value.clone());
1549                        continue;
1550                    }
1551                }
1552            }
1553
1554            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
1555
1556            let (types, requirement) = match task_requirement_types(version, name.text()) {
1557                Some(types) => (Some(types), true),
1558                None => match task_hint_types(version, name.text(), false) {
1559                    Some(types) => (Some(types), false),
1560                    None => (None, false),
1561                },
1562            };
1563
1564            // Evaluate and coerce to the expected type
1565            let expr = item.expr();
1566            let mut value = evaluator.evaluate_expr(&expr).await?;
1567            if let Some(types) = types {
1568                value = types
1569                    .iter()
1570                    .find_map(|ty| {
1571                        value
1572                            .coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
1573                            .ok()
1574                    })
1575                    .ok_or_else(|| {
1576                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1577                    })?;
1578            }
1579
1580            if requirement {
1581                requirements.insert(name.text().to_string(), value);
1582            } else {
1583                hints.insert(name.text().to_string(), value);
1584            }
1585        }
1586
1587        Ok((requirements, hints))
1588    }
1589
1590    /// Evaluates the requirements section.
1591    async fn evaluate_requirements_section(
1592        &mut self,
1593        id: &str,
1594        section: &RequirementsSection<SyntaxNode>,
1595        inputs: &TaskInputs,
1596    ) -> Result<HashMap<String, Value>, Diagnostic> {
1597        debug!(
1598            task_id = id,
1599            task_name = self.task.name(),
1600            document = self.document.uri().as_str(),
1601            "evaluating requirements",
1602        );
1603
1604        let mut requirements = HashMap::new();
1605
1606        let version = self
1607            .document
1608            .version()
1609            .expect("document should have version");
1610
1611        // In WDL 1.3+, use `TASK_SCOPE_INDEX` to access the `task` variable.
1612        let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1613            TASK_SCOPE_INDEX
1614        } else {
1615            ROOT_SCOPE_INDEX
1616        };
1617
1618        for item in section.items() {
1619            let name = item.name();
1620            if let Some(value) = inputs.requirement(name.text()) {
1621                requirements.insert(name.text().to_string(), value.clone());
1622                continue;
1623            }
1624
1625            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
1626
1627            let types =
1628                task_requirement_types(version, name.text()).expect("requirement should be known");
1629
1630            // Evaluate and coerce to the expected type
1631            let expr = item.expr();
1632            let value = evaluator.evaluate_expr(&expr).await?;
1633            let value = types
1634                .iter()
1635                .find_map(|ty| {
1636                    value
1637                        .coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
1638                        .ok()
1639                })
1640                .ok_or_else(|| {
1641                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1642                })?;
1643
1644            requirements.insert(name.text().to_string(), value);
1645        }
1646
1647        Ok(requirements)
1648    }
1649
1650    /// Evaluates the hints section.
1651    async fn evaluate_hints_section(
1652        &mut self,
1653        id: &str,
1654        section: &TaskHintsSection<SyntaxNode>,
1655        inputs: &TaskInputs,
1656    ) -> Result<HashMap<String, Value>, Diagnostic> {
1657        debug!(
1658            task_id = id,
1659            task_name = self.task.name(),
1660            document = self.document.uri().as_str(),
1661            "evaluating hints section",
1662        );
1663
1664        let mut hints = HashMap::new();
1665
1666        let version = self
1667            .document
1668            .version()
1669            .expect("document should have version");
1670
1671        // In WDL 1.3+, use `TASK_SCOPE_INDEX` to access task.attempt and task.previous
1672        let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1673            TASK_SCOPE_INDEX
1674        } else {
1675            ROOT_SCOPE_INDEX
1676        };
1677
1678        for item in section.items() {
1679            let name = item.name();
1680            if let Some(value) = inputs.hint(name.text()) {
1681                hints.insert(name.text().to_string(), value.clone());
1682                continue;
1683            }
1684
1685            let mut evaluator =
1686                ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index).with_task());
1687
1688            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1689            hints.insert(name.text().to_string(), value);
1690        }
1691
1692        Ok(hints)
1693    }
1694
1695    /// Evaluates the command of a task.
1696    ///
1697    /// Returns the evaluated command as a string.
1698    async fn evaluate_command(
1699        &mut self,
1700        id: &str,
1701        section: &CommandSection<SyntaxNode>,
1702    ) -> EvaluationResult<String> {
1703        debug!(
1704            task_id = id,
1705            task_name = self.task.name(),
1706            document = self.document.uri().as_str(),
1707            "evaluating command section",
1708        );
1709
1710        let document = self.document.clone();
1711        let mut command = String::new();
1712        match section.strip_whitespace() {
1713            Some(parts) => {
1714                let mut evaluator =
1715                    ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
1716
1717                for part in parts {
1718                    match part {
1719                        StrippedCommandPart::Text(t) => {
1720                            command.push_str(t.as_str());
1721                        }
1722                        StrippedCommandPart::Placeholder(placeholder) => {
1723                            evaluator
1724                                .evaluate_placeholder(&placeholder, &mut command)
1725                                .await
1726                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1727                        }
1728                    }
1729                }
1730            }
1731            _ => {
1732                warn!(
1733                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1734                     stripping was skipped",
1735                    task = self.task.name(),
1736                    uri = self.document.uri(),
1737                );
1738
1739                let mut evaluator =
1740                    ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
1741
1742                let heredoc = section.is_heredoc();
1743                for part in section.parts() {
1744                    match part {
1745                        CommandPart::Text(t) => {
1746                            t.unescape_to(heredoc, &mut command);
1747                        }
1748                        CommandPart::Placeholder(placeholder) => {
1749                            evaluator
1750                                .evaluate_placeholder(&placeholder, &mut command)
1751                                .await
1752                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
1753                        }
1754                    }
1755                }
1756            }
1757        }
1758
1759        Ok(command)
1760    }
1761
1762    /// Evaluates sections prior to spawning the command.
1763    ///
1764    /// This method evaluates the following sections:
1765    ///   * runtime
1766    ///   * requirements
1767    ///   * hints
1768    ///   * command
1769    async fn evaluate_sections(
1770        &mut self,
1771        id: &str,
1772        definition: &TaskDefinition<SyntaxNode>,
1773        inputs: &TaskInputs,
1774        attempt: u64,
1775        previous_task_data: Option<Arc<TaskPostEvaluationData>>,
1776    ) -> EvaluationResult<EvaluatedSections> {
1777        let version = self.document.version();
1778
1779        // Extract task metadata once to avoid walking the AST multiple times
1780        let task_meta = definition
1781            .metadata()
1782            .map(|s| Object::from_v1_metadata(s.items()))
1783            .unwrap_or_else(Object::empty);
1784        let task_parameter_meta = definition
1785            .parameter_metadata()
1786            .map(|s| Object::from_v1_metadata(s.items()))
1787            .unwrap_or_else(Object::empty);
1788        // Note: Sprocket does not currently support workflow-level extension metadata,
1789        // so `ext` is always an empty object.
1790        let task_ext = Object::empty();
1791
1792        // In WDL 1.3+, insert a [`TaskPreEvaluation`] before evaluating the
1793        // requirements/hints/runtime section.
1794        if version >= Some(SupportedVersion::V1(V1::Three)) {
1795            let mut task = TaskPreEvaluationValue::new(
1796                self.task.name(),
1797                id,
1798                attempt.try_into().expect("attempt should fit in i64"),
1799                task_meta.clone(),
1800                task_parameter_meta.clone(),
1801                task_ext.clone(),
1802            );
1803
1804            if let Some(prev_data) = &previous_task_data {
1805                task.set_previous(prev_data.clone());
1806            }
1807
1808            let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
1809            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1810                *v = HiddenValue::TaskPreEvaluation(task).into();
1811            } else {
1812                scope.insert(TASK_VAR_NAME, HiddenValue::TaskPreEvaluation(task));
1813            }
1814        }
1815
1816        // Evaluate requirements and hints
1817        let (requirements, hints) = match definition.runtime() {
1818            Some(section) => self
1819                .evaluate_runtime_section(id, &section, inputs)
1820                .await
1821                .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
1822            _ => (
1823                match definition.requirements() {
1824                    Some(section) => self
1825                        .evaluate_requirements_section(id, &section, inputs)
1826                        .await
1827                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
1828                    None => Default::default(),
1829                },
1830                match definition.hints() {
1831                    Some(section) => self
1832                        .evaluate_hints_section(id, &section, inputs)
1833                        .await
1834                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
1835                    None => Default::default(),
1836                },
1837            ),
1838        };
1839
1840        // Now that those are evaluated, insert a [`TaskPostEvaluation`] for
1841        // `task` which includes those calculated requirements before the
1842        // command/output sections are evaluated.
1843        if version >= Some(SupportedVersion::V1(V1::Two)) {
1844            // Get the execution constraints
1845            let constraints = self
1846                .top_level
1847                .backend
1848                .constraints(&requirements, &hints)
1849                .with_context(|| {
1850                    format!(
1851                        "failed to get constraints for task `{task}`",
1852                        task = self.task.name()
1853                    )
1854                })?;
1855
1856            let max_retries = max_retries(&requirements, &self.top_level.config);
1857
1858            let mut task = TaskPostEvaluationValue::new(
1859                self.task.name(),
1860                id,
1861                constraints,
1862                max_retries.try_into().with_context(|| {
1863                    format!(
1864                        "the number of max retries is too large to run task `{task}`",
1865                        task = self.task.name()
1866                    )
1867                })?,
1868                attempt.try_into().with_context(|| {
1869                    format!(
1870                        "too many attempts were made to run task `{task}`",
1871                        task = self.task.name()
1872                    )
1873                })?,
1874                task_meta,
1875                task_parameter_meta,
1876                task_ext,
1877            );
1878
1879            // In WDL 1.3+, insert the previous requirements.
1880            if let Some(version) = version
1881                && version >= SupportedVersion::V1(V1::Three)
1882                && let Some(prev_data) = &previous_task_data
1883            {
1884                task.set_previous(prev_data.clone());
1885            }
1886
1887            let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
1888            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1889                *v = HiddenValue::TaskPostEvaluation(task).into();
1890            } else {
1891                scope.insert(TASK_VAR_NAME, HiddenValue::TaskPostEvaluation(task));
1892            }
1893        }
1894
1895        let command = self
1896            .evaluate_command(
1897                id,
1898                &definition.command().expect("must have command section"),
1899            )
1900            .await?;
1901
1902        Ok(EvaluatedSections {
1903            command,
1904            requirements: Arc::new(requirements),
1905            hints: Arc::new(hints),
1906        })
1907    }
1908
1909    /// Evaluates a task output.
1910    async fn evaluate_output(
1911        &mut self,
1912        id: &str,
1913        decl: &Decl<SyntaxNode>,
1914        evaluated: &EvaluatedTask,
1915    ) -> Result<(), Diagnostic> {
1916        let name = decl.name();
1917        debug!(
1918            task_id = id,
1919            task_name = self.task.name(),
1920            document = self.document.uri().as_str(),
1921            output_name = name.text(),
1922            "evaluating output",
1923        );
1924
1925        let decl_ty = decl.ty();
1926        let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
1927        let mut evaluator = ExprEvaluator::new(
1928            TaskEvaluationContext::new(self, TASK_SCOPE_INDEX)
1929                .with_work_dir(&evaluated.result.work_dir)
1930                .with_stdout(&evaluated.result.stdout)
1931                .with_stderr(&evaluated.result.stderr),
1932        );
1933
1934        let expr = decl.expr().expect("outputs should have expressions");
1935        let value = evaluator.evaluate_expr(&expr).await?;
1936
1937        // Coerce the output value to the expected type
1938        let mut value = value
1939            .coerce(Some(evaluator.context()), &ty)
1940            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1941        value = value
1942            .resolve_paths(
1943                ty.is_optional(),
1944                self.base_dir.as_local(),
1945                Some(self.transferer().as_ref()),
1946                &|path| {
1947                    // Join the path with the work directory.
1948                    let mut output_path = evaluated.result.work_dir.join(path.as_str())?;
1949
1950                    // Ensure the output's path is valid
1951                    let output_path = match (&mut output_path, &evaluated.result.work_dir) {
1952                        (EvaluationPath::Local(joined), EvaluationPath::Local(base))
1953                            if joined.starts_with(base)
1954                                || joined == evaluated.stdout().as_file().unwrap().as_str()
1955                                || joined == evaluated.stderr().as_file().unwrap().as_str() =>
1956                        {
1957                            // The joined path is contained within the work directory or is
1958                            // stdout/stderr
1959                            HostPath::new(String::try_from(output_path)?)
1960                        }
1961                        (EvaluationPath::Local(_), EvaluationPath::Local(_)) => {
1962                            // The joined path is not within the work or attempt directory;
1963                            // therefore, it is required to be an input
1964                            self.path_map
1965                                .get_by_left(path)
1966                                .ok_or_else(|| {
1967                                    anyhow!(
1968                                        "guest path `{path}` is not an input or within the task's \
1969                                         working directory"
1970                                    )
1971                                })?
1972                                .0
1973                                .clone()
1974                                .into()
1975                        }
1976                        (EvaluationPath::Local(_), EvaluationPath::Remote(_)) => {
1977                            // Path is local (and absolute) and the working directory is remote
1978                            bail!(
1979                                "cannot access guest path `{path}` from a remotely executing task"
1980                            )
1981                        }
1982                        (EvaluationPath::Remote(_), _) => {
1983                            HostPath::new(String::try_from(output_path)?)
1984                        }
1985                    };
1986
1987                    Ok(output_path)
1988                },
1989            )
1990            .await
1991            .map_err(|e| {
1992                decl_evaluation_failed(
1993                    e,
1994                    self.task.name(),
1995                    true,
1996                    name.text(),
1997                    Some(Io::Output),
1998                    name.span(),
1999                )
2000            })?;
2001
2002        self.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
2003        Ok(())
2004    }
2005
2006    /// Localizes inputs for execution.
2007    ///
2008    /// Returns the inputs to pass to the backend.
2009    async fn localize_inputs(&mut self, task_id: &str) -> EvaluationResult<Vec<Input>> {
2010        // If the backend needs local inputs, download them now
2011        if self.top_level.backend.needs_local_inputs() {
2012            let mut downloads = JoinSet::new();
2013
2014            // Download any necessary files
2015            for (idx, input) in self.backend_inputs.as_slice_mut().iter_mut().enumerate() {
2016                if input.local_path().is_some() {
2017                    continue;
2018                }
2019
2020                if let EvaluationPath::Remote(url) = input.path() {
2021                    let transferer = self.top_level.transferer.clone();
2022                    let url = url.clone();
2023                    downloads.spawn(async move {
2024                        transferer
2025                            .download(&url)
2026                            .await
2027                            .map(|l| (idx, l))
2028                            .with_context(|| anyhow!("failed to localize `{url}`"))
2029                    });
2030                }
2031            }
2032
2033            // Wait for the downloads to complete
2034            while let Some(result) = downloads.join_next().await {
2035                match result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}"))) {
2036                    Ok((idx, location)) => {
2037                        self.backend_inputs.as_slice_mut()[idx].set_location(location);
2038                    }
2039                    Err(e) => {
2040                        return Err(EvaluationError::new(
2041                            self.document.clone(),
2042                            task_localization_failed(e, self.task.name(), self.task.name_span()),
2043                        ));
2044                    }
2045                }
2046            }
2047        }
2048
2049        if enabled!(Level::DEBUG) {
2050            for input in self.backend_inputs.as_slice() {
2051                match (
2052                    input.path().as_local().is_some(),
2053                    input.local_path(),
2054                    input.guest_path(),
2055                ) {
2056                    // Input is unmapped and either local or remote and not downloaded
2057                    (true, _, None) | (false, None, None) => {}
2058                    // Input is local and was mapped to a guest path
2059                    (true, _, Some(guest_path)) => {
2060                        debug!(
2061                            task_id,
2062                            task_name = self.task.name(),
2063                            document = self.document.uri().as_str(),
2064                            "task input `{path}` mapped to `{guest_path}`",
2065                            path = input.path().display(),
2066                        );
2067                    }
2068                    // Input is remote and was downloaded to a local path
2069                    (false, Some(local_path), None) => {
2070                        debug!(
2071                            task_id,
2072                            task_name = self.task.name(),
2073                            document = self.document.uri().as_str(),
2074                            "task input `{path}` downloaded to `{local_path}`",
2075                            path = input.path().display(),
2076                            local_path = local_path.display()
2077                        );
2078                    }
2079                    // Input is remote and was not downloaded, but mapped to a guest path
2080                    (false, None, Some(guest_path)) => {
2081                        debug!(
2082                            task_id,
2083                            task_name = self.task.name(),
2084                            document = self.document.uri().as_str(),
2085                            "task input `{path}` mapped to `{guest_path}`",
2086                            path = input.path().display(),
2087                        );
2088                    }
2089                    // Input is remote and was both downloaded and mapped to a guest path
2090                    (false, Some(local_path), Some(guest_path)) => {
2091                        debug!(
2092                            task_id,
2093                            task_name = self.task.name(),
2094                            document = self.document.uri().as_str(),
2095                            "task input `{path}` downloaded to `{local_path}` and mapped to \
2096                             `{guest_path}`",
2097                            path = input.path().display(),
2098                            local_path = local_path.display(),
2099                        );
2100                    }
2101                }
2102            }
2103        }
2104
2105        Ok(self.backend_inputs.as_slice().into())
2106    }
2107}
2108
2109#[cfg(test)]
2110mod test {
2111    use std::fs;
2112    use std::path::Path;
2113
2114    use pretty_assertions::assert_eq;
2115    use tempfile::tempdir;
2116    use tracing_test::traced_test;
2117    use wdl_analysis::Analyzer;
2118    use wdl_analysis::Config as AnalysisConfig;
2119    use wdl_analysis::DiagnosticsConfig;
2120
2121    use crate::CancellationContext;
2122    use crate::EvaluatedTask;
2123    use crate::Events;
2124    use crate::TaskInputs;
2125    use crate::config::BackendConfig;
2126    use crate::config::CallCachingMode;
2127    use crate::config::Config;
2128    use crate::v1::TopLevelEvaluator;
2129
2130    /// Helper for evaluating a simple task with the given call cache mode.
2131    async fn evaluate_task(mode: CallCachingMode, root_dir: &Path, source: &str) -> EvaluatedTask {
2132        fs::write(root_dir.join("source.wdl"), source).expect("failed to write WDL source file");
2133
2134        // Analyze the source file
2135        let analyzer = Analyzer::new(
2136            AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
2137            |(), _, _, _| async {},
2138        );
2139        analyzer
2140            .add_directory(root_dir)
2141            .await
2142            .expect("failed to add directory");
2143        let results = analyzer
2144            .analyze(())
2145            .await
2146            .expect("failed to analyze document");
2147        assert_eq!(results.len(), 1, "expected only one result");
2148
2149        let document = results.first().expect("should have result").document();
2150
2151        let mut config = Config::default();
2152        config.task.cache = mode;
2153        config.task.cache_dir = Some(root_dir.join("cache"));
2154        config
2155            .backends
2156            .insert("default".into(), BackendConfig::Local(Default::default()));
2157
2158        let evaluator = TopLevelEvaluator::new(
2159            &root_dir.join("runs"),
2160            config,
2161            CancellationContext::default(),
2162            Events::disabled(),
2163        )
2164        .await
2165        .unwrap();
2166
2167        let runs_dir = root_dir.join("runs");
2168        evaluator
2169            .evaluate_task(
2170                document,
2171                document.task_by_name("test").expect("should have task"),
2172                &TaskInputs::default(),
2173                &runs_dir,
2174            )
2175            .await
2176            .unwrap()
2177    }
2178
2179    /// Tests task evaluation when call caching is disabled.
2180    #[tokio::test]
2181    #[traced_test]
2182    async fn cache_off() {
2183        const SOURCE: &str = r#"
2184version 1.2
2185
2186task test {
2187    input {
2188        String name = "friend"
2189    }
2190
2191    command <<<echo "hello, ~{name}!">>>
2192
2193    output {
2194        String message = read_string(stdout())
2195    }
2196}
2197"#;
2198
2199        let root_dir = tempdir().expect("failed to create temporary directory");
2200        let evaluated = evaluate_task(CallCachingMode::Off, root_dir.path(), SOURCE).await;
2201        assert!(!evaluated.cached());
2202        assert_eq!(evaluated.exit_code(), 0);
2203        assert_eq!(
2204            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2205                .unwrap()
2206                .trim(),
2207            "hello, friend!"
2208        );
2209        assert_eq!(
2210            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2211            ""
2212        );
2213        assert!(
2214            logs_contain("call caching is disabled"),
2215            "expected cache to be off"
2216        );
2217    }
2218
2219    /// Tests task evaluation when call caching is enabled.
2220    #[tokio::test]
2221    #[traced_test]
2222    async fn cache_on() {
2223        const SOURCE: &str = r#"
2224version 1.2
2225
2226task test {
2227    input {
2228        String name = "friend"
2229    }
2230
2231    command <<<echo "hello, ~{name}!">>>
2232
2233    output {
2234        String message = read_string(stdout())
2235    }
2236}
2237"#;
2238
2239        let root_dir = tempdir().expect("failed to create temporary directory");
2240        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
2241        assert!(!evaluated.cached());
2242        assert_eq!(evaluated.exit_code(), 0);
2243        assert_eq!(
2244            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2245                .unwrap()
2246                .trim(),
2247            "hello, friend!"
2248        );
2249        assert_eq!(
2250            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2251            ""
2252        );
2253        assert!(logs_contain("using call cache"), "expected cache to be on");
2254        assert!(
2255            logs_contain("call cache miss"),
2256            "expected first run to miss the cache"
2257        );
2258        assert!(logs_contain("spawning task"), "expected the task to spawn");
2259
2260        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
2261        assert!(evaluated.cached());
2262        assert_eq!(evaluated.exit_code(), 0);
2263        assert_eq!(
2264            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2265                .unwrap()
2266                .trim(),
2267            "hello, friend!"
2268        );
2269        assert_eq!(
2270            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2271            ""
2272        );
2273        assert!(
2274            logs_contain("task execution was skipped"),
2275            "expected second run to skip execution"
2276        );
2277    }
2278
2279    /// Tests task evaluation when call caching is enabled, but the task is not
2280    /// cacheable.
2281    #[tokio::test]
2282    #[traced_test]
2283    async fn cache_on_not_cacheable() {
2284        const SOURCE: &str = r#"
2285version 1.2
2286
2287task test {
2288    input {
2289        String name = "friend"
2290    }
2291
2292    command <<<echo "hello, ~{name}!">>>
2293
2294    hints {
2295        cacheable: false
2296    }
2297
2298    output {
2299        String message = read_string(stdout())
2300    }
2301}
2302"#;
2303
2304        let root_dir = tempdir().expect("failed to create temporary directory");
2305        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
2306        assert!(!evaluated.cached());
2307        assert_eq!(evaluated.exit_code(), 0);
2308        assert_eq!(
2309            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2310                .unwrap()
2311                .trim(),
2312            "hello, friend!"
2313        );
2314        assert_eq!(
2315            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2316            ""
2317        );
2318        assert!(logs_contain("using call cache"), "expected cache to be on");
2319        assert!(
2320            logs_contain("task is not cacheable due to `cacheable` hint being set to `false`"),
2321            "expected task to not be cacheable"
2322        );
2323
2324        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
2325        assert!(!evaluated.cached());
2326        assert_eq!(evaluated.exit_code(), 0);
2327        assert_eq!(
2328            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2329                .unwrap()
2330                .trim(),
2331            "hello, friend!"
2332        );
2333        assert_eq!(
2334            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2335            ""
2336        );
2337        assert!(
2338            !logs_contain("task execution was skipped"),
2339            "expected second run to not skip execution"
2340        );
2341    }
2342
2343    /// Tests task evaluation when call caching is enabled in explicit mode and
2344    /// the task is not explicitly marked cacheable.
2345    #[tokio::test]
2346    #[traced_test]
2347    async fn cache_explicit() {
2348        const SOURCE: &str = r#"
2349version 1.2
2350
2351task test {
2352    input {
2353        String name = "friend"
2354    }
2355
2356    command <<<echo "hello, ~{name}!">>>
2357
2358    output {
2359        String message = read_string(stdout())
2360    }
2361}
2362"#;
2363
2364        let root_dir = tempdir().expect("failed to create temporary directory");
2365        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
2366        assert!(!evaluated.cached());
2367        assert_eq!(evaluated.exit_code(), 0);
2368        assert_eq!(
2369            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2370                .unwrap()
2371                .trim(),
2372            "hello, friend!"
2373        );
2374        assert_eq!(
2375            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2376            ""
2377        );
2378        assert!(logs_contain("using call cache"), "expected cache to be on");
2379        assert!(
2380            logs_contain(
2381                "task is not cacheable due to `cacheable` hint not being explicitly set to `true`"
2382            ),
2383            "expected task to not be cacheable"
2384        );
2385
2386        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
2387        assert!(!evaluated.cached());
2388        assert_eq!(evaluated.exit_code(), 0);
2389        assert_eq!(
2390            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2391                .unwrap()
2392                .trim(),
2393            "hello, friend!"
2394        );
2395        assert_eq!(
2396            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2397            ""
2398        );
2399        assert!(
2400            !logs_contain("task execution was skipped"),
2401            "expected second run to not skip execution"
2402        );
2403    }
2404
2405    /// Tests task evaluation when call caching is enabled in explicit mode and
2406    /// the task is explicitly marked cacheable.
2407    #[tokio::test]
2408    #[traced_test]
2409    async fn cache_explicit_cacheable() {
2410        const SOURCE: &str = r#"
2411version 1.2
2412
2413task test {
2414    input {
2415        String name = "friend"
2416    }
2417
2418    command <<<echo "hello, ~{name}!">>>
2419
2420    hints {
2421        cacheable: true
2422    }
2423
2424    output {
2425        String message = read_string(stdout())
2426    }
2427}
2428"#;
2429
2430        let root_dir = tempdir().expect("failed to create temporary directory");
2431        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
2432        assert!(!evaluated.cached());
2433        assert_eq!(evaluated.exit_code(), 0);
2434        assert_eq!(
2435            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2436                .unwrap()
2437                .trim(),
2438            "hello, friend!"
2439        );
2440        assert_eq!(
2441            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2442            ""
2443        );
2444        assert!(logs_contain("using call cache"), "expected cache to be on");
2445        assert!(
2446            logs_contain("call cache miss"),
2447            "expected first run to miss the cache"
2448        );
2449        assert!(logs_contain("spawning task"), "expected the task to spawn");
2450
2451        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
2452        assert!(evaluated.cached());
2453        assert_eq!(evaluated.exit_code(), 0);
2454        assert_eq!(
2455            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
2456                .unwrap()
2457                .trim(),
2458            "hello, friend!"
2459        );
2460        assert_eq!(
2461            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
2462            ""
2463        );
2464        assert!(
2465            logs_contain("task execution was skipped"),
2466            "expected second run to skip execution"
2467        );
2468    }
2469}