wdl_engine/eval/v1/
task.rs

1//! Implementation of evaluation for V1 tasks.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::future::Future;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::anyhow;
15use indexmap::IndexMap;
16use petgraph::algo::toposort;
17use tokio_util::sync::CancellationToken;
18use tracing::Level;
19use tracing::debug;
20use tracing::enabled;
21use tracing::info;
22use tracing::warn;
23use wdl_analysis::Document;
24use wdl_analysis::diagnostics::multiple_type_mismatch;
25use wdl_analysis::diagnostics::unknown_name;
26use wdl_analysis::document::TASK_VAR_NAME;
27use wdl_analysis::document::Task;
28use wdl_analysis::eval::v1::TaskGraphBuilder;
29use wdl_analysis::eval::v1::TaskGraphNode;
30use wdl_analysis::types::Optional;
31use wdl_analysis::types::PrimitiveType;
32use wdl_analysis::types::Type;
33use wdl_analysis::types::v1::task_hint_types;
34use wdl_analysis::types::v1::task_requirement_types;
35use wdl_ast::Ast;
36use wdl_ast::AstNode;
37use wdl_ast::AstToken;
38use wdl_ast::Diagnostic;
39use wdl_ast::Span;
40use wdl_ast::SupportedVersion;
41use wdl_ast::v1::CommandPart;
42use wdl_ast::v1::CommandSection;
43use wdl_ast::v1::Decl;
44use wdl_ast::v1::RequirementsSection;
45use wdl_ast::v1::RuntimeSection;
46use wdl_ast::v1::StrippedCommandPart;
47use wdl_ast::v1::TASK_HINT_MAX_CPU;
48use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
49use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
50use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
51use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
52use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
53use wdl_ast::v1::TASK_REQUIREMENT_CPU;
54use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
55use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
57use wdl_ast::v1::TaskDefinition;
58use wdl_ast::v1::TaskHintsSection;
59use wdl_ast::version::V1;
60
61use super::ProgressKind;
62use crate::Coercible;
63use crate::EvaluationContext;
64use crate::EvaluationError;
65use crate::EvaluationResult;
66use crate::Input;
67use crate::InputKind;
68use crate::Outputs;
69use crate::PrimitiveValue;
70use crate::Scope;
71use crate::ScopeIndex;
72use crate::ScopeRef;
73use crate::TaskExecutionBackend;
74use crate::TaskInputs;
75use crate::TaskSpawnInfo;
76use crate::TaskSpawnRequest;
77use crate::TaskValue;
78use crate::Value;
79use crate::config::Config;
80use crate::config::MAX_RETRIES;
81use crate::convert_unit_string;
82use crate::diagnostics::output_evaluation_failed;
83use crate::diagnostics::runtime_type_mismatch;
84use crate::diagnostics::task_execution_failed;
85use crate::diagnostics::task_localization_failed;
86use crate::eval::EvaluatedTask;
87use crate::http::Downloader;
88use crate::http::HttpDownloader;
89use crate::path;
90use crate::path::EvaluationPath;
91use crate::tree::SyntaxNode;
92use crate::v1::ExprEvaluator;
93use crate::v1::INPUTS_FILE;
94use crate::v1::OUTPUTS_FILE;
95use crate::v1::write_json_file;
96
/// The container used when a task does not specify one and no other default
/// is supplied by the caller.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The `cpu` requirement used when a task does not specify one.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The `memory` requirement used when a task does not specify one
/// (2 * 1024^3).
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (1 << 30);
/// The `max_retries` requirement used when neither the task nor the
/// configuration specifies one.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
105
106/// The index of a task's root scope.
107const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
108/// The index of a task's output scope.
109const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
110/// The index of the evaluation scope where the WDL 1.2 `task` variable is
111/// visible.
112const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
113
114/// Gets the `container` requirement from a requirements map.
115pub(crate) fn container<'a>(
116    requirements: &'a HashMap<String, Value>,
117    default: Option<&'a str>,
118) -> Cow<'a, str> {
119    requirements
120        .get(TASK_REQUIREMENT_CONTAINER)
121        .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
122        .and_then(|v| -> Option<Cow<'_, str>> {
123            // If the value is an array, use the first element or the default
124            // Note: in the future we should be resolving which element in the array is
125            // usable; this will require some work in Crankshaft to enable
126            if let Some(array) = v.as_array() {
127                return array.as_slice().first().map(|v| {
128                    v.as_string()
129                        .expect("type should be string")
130                        .as_ref()
131                        .into()
132                });
133            }
134
135            Some(
136                v.coerce(&PrimitiveType::String.into())
137                    .expect("type should coerce")
138                    .unwrap_string()
139                    .as_ref()
140                    .clone()
141                    .into(),
142            )
143        })
144        .and_then(|v| {
145            // Treat star as the default
146            if v == "*" { None } else { Some(v) }
147        })
148        .unwrap_or_else(|| {
149            default
150                .map(Into::into)
151                .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
152        })
153}
154
155/// Gets the `cpu` requirement from a requirements map.
156pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
157    requirements
158        .get(TASK_REQUIREMENT_CPU)
159        .map(|v| {
160            v.coerce(&PrimitiveType::Float.into())
161                .expect("type should coerce")
162                .unwrap_float()
163        })
164        .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
165}
166
167/// Gets the `max_cpu` hint from a hints map.
168pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
169    hints
170        .get(TASK_HINT_MAX_CPU)
171        .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
172        .map(|v| {
173            v.coerce(&PrimitiveType::Float.into())
174                .expect("type should coerce")
175                .unwrap_float()
176        })
177}
178
179/// Gets the `memory` requirement from a requirements map.
180pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
181    Ok(requirements
182        .get(TASK_REQUIREMENT_MEMORY)
183        .map(|v| {
184            if let Some(v) = v.as_integer() {
185                return Ok(v);
186            }
187
188            if let Some(s) = v.as_string() {
189                return convert_unit_string(s)
190                    .and_then(|v| v.try_into().ok())
191                    .with_context(|| {
192                        format!("task specifies an invalid `memory` requirement `{s}`")
193                    });
194            }
195
196            unreachable!("value should be an integer or string");
197        })
198        .transpose()?
199        .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
200}
201
202/// Gets the `max_memory` hint from a hints map.
203pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
204    hints
205        .get(TASK_HINT_MAX_MEMORY)
206        .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
207        .map(|v| {
208            if let Some(v) = v.as_integer() {
209                return Ok(v);
210            }
211
212            if let Some(s) = v.as_string() {
213                return convert_unit_string(s)
214                    .and_then(|v| v.try_into().ok())
215                    .with_context(|| {
216                        format!("task specifies an invalid `memory` requirement `{s}`")
217                    });
218            }
219
220            unreachable!("value should be an integer or string");
221        })
222        .transpose()
223}
224
225/// Used to evaluate expressions in tasks.
226struct TaskEvaluationContext<'a, 'b> {
227    /// The associated evaluation state.
228    state: &'a State<'b>,
229    /// The downloader to use for expression evaluation.
230    downloader: &'a HttpDownloader,
231    /// The current evaluation scope.
232    scope: ScopeIndex,
233    /// The work directory.
234    ///
235    /// This field is only available after task execution.
236    work_dir: Option<&'a EvaluationPath>,
237    /// The standard out value to use.
238    ///
239    /// This field is only available after task execution.
240    stdout: Option<&'a Value>,
241    /// The standard error value to use.
242    ///
243    /// This field is only available after task execution.
244    stderr: Option<&'a Value>,
245    /// The inputs for the evaluation.
246    inputs: Option<&'a [Input]>,
247    /// Whether or not the evaluation has associated task information.
248    ///
249    /// This is `true` when evaluating hints sections.
250    task: bool,
251}
252
253impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
254    /// Constructs a new expression evaluation context.
255    pub fn new(state: &'a State<'b>, downloader: &'a HttpDownloader, scope: ScopeIndex) -> Self {
256        Self {
257            state,
258            downloader,
259            scope,
260            work_dir: None,
261            stdout: None,
262            stderr: None,
263            inputs: None,
264            task: false,
265        }
266    }
267
268    /// Sets the working directory to use for the evaluation context.
269    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
270        self.work_dir = Some(work_dir);
271        self
272    }
273
274    /// Sets the stdout value to use for the evaluation context.
275    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
276        self.stdout = Some(stdout);
277        self
278    }
279
280    /// Sets the stderr value to use for the evaluation context.
281    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
282        self.stderr = Some(stderr);
283        self
284    }
285
286    /// Sets the inputs associated with the evaluation context.
287    pub fn with_inputs(mut self, inputs: &'a [Input]) -> Self {
288        self.inputs = Some(inputs);
289        self
290    }
291
292    /// Marks the evaluation as having associated task information.
293    ///
294    /// This is used in evaluating hints sections.
295    pub fn with_task(mut self) -> Self {
296        self.task = true;
297        self
298    }
299}
300
301impl EvaluationContext for TaskEvaluationContext<'_, '_> {
302    fn version(&self) -> SupportedVersion {
303        self.state
304            .document
305            .version()
306            .expect("document should have a version")
307    }
308
309    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
310        ScopeRef::new(&self.state.scopes, self.scope)
311            .lookup(name)
312            .cloned()
313            .ok_or_else(|| unknown_name(name, span))
314    }
315
316    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
317        crate::resolve_type_name(self.state.document, name, span)
318    }
319
320    fn work_dir(&self) -> Option<&EvaluationPath> {
321        self.work_dir
322    }
323
324    fn temp_dir(&self) -> &Path {
325        self.state.temp_dir
326    }
327
328    fn stdout(&self) -> Option<&Value> {
329        self.stdout
330    }
331
332    fn stderr(&self) -> Option<&Value> {
333        self.stderr
334    }
335
336    fn task(&self) -> Option<&Task> {
337        if self.task {
338            Some(self.state.task)
339        } else {
340            None
341        }
342    }
343
344    fn translate_path(&self, path: &str) -> Option<Cow<'_, Path>> {
345        let inputs = self.inputs?;
346        let is_url = path::is_url(path);
347
348        // We cannot translate a relative path
349        if !is_url && Path::new(path).is_relative() {
350            return None;
351        }
352
353        // The most specific (i.e. shortest stripped path) wins
354        let (guest_path, stripped) = inputs
355            .iter()
356            .filter_map(|i| {
357                match i.path() {
358                    EvaluationPath::Local(base) if !is_url => {
359                        let stripped = Path::new(path).strip_prefix(base).ok()?;
360                        Some((i.guest_path()?, stripped.to_str()?))
361                    }
362                    EvaluationPath::Remote(url) if is_url => {
363                        let url = url.as_str();
364                        let stripped = path.strip_prefix(url.strip_suffix('/').unwrap_or(url))?;
365
366                        // Strip off the query string or fragment
367                        let stripped = if let Some(pos) = stripped.find('?') {
368                            &stripped[..pos]
369                        } else if let Some(pos) = stripped.find('#') {
370                            &stripped[..pos]
371                        } else {
372                            stripped.strip_prefix('/').unwrap_or(stripped)
373                        };
374
375                        Some((i.guest_path()?, stripped))
376                    }
377                    _ => None,
378                }
379            })
380            .min_by(|(_, a), (_, b)| a.len().cmp(&b.len()))?;
381
382        if stripped.is_empty() {
383            return Some(Path::new(guest_path).into());
384        }
385
386        Some(Path::new(guest_path).join(stripped).into())
387    }
388
389    fn downloader(&self) -> &dyn Downloader {
390        self.downloader
391    }
392}
393
394/// Represents task evaluation state.
395struct State<'a> {
396    /// The temp directory.
397    temp_dir: &'a Path,
398    /// The document containing the workflow being evaluated.
399    document: &'a Document,
400    /// The task being evaluated.
401    task: &'a Task,
402    /// The scopes of the task being evaluated.
403    ///
404    /// The first scope is the root scope, the second is the output scope, and
405    /// the third is the scope where the "task" variable is visible in 1.2+
406    /// evaluations.
407    scopes: [Scope; 3],
408    /// The environment variables of the task.
409    ///
410    /// Environment variables do not change between retries.
411    env: IndexMap<String, String>,
412}
413
414impl<'a> State<'a> {
415    /// Constructs a new task evaluation state.
416    fn new(temp_dir: &'a Path, document: &'a Document, task: &'a Task) -> Result<Self> {
417        // Tasks have a root scope (index 0), an output scope (index 1), and a `task`
418        // variable scope (index 2). The output scope inherits from the root scope and
419        // the task scope inherits from the output scope. Inputs and private
420        // declarations are evaluated into the root scope. Outputs are evaluated into
421        // the output scope. The task scope is used for evaluating expressions in both
422        // the command and output sections. Only the `task` variable in WDL 1.2 is
423        // introduced into the task scope; in previous WDL versions, the task scope will
424        // not have any local names.
425        let scopes = [
426            Scope::default(),
427            Scope::new(ROOT_SCOPE_INDEX),
428            Scope::new(OUTPUT_SCOPE_INDEX),
429        ];
430
431        Ok(Self {
432            temp_dir,
433            document,
434            task,
435            scopes,
436            env: Default::default(),
437        })
438    }
439}
440
441/// Represents the result of evaluating task sections before execution.
442struct EvaluatedSections {
443    /// The evaluated command.
444    command: String,
445    /// The evaluated requirements.
446    requirements: Arc<HashMap<String, Value>>,
447    /// The evaluated hints.
448    hints: Arc<HashMap<String, Value>>,
449    /// The inputs to the task.
450    inputs: Vec<Input>,
451}
452
453/// Represents a WDL V1 task evaluator.
454pub struct TaskEvaluator {
455    /// The associated evaluation configuration.
456    config: Arc<Config>,
457    /// The associated task execution backend.
458    backend: Arc<dyn TaskExecutionBackend>,
459    /// The cancellation token for cancelling task evaluation.
460    token: CancellationToken,
461    /// The downloader to use for expression evaluation.
462    downloader: HttpDownloader,
463}
464
465impl TaskEvaluator {
466    /// Constructs a new task evaluator with the given evaluation
467    /// configuration and cancellation token.
468    ///
469    /// This method creates a default task execution backend.
470    ///
471    /// Returns an error if the configuration isn't valid.
472    pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
473        let backend = config.create_backend().await?;
474        Self::new_with_backend(config, backend, token)
475    }
476
477    /// Constructs a new task evaluator with the given evaluation
478    /// configuration, task execution backend, and cancellation token.
479    ///
480    /// Returns an error if the configuration isn't valid.
481    pub fn new_with_backend(
482        config: Config,
483        backend: Arc<dyn TaskExecutionBackend>,
484        token: CancellationToken,
485    ) -> Result<Self> {
486        config.validate()?;
487
488        let config = Arc::new(config);
489        let downloader = HttpDownloader::new(config.clone())?;
490
491        Ok(Self {
492            config,
493            backend,
494            token,
495            downloader,
496        })
497    }
498
499    /// Creates a new task evaluator with the given configuration, backend,
500    /// cancellation token, and downloader.
501    ///
502    /// This method does not validate the configuration.
503    pub(crate) fn new_unchecked(
504        config: Arc<Config>,
505        backend: Arc<dyn TaskExecutionBackend>,
506        token: CancellationToken,
507        downloader: HttpDownloader,
508    ) -> Self {
509        Self {
510            config,
511            backend,
512            token,
513            downloader,
514        }
515    }
516
517    /// Evaluates the given task.
518    ///
519    /// Upon success, returns the evaluated task.
520    pub async fn evaluate<P, R>(
521        &self,
522        document: &Document,
523        task: &Task,
524        inputs: &TaskInputs,
525        root: impl AsRef<Path>,
526        progress: P,
527    ) -> EvaluationResult<EvaluatedTask>
528    where
529        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
530        R: Future<Output = ()> + Send,
531    {
532        self.evaluate_with_progress(
533            document,
534            task,
535            inputs,
536            root.as_ref(),
537            task.name(),
538            Arc::new(progress),
539        )
540        .await
541    }
542
543    /// Evaluates the given task with the given shared progress callback.
544    pub(crate) async fn evaluate_with_progress<P, R>(
545        &self,
546        document: &Document,
547        task: &Task,
548        inputs: &TaskInputs,
549        root: &Path,
550        id: &str,
551        progress: Arc<P>,
552    ) -> EvaluationResult<EvaluatedTask>
553    where
554        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
555        R: Future<Output = ()> + Send,
556    {
557        // We cannot evaluate a document with errors
558        if document.has_errors() {
559            return Err(anyhow!("cannot evaluate a document with errors").into());
560        }
561
562        progress(ProgressKind::TaskStarted { id }).await;
563
564        let result = self
565            .perform_evaluation(document, task, inputs, root, id, progress.clone())
566            .await;
567
568        progress(ProgressKind::TaskCompleted {
569            id,
570            result: &result,
571        })
572        .await;
573
574        result
575    }
576
577    /// Performs the actual evaluation of the task.
578    async fn perform_evaluation<P, R>(
579        &self,
580        document: &Document,
581        task: &Task,
582        inputs: &TaskInputs,
583        root: &Path,
584        id: &str,
585        progress: Arc<P>,
586    ) -> EvaluationResult<EvaluatedTask>
587    where
588        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
589        R: Future<Output = ()> + Send,
590    {
591        inputs.validate(document, task, None).with_context(|| {
592            format!(
593                "failed to validate the inputs to task `{task}`",
594                task = task.name()
595            )
596        })?;
597
598        let ast = match document.root().morph().ast() {
599            Ast::V1(ast) => ast,
600            _ => {
601                return Err(
602                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
603                );
604            }
605        };
606
607        // Find the task in the AST
608        let definition = ast
609            .tasks()
610            .find(|t| t.name().text() == task.name())
611            .expect("task should exist in the AST");
612
613        let version = document.version().expect("document should have version");
614
615        // Build an evaluation graph for the task
616        let mut diagnostics = Vec::new();
617        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
618        assert!(
619            diagnostics.is_empty(),
620            "task evaluation graph should have no diagnostics"
621        );
622
623        debug!(
624            task_id = id,
625            task_name = task.name(),
626            document = document.uri().as_str(),
627            "evaluating task"
628        );
629
630        let root_dir = absolute(root).with_context(|| {
631            format!(
632                "failed to determine absolute path of `{path}`",
633                path = root.display()
634            )
635        })?;
636
637        // Create the temp directory now as it may be needed for task evaluation
638        let temp_dir = root_dir.join("tmp");
639        fs::create_dir_all(&temp_dir).with_context(|| {
640            format!(
641                "failed to create directory `{path}`",
642                path = temp_dir.display()
643            )
644        })?;
645
646        // Write the inputs to the task's root directory
647        write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
648
649        let mut state = State::new(&temp_dir, document, task)?;
650        let nodes = toposort(&graph, None).expect("graph should be acyclic");
651        let mut current = 0;
652        while current < nodes.len() {
653            match &graph[nodes[current]] {
654                TaskGraphNode::Input(decl) => {
655                    self.evaluate_input(id, &mut state, decl, inputs)
656                        .await
657                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
658                }
659                TaskGraphNode::Decl(decl) => {
660                    self.evaluate_decl(id, &mut state, decl)
661                        .await
662                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
663                }
664                TaskGraphNode::Output(_) => {
665                    // Stop at the first output
666                    break;
667                }
668                TaskGraphNode::Command(_)
669                | TaskGraphNode::Runtime(_)
670                | TaskGraphNode::Requirements(_)
671                | TaskGraphNode::Hints(_) => {
672                    // Skip these sections for now; they'll evaluate in the
673                    // retry loop
674                }
675            }
676
677            current += 1;
678        }
679
680        // TODO: check call cache for a hit. if so, skip task execution and use cache
681        // paths for output evaluation
682
683        let env = Arc::new(mem::take(&mut state.env));
684
685        // Spawn the task in a retry loop
686        let mut attempt = 0;
687        let mut evaluated = loop {
688            let EvaluatedSections {
689                command,
690                requirements,
691                hints,
692                inputs,
693            } = self
694                .evaluate_sections(id, &mut state, &definition, inputs, attempt)
695                .await?;
696
697            // Get the maximum number of retries, either from the task's requirements or
698            // from configuration
699            let max_retries = requirements
700                .get(TASK_REQUIREMENT_MAX_RETRIES)
701                .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
702                .cloned()
703                .map(|v| v.unwrap_integer() as u64)
704                .or_else(|| self.config.task.retries)
705                .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
706
707            if max_retries > MAX_RETRIES {
708                return Err(anyhow!(
709                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
710                )
711                .into());
712            }
713
714            let mut attempt_dir = root_dir.clone();
715            attempt_dir.push("attempts");
716            attempt_dir.push(attempt.to_string());
717
718            let request = TaskSpawnRequest::new(
719                id.to_string(),
720                TaskSpawnInfo::new(
721                    command,
722                    inputs,
723                    requirements.clone(),
724                    hints.clone(),
725                    env.clone(),
726                ),
727                attempt,
728                attempt_dir.clone(),
729            );
730
731            let events = self
732                .backend
733                .spawn(request, self.token.clone())
734                .with_context(|| {
735                    format!(
736                        "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
737                        name = task.name(),
738                        path = document.path(),
739                    )
740                })?;
741
742            if attempt > 0 {
743                progress(ProgressKind::TaskRetried {
744                    id,
745                    retry: attempt - 1,
746                });
747            }
748
749            // Await the spawned notification first
750            events.spawned.await.ok();
751
752            progress(ProgressKind::TaskExecutionStarted { id });
753
754            let result = events
755                .completed
756                .await
757                .expect("failed to receive response from spawned task");
758
759            progress(ProgressKind::TaskExecutionCompleted {
760                id,
761                result: &result,
762            });
763
764            let result = result.map_err(|e| {
765                EvaluationError::new(
766                    state.document.clone(),
767                    task_execution_failed(e, task.name(), id, task.name_span()),
768                )
769            })?;
770
771            // Update the task variable
772            let evaluated = EvaluatedTask::new(attempt_dir, result)?;
773            if version >= SupportedVersion::V1(V1::Two) {
774                let task = state.scopes[TASK_SCOPE_INDEX.0]
775                    .get_mut(TASK_VAR_NAME)
776                    .unwrap()
777                    .as_task_mut()
778                    .unwrap();
779
780                task.set_attempt(attempt.try_into().with_context(|| {
781                    format!(
782                        "too many attempts were made to run task `{task}`",
783                        task = state.task.name()
784                    )
785                })?);
786                task.set_return_code(evaluated.result.exit_code);
787            }
788
789            if let Err(e) = evaluated.handle_exit(&requirements, &self.downloader).await {
790                if attempt >= max_retries {
791                    return Err(EvaluationError::new(
792                        state.document.clone(),
793                        task_execution_failed(e, task.name(), id, task.name_span()),
794                    ));
795                }
796
797                attempt += 1;
798
799                info!(
800                    "retrying execution of task `{name}` (retry {attempt})",
801                    name = state.task.name()
802                );
803                continue;
804            }
805
806            break evaluated;
807        };
808
809        // Evaluate the remaining inputs (unused), and decls, and outputs
810        for index in &nodes[current..] {
811            match &graph[*index] {
812                TaskGraphNode::Decl(decl) => {
813                    self.evaluate_decl(id, &mut state, decl)
814                        .await
815                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
816                }
817                TaskGraphNode::Output(decl) => {
818                    self.evaluate_output(id, &mut state, decl, &evaluated)
819                        .await
820                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
821                }
822                _ => {
823                    unreachable!(
824                        "only declarations and outputs should be evaluated after the command"
825                    )
826                }
827            }
828        }
829
830        // Take the output scope and return it
831        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
832        drop(state);
833        if let Some(section) = definition.output() {
834            let indexes: HashMap<_, _> = section
835                .declarations()
836                .enumerate()
837                .map(|(i, d)| (d.name().hashable(), i))
838                .collect();
839            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
840        }
841
842        // Write the outputs to the task's root directory
843        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
844
845        evaluated.outputs = Ok(outputs);
846        Ok(evaluated)
847    }
848
849    /// Evaluates a task input.
850    async fn evaluate_input(
851        &self,
852        id: &str,
853        state: &mut State<'_>,
854        decl: &Decl<SyntaxNode>,
855        inputs: &TaskInputs,
856    ) -> Result<(), Diagnostic> {
857        let name = decl.name();
858        let decl_ty = decl.ty();
859        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
860
861        let (value, span) = match inputs.get(name.text()) {
862            Some(input) => (input.clone(), name.span()),
863            None => match decl.expr() {
864                Some(expr) => {
865                    debug!(
866                        task_id = id,
867                        task_name = state.task.name(),
868                        document = state.document.uri().as_str(),
869                        input_name = name.text(),
870                        "evaluating input"
871                    );
872
873                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
874                        state,
875                        &self.downloader,
876                        ROOT_SCOPE_INDEX,
877                    ));
878                    let value = evaluator.evaluate_expr(&expr).await?;
879                    (value, expr.span())
880                }
881                _ => {
882                    assert!(decl.ty().is_optional(), "type should be optional");
883                    (Value::None, name.span())
884                }
885            },
886        };
887
888        let value = value
889            .coerce(&ty)
890            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
891        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
892
893        // Insert an environment variable, if it is one
894        if decl.env().is_some() {
895            state.env.insert(
896                name.text().to_string(),
897                value
898                    .as_primitive()
899                    .expect("value should be primitive")
900                    .raw(None)
901                    .to_string(),
902            );
903        }
904
905        Ok(())
906    }
907
908    /// Evaluates a task private declaration.
909    async fn evaluate_decl(
910        &self,
911        id: &str,
912        state: &mut State<'_>,
913        decl: &Decl<SyntaxNode>,
914    ) -> Result<(), Diagnostic> {
915        let name = decl.name();
916        debug!(
917            task_id = id,
918            task_name = state.task.name(),
919            document = state.document.uri().as_str(),
920            decl_name = name.text(),
921            "evaluating private declaration",
922        );
923
924        let decl_ty = decl.ty();
925        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
926
927        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
928            state,
929            &self.downloader,
930            ROOT_SCOPE_INDEX,
931        ));
932
933        let expr = decl.expr().expect("private decls should have expressions");
934        let value = evaluator.evaluate_expr(&expr).await?;
935        let value = value
936            .coerce(&ty)
937            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
938        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
939
940        // Insert an environment variable, if it is one
941        if decl.env().is_some() {
942            state.env.insert(
943                name.text().to_string(),
944                value
945                    .as_primitive()
946                    .expect("value should be primitive")
947                    .raw(None)
948                    .to_string(),
949            );
950        }
951
952        Ok(())
953    }
954
955    /// Evaluates the runtime section.
956    ///
957    /// Returns both the task's hints and requirements.
958    async fn evaluate_runtime_section(
959        &self,
960        id: &str,
961        state: &State<'_>,
962        section: &RuntimeSection<SyntaxNode>,
963        inputs: &TaskInputs,
964    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
965        debug!(
966            task_id = id,
967            task_name = state.task.name(),
968            document = state.document.uri().as_str(),
969            "evaluating runtimes section",
970        );
971
972        let mut requirements = HashMap::new();
973        let mut hints = HashMap::new();
974
975        let version = state
976            .document
977            .version()
978            .expect("document should have version");
979        for item in section.items() {
980            let name = item.name();
981            match inputs.requirement(name.text()) {
982                Some(value) => {
983                    requirements.insert(name.text().to_string(), value.clone());
984                    continue;
985                }
986                _ => {
987                    if let Some(value) = inputs.hint(name.text()) {
988                        hints.insert(name.text().to_string(), value.clone());
989                        continue;
990                    }
991                }
992            }
993
994            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
995                state,
996                &self.downloader,
997                ROOT_SCOPE_INDEX,
998            ));
999
1000            let (types, requirement) = match task_requirement_types(version, name.text()) {
1001                Some(types) => (Some(types), true),
1002                None => match task_hint_types(version, name.text(), false) {
1003                    Some(types) => (Some(types), false),
1004                    None => (None, false),
1005                },
1006            };
1007
1008            // Evaluate and coerce to the expected type
1009            let expr = item.expr();
1010            let mut value = evaluator.evaluate_expr(&expr).await?;
1011            if let Some(types) = types {
1012                value = types
1013                    .iter()
1014                    .find_map(|ty| value.coerce(ty).ok())
1015                    .ok_or_else(|| {
1016                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1017                    })?;
1018            }
1019
1020            if requirement {
1021                requirements.insert(name.text().to_string(), value);
1022            } else {
1023                hints.insert(name.text().to_string(), value);
1024            }
1025        }
1026
1027        Ok((requirements, hints))
1028    }
1029
1030    /// Evaluates the requirements section.
1031    async fn evaluate_requirements_section(
1032        &self,
1033        id: &str,
1034        state: &State<'_>,
1035        section: &RequirementsSection<SyntaxNode>,
1036        inputs: &TaskInputs,
1037    ) -> Result<HashMap<String, Value>, Diagnostic> {
1038        debug!(
1039            task_id = id,
1040            task_name = state.task.name(),
1041            document = state.document.uri().as_str(),
1042            "evaluating requirements",
1043        );
1044
1045        let mut requirements = HashMap::new();
1046
1047        let version = state
1048            .document
1049            .version()
1050            .expect("document should have version");
1051        for item in section.items() {
1052            let name = item.name();
1053            if let Some(value) = inputs.requirement(name.text()) {
1054                requirements.insert(name.text().to_string(), value.clone());
1055                continue;
1056            }
1057
1058            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1059                state,
1060                &self.downloader,
1061                ROOT_SCOPE_INDEX,
1062            ));
1063
1064            let types =
1065                task_requirement_types(version, name.text()).expect("requirement should be known");
1066
1067            // Evaluate and coerce to the expected type
1068            let expr = item.expr();
1069            let value = evaluator.evaluate_expr(&expr).await?;
1070            let value = types
1071                .iter()
1072                .find_map(|ty| value.coerce(ty).ok())
1073                .ok_or_else(|| {
1074                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1075                })?;
1076
1077            requirements.insert(name.text().to_string(), value);
1078        }
1079
1080        Ok(requirements)
1081    }
1082
1083    /// Evaluates the hints section.
1084    async fn evaluate_hints_section(
1085        &self,
1086        id: &str,
1087        state: &State<'_>,
1088        section: &TaskHintsSection<SyntaxNode>,
1089        inputs: &TaskInputs,
1090    ) -> Result<HashMap<String, Value>, Diagnostic> {
1091        debug!(
1092            task_id = id,
1093            task_name = state.task.name(),
1094            document = state.document.uri().as_str(),
1095            "evaluating hints section",
1096        );
1097
1098        let mut hints = HashMap::new();
1099
1100        for item in section.items() {
1101            let name = item.name();
1102            if let Some(value) = inputs.hint(name.text()) {
1103                hints.insert(name.text().to_string(), value.clone());
1104                continue;
1105            }
1106
1107            let mut evaluator = ExprEvaluator::new(
1108                TaskEvaluationContext::new(state, &self.downloader, ROOT_SCOPE_INDEX).with_task(),
1109            );
1110
1111            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1112            hints.insert(name.text().to_string(), value);
1113        }
1114
1115        Ok(hints)
1116    }
1117
1118    /// Evaluates the command of a task.
1119    ///
1120    /// Returns the evaluated command and the mounts to use for spawning the
1121    /// task.
1122    async fn evaluate_command(
1123        &self,
1124        id: &str,
1125        state: &State<'_>,
1126        section: &CommandSection<SyntaxNode>,
1127    ) -> EvaluationResult<(String, Vec<Input>)> {
1128        debug!(
1129            task_id = id,
1130            task_name = state.task.name(),
1131            document = state.document.uri().as_str(),
1132            "evaluating command section",
1133        );
1134
1135        // Determine the inputs to the task
1136        let mut inputs = Vec::new();
1137
1138        // Discover every input that's visible to the scope
1139        ScopeRef::new(&state.scopes, TASK_SCOPE_INDEX.0).for_each(|_, v| {
1140            v.visit_paths(false, &mut |_, value| {
1141                inputs.push(Input::from_primitive(value)?);
1142                Ok(())
1143            })
1144        })?;
1145
1146        // The temp directory should always be an input
1147        inputs.push(Input::new(
1148            InputKind::Directory,
1149            EvaluationPath::Local(state.temp_dir.to_path_buf()),
1150        ));
1151
1152        // Localize the inputs
1153        self.backend
1154            .localize_inputs(&self.downloader, &mut inputs)
1155            .await
1156            .map_err(|e| {
1157                EvaluationError::new(
1158                    state.document.clone(),
1159                    task_localization_failed(e, state.task.name(), state.task.name_span()),
1160                )
1161            })?;
1162
1163        if enabled!(Level::DEBUG) {
1164            for input in inputs.iter() {
1165                if let Some(location) = input.location() {
1166                    debug!(
1167                        task_id = id,
1168                        task_name = state.task.name(),
1169                        document = state.document.uri().as_str(),
1170                        "task input `{path}` (downloaded to `{location}`) mapped to `{guest_path}`",
1171                        path = input.path().display(),
1172                        location = location.display(),
1173                        guest_path = input.guest_path().unwrap_or(""),
1174                    );
1175                } else {
1176                    debug!(
1177                        task_id = id,
1178                        task_name = state.task.name(),
1179                        document = state.document.uri().as_str(),
1180                        "task input `{path}` mapped to `{guest_path}`",
1181                        path = input.path().display(),
1182                        guest_path = input.guest_path().unwrap_or(""),
1183                    );
1184                }
1185            }
1186        }
1187
1188        let mut command = String::new();
1189        match section.strip_whitespace() {
1190            Some(parts) => {
1191                let mut evaluator = ExprEvaluator::new(
1192                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1193                        .with_inputs(&inputs),
1194                );
1195
1196                for part in parts {
1197                    match part {
1198                        StrippedCommandPart::Text(t) => {
1199                            command.push_str(t.as_str());
1200                        }
1201                        StrippedCommandPart::Placeholder(placeholder) => {
1202                            evaluator
1203                                .evaluate_placeholder(&placeholder, &mut command)
1204                                .await
1205                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1206                        }
1207                    }
1208                }
1209            }
1210            _ => {
1211                warn!(
1212                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1213                     stripping was skipped",
1214                    task = state.task.name(),
1215                    uri = state.document.uri(),
1216                );
1217
1218                let mut evaluator = ExprEvaluator::new(
1219                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1220                        .with_inputs(&inputs),
1221                );
1222
1223                let heredoc = section.is_heredoc();
1224                for part in section.parts() {
1225                    match part {
1226                        CommandPart::Text(t) => {
1227                            t.unescape_to(heredoc, &mut command);
1228                        }
1229                        CommandPart::Placeholder(placeholder) => {
1230                            evaluator
1231                                .evaluate_placeholder(&placeholder, &mut command)
1232                                .await
1233                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1234                        }
1235                    }
1236                }
1237            }
1238        }
1239
1240        Ok((command, inputs))
1241    }
1242
1243    /// Evaluates sections prior to spawning the command.
1244    ///
1245    /// This method evaluates the following sections:
1246    ///   * runtime
1247    ///   * requirements
1248    ///   * hints
1249    ///   * command
1250    async fn evaluate_sections(
1251        &self,
1252        id: &str,
1253        state: &mut State<'_>,
1254        definition: &TaskDefinition<SyntaxNode>,
1255        inputs: &TaskInputs,
1256        attempt: u64,
1257    ) -> EvaluationResult<EvaluatedSections> {
1258        // Start by evaluating requirements and hints
1259        let (requirements, hints) = match definition.runtime() {
1260            Some(section) => self
1261                .evaluate_runtime_section(id, state, &section, inputs)
1262                .await
1263                .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1264            _ => (
1265                match definition.requirements() {
1266                    Some(section) => self
1267                        .evaluate_requirements_section(id, state, &section, inputs)
1268                        .await
1269                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1270                    None => Default::default(),
1271                },
1272                match definition.hints() {
1273                    Some(section) => self
1274                        .evaluate_hints_section(id, state, &section, inputs)
1275                        .await
1276                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1277                    None => Default::default(),
1278                },
1279            ),
1280        };
1281
1282        // Update or insert the `task` variable in the task scope
1283        // TODO: if task variables become visible in `requirements` or `hints` section,
1284        // this needs to be relocated to before we evaluate those sections
1285        if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1286            // Get the execution constraints
1287            let constraints = self
1288                .backend
1289                .constraints(&requirements, &hints)
1290                .with_context(|| {
1291                    format!(
1292                        "failed to get constraints for task `{task}`",
1293                        task = state.task.name()
1294                    )
1295                })?;
1296
1297            let task = TaskValue::new_v1(
1298                state.task.name(),
1299                id,
1300                definition,
1301                constraints,
1302                attempt.try_into().with_context(|| {
1303                    format!(
1304                        "too many attempts were made to run task `{task}`",
1305                        task = state.task.name()
1306                    )
1307                })?,
1308            );
1309
1310            let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1311            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1312                *v = Value::Task(task);
1313            } else {
1314                scope.insert(TASK_VAR_NAME, Value::Task(task));
1315            }
1316        }
1317
1318        let (command, inputs) = self
1319            .evaluate_command(
1320                id,
1321                state,
1322                &definition.command().expect("must have command section"),
1323            )
1324            .await?;
1325
1326        Ok(EvaluatedSections {
1327            command,
1328            requirements: Arc::new(requirements),
1329            hints: Arc::new(hints),
1330            inputs,
1331        })
1332    }
1333
    /// Evaluates a task output.
    ///
    /// The output's expression is evaluated against the task scope with the
    /// executed task's working directory, stdout, and stderr made available
    /// to the evaluation context. The result is coerced to the declared type,
    /// and every file and directory path in the value is then resolved to a
    /// host path and checked for existence before the value is inserted into
    /// the output scope.
    ///
    /// When the backend reports a guest working directory, paths are treated
    /// as guest paths and translated to host paths; otherwise they are joined
    /// to the task's working directory when it is local.
    ///
    /// # Errors
    ///
    /// Returns a diagnostic if the declared type cannot be converted, if the
    /// expression fails to evaluate, if the value cannot be coerced to the
    /// declared type, or if path translation or the existence check fails.
    ///
    /// # Panics
    ///
    /// Panics if the output declaration has no expression.
    async fn evaluate_output(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
        // Output expressions are evaluated with access to the results of the
        // task's execution: its working directory, stdout, and stderr
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        // First coerce the output value to the expected type
        let mut value = value
            .coerce(&ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        let result = if let Some(guest_work_dir) = self.backend.guest_work_dir() {
            // Perform guest to host path translation and check for existence
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                let path = match value {
                    PrimitiveValue::File(path) => path,
                    PrimitiveValue::Directory(path) => path,
                    _ => unreachable!("only file and directory values should be visited"),
                };

                // If the path isn't in the temp directory or the attempt directory, perform
                // translation
                if !Path::new(path.as_str()).starts_with(state.temp_dir)
                    && !Path::new(path.as_str()).starts_with(evaluated.attempt_dir())
                {
                    // It's a file scheme'd URL, treat it as an absolute guest path
                    let guest = if path::is_file_url(path) {
                        path::parse_url(path)
                            .and_then(|u| u.to_file_path().ok())
                            .ok_or_else(|| anyhow!("guest path `{path}` is not a valid file URI"))?
                    } else if path::is_url(path) {
                        // Treat other URLs as if they exist
                        // TODO: should probably issue a HEAD request to verify
                        // NOTE: this returns from the visitor closure, so neither the
                        // translation nor the existence check below runs for such URLs
                        return Ok(true);
                    } else {
                        // Otherwise, treat as relative to the guest working directory
                        guest_work_dir.join(path.as_str())
                    };

                    // If the path is inside of the working directory, join with the task's working
                    // directory
                    let host = if let Ok(stripped) = guest.strip_prefix(guest_work_dir) {
                        Cow::Owned(
                            evaluated.result.work_dir.join(
                                stripped.to_str().with_context(|| {
                                    format!("output path `{path}` is not UTF-8")
                                })?,
                            )?,
                        )
                    } else {
                        // Otherwise the path must be under the guest path of one of the
                        // task's inputs. Of the inputs whose guest path is a prefix of the
                        // path, the one leaving the shortest remainder (i.e. the longest
                        // matching guest path) is used.
                        evaluated
                            .inputs()
                            .iter()
                            .filter_map(|i| {
                                Some((i.path(), guest.strip_prefix(i.guest_path()?).ok()?))
                            })
                            .min_by(|(_, a), (_, b)| a.as_os_str().len().cmp(&b.as_os_str().len()))
                            .and_then(|(path, stripped)| {
                                // An empty remainder means the guest path is the input
                                // itself, so the input's path is used without joining
                                if stripped.as_os_str().is_empty() {
                                    return Some(Cow::Borrowed(path));
                                }

                                Some(Cow::Owned(path.join(stripped.to_str()?).ok()?))
                            })
                            .ok_or_else(|| {
                                anyhow!("guest path `{path}` is not within a container mount")
                            })?
                    };

                    // Update the value to the host path
                    *Arc::make_mut(path) = host.into_owned().try_into()?;
                }

                // Finally, ensure the value exists
                value.ensure_path_exists(optional)
            })
        } else {
            // Backend isn't containerized, just join host paths and check for existence
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                // The join is skipped when the working directory isn't local
                if let Some(work_dir) = evaluated.result.work_dir.as_local() {
                    value.join_path_to(work_dir);
                }

                value.ensure_path_exists(optional)
            })
        };

        // Report any translation or existence failure against the output's name
        result.map_err(|e| {
            output_evaluation_failed(e, state.task.name(), true, name.text(), name.span())
        })?;

        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
1451}