1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::future::Future;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::anyhow;
16use anyhow::bail;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio_util::sync::CancellationToken;
20use tracing::Level;
21use tracing::debug;
22use tracing::enabled;
23use tracing::info;
24use tracing::warn;
25use wdl_analysis::Document;
26use wdl_analysis::diagnostics::multiple_type_mismatch;
27use wdl_analysis::diagnostics::unknown_name;
28use wdl_analysis::document::TASK_VAR_NAME;
29use wdl_analysis::document::Task;
30use wdl_analysis::eval::v1::TaskGraphBuilder;
31use wdl_analysis::eval::v1::TaskGraphNode;
32use wdl_analysis::types::Optional;
33use wdl_analysis::types::PrimitiveType;
34use wdl_analysis::types::Type;
35use wdl_analysis::types::v1::task_hint_types;
36use wdl_analysis::types::v1::task_requirement_types;
37use wdl_ast::Ast;
38use wdl_ast::AstNode;
39use wdl_ast::AstToken;
40use wdl_ast::Diagnostic;
41use wdl_ast::Span;
42use wdl_ast::SupportedVersion;
43use wdl_ast::v1::CommandPart;
44use wdl_ast::v1::CommandSection;
45use wdl_ast::v1::Decl;
46use wdl_ast::v1::RequirementsSection;
47use wdl_ast::v1::RuntimeSection;
48use wdl_ast::v1::StrippedCommandPart;
49use wdl_ast::v1::TASK_HINT_DISKS;
50use wdl_ast::v1::TASK_HINT_MAX_CPU;
51use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
52use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
53use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
54use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
55use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CPU;
57use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
58use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
59use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
60use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
61use wdl_ast::v1::TaskDefinition;
62use wdl_ast::v1::TaskHintsSection;
63use wdl_ast::version::V1;
64
65use super::ProgressKind;
66use crate::Coercible;
67use crate::EvaluationContext;
68use crate::EvaluationError;
69use crate::EvaluationResult;
70use crate::Input;
71use crate::InputKind;
72use crate::ONE_GIBIBYTE;
73use crate::Outputs;
74use crate::PrimitiveValue;
75use crate::Scope;
76use crate::ScopeIndex;
77use crate::ScopeRef;
78use crate::StorageUnit;
79use crate::TaskExecutionBackend;
80use crate::TaskInputs;
81use crate::TaskSpawnInfo;
82use crate::TaskSpawnRequest;
83use crate::TaskValue;
84use crate::Value;
85use crate::config::Config;
86use crate::config::MAX_RETRIES;
87use crate::convert_unit_string;
88use crate::diagnostics::output_evaluation_failed;
89use crate::diagnostics::runtime_type_mismatch;
90use crate::diagnostics::task_execution_failed;
91use crate::diagnostics::task_localization_failed;
92use crate::eval::EvaluatedTask;
93use crate::http::Downloader;
94use crate::http::HttpDownloader;
95use crate::path;
96use crate::path::EvaluationPath;
97use crate::tree::SyntaxNode;
98use crate::v1::ExprEvaluator;
99use crate::v1::INPUTS_FILE;
100use crate::v1::OUTPUTS_FILE;
101use crate::v1::write_json_file;
102
/// The default container image used when the `container` requirement is
/// absent or the `*` wildcard.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";

/// The default value for the `cpu` requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;

/// The default value for the `memory` requirement, in bytes (2 GiB).
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);

/// The default value for the `max_retries` requirement.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;

/// The default value for the `disks` requirement.
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;
113
/// The index of the root scope, which receives task inputs and private
/// declarations (see `evaluate_input` / `evaluate_decl`).
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);

/// The index of the scope in which the task's outputs are evaluated.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);

/// The index of the scope containing the `task` variable (used for WDL 1.2+
/// documents in `perform_evaluation`).
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
121
122pub(crate) fn container<'a>(
124 requirements: &'a HashMap<String, Value>,
125 default: Option<&'a str>,
126) -> Cow<'a, str> {
127 requirements
128 .get(TASK_REQUIREMENT_CONTAINER)
129 .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
130 .and_then(|v| -> Option<Cow<'_, str>> {
131 if let Some(array) = v.as_array() {
135 return array.as_slice().first().map(|v| {
136 v.as_string()
137 .expect("type should be string")
138 .as_ref()
139 .into()
140 });
141 }
142
143 Some(
144 v.coerce(&PrimitiveType::String.into())
145 .expect("type should coerce")
146 .unwrap_string()
147 .as_ref()
148 .clone()
149 .into(),
150 )
151 })
152 .and_then(|v| {
153 if v == "*" { None } else { Some(v) }
155 })
156 .unwrap_or_else(|| {
157 default
158 .map(Into::into)
159 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
160 })
161}
162
163pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
165 requirements
166 .get(TASK_REQUIREMENT_CPU)
167 .map(|v| {
168 v.coerce(&PrimitiveType::Float.into())
169 .expect("type should coerce")
170 .unwrap_float()
171 })
172 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
173}
174
175pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
177 hints
178 .get(TASK_HINT_MAX_CPU)
179 .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
180 .map(|v| {
181 v.coerce(&PrimitiveType::Float.into())
182 .expect("type should coerce")
183 .unwrap_float()
184 })
185}
186
187pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
189 Ok(requirements
190 .get(TASK_REQUIREMENT_MEMORY)
191 .map(|v| {
192 if let Some(v) = v.as_integer() {
193 return Ok(v);
194 }
195
196 if let Some(s) = v.as_string() {
197 return convert_unit_string(s)
198 .and_then(|v| v.try_into().ok())
199 .with_context(|| {
200 format!("task specifies an invalid `memory` requirement `{s}`")
201 });
202 }
203
204 unreachable!("value should be an integer or string");
205 })
206 .transpose()?
207 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
208}
209
210pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
212 hints
213 .get(TASK_HINT_MAX_MEMORY)
214 .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
215 .map(|v| {
216 if let Some(v) = v.as_integer() {
217 return Ok(v);
218 }
219
220 if let Some(s) = v.as_string() {
221 return convert_unit_string(s)
222 .and_then(|v| v.try_into().ok())
223 .with_context(|| {
224 format!("task specifies an invalid `memory` requirement `{s}`")
225 });
226 }
227
228 unreachable!("value should be an integer or string");
229 })
230 .transpose()
231}
232
/// Represents the type of disk that may be requested for a mount point.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// A solid-state drive.
    SSD,
    /// A hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type from its exact (case-sensitive) name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "SSD" {
            return Ok(Self::SSD);
        }

        if s == "HDD" {
            return Ok(Self::HDD);
        }

        Err(())
    }
}
255
/// Represents a single disk requested via the `disks` requirement.
pub struct DiskRequirement {
    // The requested size; spec strings with units are converted to GiB in
    // `disks` (bare integers are inserted as-is)
    pub size: i64,

    // The disk type from the `disks` hint, if one was specified for this
    // mount point
    pub ty: Option<DiskType>,
}
264
265pub(crate) fn disks<'a>(
269 requirements: &'a HashMap<String, Value>,
270 hints: &HashMap<String, Value>,
271) -> Result<HashMap<&'a str, DiskRequirement>> {
272 fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
276 hints.get(TASK_HINT_DISKS).and_then(|v| {
277 if let Some(ty) = v.as_string() {
278 return ty.parse().ok();
279 }
280
281 if let Some(map) = v.as_map() {
282 if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
285 (None, None) => true,
286 (None, Some(_)) | (Some(_), None) => false,
287 (Some(k), Some(mount_point)) => k
288 .as_string()
289 .map(|k| k.as_str() == mount_point)
290 .unwrap_or(false),
291 }) {
292 return v.as_string().and_then(|ty| ty.parse().ok());
293 }
294 }
295
296 None
297 })
298 }
299
300 fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
303 let iter = spec.split_whitespace();
304 let mut first = None;
305 let mut second = None;
306 let mut third = None;
307
308 for part in iter {
309 if first.is_none() {
310 first = Some(part);
311 continue;
312 }
313
314 if second.is_none() {
315 second = Some(part);
316 continue;
317 }
318
319 if third.is_none() {
320 third = Some(part);
321 continue;
322 }
323
324 return None;
325 }
326
327 match (first, second, third) {
328 (None, None, None) => None,
329 (Some(size), None, None) => {
330 Some((size.parse().ok()?, None))
332 }
333 (Some(first), Some(second), None) => {
334 if let Ok(size) = first.parse() {
336 let unit: StorageUnit = second.parse().ok()?;
337 let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
338 return Some((size.try_into().ok()?, None));
339 }
340
341 if !first.starts_with('/') {
344 return None;
345 }
346
347 Some((second.parse().ok()?, Some(first)))
348 }
349 (Some(mount_point), Some(size), Some(unit)) => {
350 let unit: StorageUnit = unit.parse().ok()?;
352 let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
353
354 if !mount_point.starts_with('/') {
356 return None;
357 }
358
359 Some((size.try_into().ok()?, Some(mount_point)))
360 }
361 _ => unreachable!("should have one, two, or three values"),
362 }
363 }
364
365 fn insert_disk<'a>(
367 spec: &'a str,
368 hints: &HashMap<String, Value>,
369 disks: &mut HashMap<&'a str, DiskRequirement>,
370 ) -> Result<()> {
371 let (size, mount_point) =
372 parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
373
374 let prev = disks.insert(
375 mount_point.unwrap_or("/"),
376 DiskRequirement {
377 size,
378 ty: lookup_type(mount_point, hints),
379 },
380 );
381
382 if prev.is_some() {
383 bail!(
384 "duplicate mount point `{mp}` specified in `disks` requirement",
385 mp = mount_point.unwrap_or("/")
386 );
387 }
388
389 Ok(())
390 }
391
392 let mut disks = HashMap::new();
393 if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
394 if let Some(size) = v.as_integer() {
395 if size < 0 {
397 bail!("task requirement `disks` cannot be less than zero");
398 }
399
400 disks.insert(
401 "/",
402 DiskRequirement {
403 size,
404 ty: lookup_type(None, hints),
405 },
406 );
407 } else if let Some(spec) = v.as_string() {
408 insert_disk(spec, hints, &mut disks)?;
409 } else if let Some(v) = v.as_array() {
410 for spec in v.as_slice() {
411 insert_disk(
412 spec.as_string().expect("spec should be a string"),
413 hints,
414 &mut disks,
415 )?;
416 }
417 } else {
418 unreachable!("value should be an integer, string, or array");
419 }
420 }
421
422 Ok(disks)
423}
424
425pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
431 const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
432 const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
433
434 hints
435 .get(TASK_HINT_PREEMPTIBLE)
436 .and_then(|v| {
437 Some(
438 v.coerce(&PrimitiveType::Integer.into())
439 .ok()?
440 .unwrap_integer(),
441 )
442 })
443 .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
444}
445
/// Context used when evaluating expressions during task evaluation.
struct TaskEvaluationContext<'a, 'b> {
    // The shared evaluation state for the task
    state: &'a State<'b>,
    // The downloader used to resolve remote files
    downloader: &'a HttpDownloader,
    // The scope in which names are resolved
    scope: ScopeIndex,
    // The task's working directory, if known
    work_dir: Option<&'a EvaluationPath>,
    // The task's stdout value, if known
    stdout: Option<&'a Value>,
    // The task's stderr value, if known
    stderr: Option<&'a Value>,
    // The localized inputs used for guest path translation, if any
    inputs: Option<&'a [Input]>,
    // Whether the `task` variable is visible in this context
    task: bool,
}
473
474impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
475 pub fn new(state: &'a State<'b>, downloader: &'a HttpDownloader, scope: ScopeIndex) -> Self {
477 Self {
478 state,
479 downloader,
480 scope,
481 work_dir: None,
482 stdout: None,
483 stderr: None,
484 inputs: None,
485 task: false,
486 }
487 }
488
489 pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
491 self.work_dir = Some(work_dir);
492 self
493 }
494
495 pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
497 self.stdout = Some(stdout);
498 self
499 }
500
501 pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
503 self.stderr = Some(stderr);
504 self
505 }
506
507 pub fn with_inputs(mut self, inputs: &'a [Input]) -> Self {
509 self.inputs = Some(inputs);
510 self
511 }
512
513 pub fn with_task(mut self) -> Self {
517 self.task = true;
518 self
519 }
520}
521
impl EvaluationContext for TaskEvaluationContext<'_, '_> {
    /// Returns the WDL version of the document being evaluated.
    fn version(&self) -> SupportedVersion {
        self.state
            .document
            .version()
            .expect("document should have a version")
    }

    /// Resolves a name in the context's scope, cloning the stored value.
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
        ScopeRef::new(&self.state.scopes, self.scope)
            .lookup(name)
            .cloned()
            .ok_or_else(|| unknown_name(name, span))
    }

    /// Resolves a type name against the document.
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
        crate::resolve_type_name(self.state.document, name, span)
    }

    /// Returns the task's working directory, if set on this context.
    fn work_dir(&self) -> Option<&EvaluationPath> {
        self.work_dir
    }

    /// Returns the temp directory for the evaluation.
    fn temp_dir(&self) -> &Path {
        self.state.temp_dir
    }

    /// Returns the task's stdout value, if set on this context.
    fn stdout(&self) -> Option<&Value> {
        self.stdout
    }

    /// Returns the task's stderr value, if set on this context.
    fn stderr(&self) -> Option<&Value> {
        self.stderr
    }

    /// Returns the task being evaluated, but only when the `task` variable is
    /// visible in this context (see `with_task`).
    fn task(&self) -> Option<&Task> {
        if self.task {
            Some(self.state.task)
        } else {
            None
        }
    }

    /// Translates a host path or URL to its guest path using the localized
    /// inputs.
    ///
    /// Returns `None` when no inputs are set, the path is a relative local
    /// path, or no input is a prefix of the path.
    fn translate_path(&self, path: &str) -> Option<Cow<'_, Path>> {
        let inputs = self.inputs?;
        let is_url = path::is_url(path);

        // Relative local paths cannot be translated
        if !is_url && Path::new(path).is_relative() {
            return None;
        }

        // Find the input whose host path (local) or URL (remote) is a prefix
        // of the given path; keep the remainder to rejoin onto the guest path
        let (guest_path, stripped) = inputs
            .iter()
            .filter_map(|i| {
                match i.path() {
                    EvaluationPath::Local(base) if !is_url => {
                        let stripped = Path::new(path).strip_prefix(base).ok()?;
                        Some((i.guest_path()?, stripped.to_str()?))
                    }
                    EvaluationPath::Remote(url) if is_url => {
                        let url = url.as_str();
                        let stripped = path.strip_prefix(url.strip_suffix('/').unwrap_or(url))?;

                        // Drop any query string or fragment from the remainder
                        let stripped = if let Some(pos) = stripped.find('?') {
                            &stripped[..pos]
                        } else if let Some(pos) = stripped.find('#') {
                            &stripped[..pos]
                        } else {
                            stripped.strip_prefix('/').unwrap_or(stripped)
                        };

                        Some((i.guest_path()?, stripped))
                    }
                    _ => None,
                }
            })
            // Prefer the longest matching prefix, i.e. the shortest remainder
            .min_by(|(_, a), (_, b)| a.len().cmp(&b.len()))?;

        if stripped.is_empty() {
            return Some(Path::new(guest_path).into());
        }

        Some(Path::new(guest_path).join(stripped).into())
    }

    /// Returns the downloader used to resolve remote files.
    fn downloader(&self) -> &dyn Downloader {
        self.downloader
    }
}
614
/// Represents mutable evaluation state for a single task evaluation.
struct State<'a> {
    // The temp directory created for the evaluation
    temp_dir: &'a Path,
    // The document containing the task being evaluated
    document: &'a Document,
    // The task being evaluated
    task: &'a Task,
    // The evaluation scopes, indexed by `ROOT_SCOPE_INDEX`,
    // `OUTPUT_SCOPE_INDEX`, and `TASK_SCOPE_INDEX`
    scopes: [Scope; 3],
    // Environment variables collected from `env` input/private declarations,
    // passed to the task's command
    env: IndexMap<String, String>,
}
634
635impl<'a> State<'a> {
636 fn new(temp_dir: &'a Path, document: &'a Document, task: &'a Task) -> Result<Self> {
638 let scopes = [
647 Scope::default(),
648 Scope::new(ROOT_SCOPE_INDEX),
649 Scope::new(OUTPUT_SCOPE_INDEX),
650 ];
651
652 Ok(Self {
653 temp_dir,
654 document,
655 task,
656 scopes,
657 env: Default::default(),
658 })
659 }
660}
661
/// The result of evaluating a task's per-attempt sections (see
/// `evaluate_sections`).
struct EvaluatedSections {
    // The evaluated command script text
    command: String,
    // The evaluated requirements
    requirements: Arc<HashMap<String, Value>>,
    // The evaluated hints
    hints: Arc<HashMap<String, Value>>,
    // The localized inputs used by the command
    inputs: Vec<Input>,
}
673
/// Evaluates WDL tasks: inputs, command execution on a backend, and outputs.
pub struct TaskEvaluator {
    // The evaluation configuration
    config: Arc<Config>,
    // The backend used to execute task commands
    backend: Arc<dyn TaskExecutionBackend>,
    // Token used to cancel task execution
    token: CancellationToken,
    // Downloader used to resolve remote files
    downloader: HttpDownloader,
}
685
686impl TaskEvaluator {
    /// Constructs a new task evaluator with the given configuration and
    /// cancellation token.
    ///
    /// Creates the task execution backend specified by the configuration;
    /// returns an error if backend creation or configuration validation
    /// fails.
    pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
        let backend = config.create_backend().await?;
        Self::new_with_backend(config, backend, token)
    }
697
698 pub fn new_with_backend(
703 config: Config,
704 backend: Arc<dyn TaskExecutionBackend>,
705 token: CancellationToken,
706 ) -> Result<Self> {
707 config.validate()?;
708
709 let config = Arc::new(config);
710 let downloader = HttpDownloader::new(config.clone())?;
711
712 Ok(Self {
713 config,
714 backend,
715 token,
716 downloader,
717 })
718 }
719
720 pub(crate) fn new_unchecked(
725 config: Arc<Config>,
726 backend: Arc<dyn TaskExecutionBackend>,
727 token: CancellationToken,
728 downloader: HttpDownloader,
729 ) -> Self {
730 Self {
731 config,
732 backend,
733 token,
734 downloader,
735 }
736 }
737
738 pub async fn evaluate<P, R>(
742 &self,
743 document: &Document,
744 task: &Task,
745 inputs: &TaskInputs,
746 root: impl AsRef<Path>,
747 progress: P,
748 ) -> EvaluationResult<EvaluatedTask>
749 where
750 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
751 R: Future<Output = ()> + Send,
752 {
753 self.evaluate_with_progress(
754 document,
755 task,
756 inputs,
757 root.as_ref(),
758 task.name(),
759 Arc::new(progress),
760 )
761 .await
762 }
763
    /// Evaluates the task, sending `TaskStarted` and `TaskCompleted` progress
    /// notifications around the actual evaluation.
    pub(crate) async fn evaluate_with_progress<P, R>(
        &self,
        document: &Document,
        task: &Task,
        inputs: &TaskInputs,
        root: &Path,
        id: &str,
        progress: Arc<P>,
    ) -> EvaluationResult<EvaluatedTask>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        // Evaluation requires an error-free analyzed document
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }

        progress(ProgressKind::TaskStarted { id }).await;

        let result = self
            .perform_evaluation(document, task, inputs, root, id, progress.clone())
            .await;

        // Always notify completion, whether evaluation succeeded or failed
        progress(ProgressKind::TaskCompleted {
            id,
            result: &result,
        })
        .await;

        result
    }
797
798 async fn perform_evaluation<P, R>(
800 &self,
801 document: &Document,
802 task: &Task,
803 inputs: &TaskInputs,
804 root: &Path,
805 id: &str,
806 progress: Arc<P>,
807 ) -> EvaluationResult<EvaluatedTask>
808 where
809 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
810 R: Future<Output = ()> + Send,
811 {
812 inputs.validate(document, task, None).with_context(|| {
813 format!(
814 "failed to validate the inputs to task `{task}`",
815 task = task.name()
816 )
817 })?;
818
819 let ast = match document.root().morph().ast() {
820 Ast::V1(ast) => ast,
821 _ => {
822 return Err(
823 anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
824 );
825 }
826 };
827
828 let definition = ast
830 .tasks()
831 .find(|t| t.name().text() == task.name())
832 .expect("task should exist in the AST");
833
834 let version = document.version().expect("document should have version");
835
836 let mut diagnostics = Vec::new();
838 let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
839 assert!(
840 diagnostics.is_empty(),
841 "task evaluation graph should have no diagnostics"
842 );
843
844 debug!(
845 task_id = id,
846 task_name = task.name(),
847 document = document.uri().as_str(),
848 "evaluating task"
849 );
850
851 let root_dir = absolute(root).with_context(|| {
852 format!(
853 "failed to determine absolute path of `{path}`",
854 path = root.display()
855 )
856 })?;
857
858 let temp_dir = root_dir.join("tmp");
860 fs::create_dir_all(&temp_dir).with_context(|| {
861 format!(
862 "failed to create directory `{path}`",
863 path = temp_dir.display()
864 )
865 })?;
866
867 write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
869
870 let mut state = State::new(&temp_dir, document, task)?;
871 let nodes = toposort(&graph, None).expect("graph should be acyclic");
872 let mut current = 0;
873 while current < nodes.len() {
874 match &graph[nodes[current]] {
875 TaskGraphNode::Input(decl) => {
876 self.evaluate_input(id, &mut state, decl, inputs)
877 .await
878 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
879 }
880 TaskGraphNode::Decl(decl) => {
881 self.evaluate_decl(id, &mut state, decl)
882 .await
883 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
884 }
885 TaskGraphNode::Output(_) => {
886 break;
888 }
889 TaskGraphNode::Command(_)
890 | TaskGraphNode::Runtime(_)
891 | TaskGraphNode::Requirements(_)
892 | TaskGraphNode::Hints(_) => {
893 }
896 }
897
898 current += 1;
899 }
900
901 let env = Arc::new(mem::take(&mut state.env));
905
906 let mut attempt = 0;
908 let mut evaluated = loop {
909 let EvaluatedSections {
910 command,
911 requirements,
912 hints,
913 inputs,
914 } = self
915 .evaluate_sections(id, &mut state, &definition, inputs, attempt)
916 .await?;
917
918 let max_retries = requirements
921 .get(TASK_REQUIREMENT_MAX_RETRIES)
922 .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
923 .cloned()
924 .map(|v| v.unwrap_integer() as u64)
925 .or_else(|| self.config.task.retries)
926 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
927
928 if max_retries > MAX_RETRIES {
929 return Err(anyhow!(
930 "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
931 )
932 .into());
933 }
934
935 let mut attempt_dir = root_dir.clone();
936 attempt_dir.push("attempts");
937 attempt_dir.push(attempt.to_string());
938
939 let request = TaskSpawnRequest::new(
940 id.to_string(),
941 TaskSpawnInfo::new(
942 command,
943 inputs,
944 requirements.clone(),
945 hints.clone(),
946 env.clone(),
947 ),
948 attempt,
949 attempt_dir.clone(),
950 );
951
952 let events = self
953 .backend
954 .spawn(request, self.token.clone())
955 .with_context(|| {
956 format!(
957 "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
958 name = task.name(),
959 path = document.path(),
960 )
961 })?;
962
963 if attempt > 0 {
964 progress(ProgressKind::TaskRetried {
965 id,
966 retry: attempt - 1,
967 });
968 }
969
970 events.spawned.await.ok();
972
973 progress(ProgressKind::TaskExecutionStarted { id });
974
975 let result = events
976 .completed
977 .await
978 .expect("failed to receive response from spawned task");
979
980 progress(ProgressKind::TaskExecutionCompleted {
981 id,
982 result: &result,
983 });
984
985 let result = result.map_err(|e| {
986 EvaluationError::new(
987 state.document.clone(),
988 task_execution_failed(e, task.name(), id, task.name_span()),
989 )
990 })?;
991
992 let evaluated = EvaluatedTask::new(attempt_dir, result)?;
994 if version >= SupportedVersion::V1(V1::Two) {
995 let task = state.scopes[TASK_SCOPE_INDEX.0]
996 .get_mut(TASK_VAR_NAME)
997 .unwrap()
998 .as_task_mut()
999 .unwrap();
1000
1001 task.set_attempt(attempt.try_into().with_context(|| {
1002 format!(
1003 "too many attempts were made to run task `{task}`",
1004 task = state.task.name()
1005 )
1006 })?);
1007 task.set_return_code(evaluated.result.exit_code);
1008 }
1009
1010 if let Err(e) = evaluated.handle_exit(&requirements, &self.downloader).await {
1011 if attempt >= max_retries {
1012 return Err(EvaluationError::new(
1013 state.document.clone(),
1014 task_execution_failed(e, task.name(), id, task.name_span()),
1015 ));
1016 }
1017
1018 attempt += 1;
1019
1020 info!(
1021 "retrying execution of task `{name}` (retry {attempt})",
1022 name = state.task.name()
1023 );
1024 continue;
1025 }
1026
1027 break evaluated;
1028 };
1029
1030 for index in &nodes[current..] {
1032 match &graph[*index] {
1033 TaskGraphNode::Decl(decl) => {
1034 self.evaluate_decl(id, &mut state, decl)
1035 .await
1036 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1037 }
1038 TaskGraphNode::Output(decl) => {
1039 self.evaluate_output(id, &mut state, decl, &evaluated)
1040 .await
1041 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1042 }
1043 _ => {
1044 unreachable!(
1045 "only declarations and outputs should be evaluated after the command"
1046 )
1047 }
1048 }
1049 }
1050
1051 let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1053 drop(state);
1054 if let Some(section) = definition.output() {
1055 let indexes: HashMap<_, _> = section
1056 .declarations()
1057 .enumerate()
1058 .map(|(i, d)| (d.name().hashable(), i))
1059 .collect();
1060 outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1061 }
1062
1063 write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
1065
1066 evaluated.outputs = Ok(outputs);
1067 Ok(evaluated)
1068 }
1069
    /// Evaluates a task input declaration and stores the result in the root
    /// scope.
    ///
    /// The value comes from the caller-supplied `inputs` when present;
    /// otherwise the declaration's default expression is evaluated, and an
    /// optional input without a default becomes `None`.
    async fn evaluate_input(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;

        let (value, span) = match inputs.get(name.text()) {
            Some(input) => (input.clone(), name.span()),
            None => match decl.expr() {
                Some(expr) => {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input"
                    );

                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                        state,
                        &self.downloader,
                        ROOT_SCOPE_INDEX,
                    ));
                    let value = evaluator.evaluate_expr(&expr).await?;
                    (value, expr.span())
                }
                _ => {
                    // No caller value and no default expression: analysis
                    // guarantees the input is optional
                    assert!(decl.ty().is_optional(), "type should be optional");
                    (Value::None, name.span())
                }
            },
        };

        // Coerce to the declared type, reporting a mismatch at the value's
        // span on failure
        let value = value
            .coerce(&ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());

        // `env` declarations are also exported as environment variables for
        // the task's command
        if decl.env().is_some() {
            state.env.insert(
                name.text().to_string(),
                value
                    .as_primitive()
                    .expect("value should be primitive")
                    .raw(None)
                    .to_string(),
            );
        }

        Ok(())
    }
1128
    /// Evaluates a task private declaration and stores the result in the root
    /// scope.
    async fn evaluate_decl(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            decl_name = name.text(),
            "evaluating private declaration",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;

        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
            state,
            &self.downloader,
            ROOT_SCOPE_INDEX,
        ));

        // Unlike inputs, private declarations always have an expression
        let expr = decl.expr().expect("private decls should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;
        let value = value
            .coerce(&ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());

        // `env` declarations are also exported as environment variables for
        // the task's command
        if decl.env().is_some() {
            state.env.insert(
                name.text().to_string(),
                value
                    .as_primitive()
                    .expect("value should be primitive")
                    .raw(None)
                    .to_string(),
            );
        }

        Ok(())
    }
1175
    /// Evaluates the legacy `runtime` section.
    ///
    /// Each item is classified as either a requirement or a hint based on the
    /// known names for the document's WDL version; unrecognized names are
    /// treated as hints. Caller-specified values take precedence over the
    /// section's expressions. Returns the `(requirements, hints)` maps.
    async fn evaluate_runtime_section(
        &self,
        id: &str,
        state: &State<'_>,
        section: &RuntimeSection<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating runtimes section",
        );

        let mut requirements = HashMap::new();
        let mut hints = HashMap::new();

        let version = state
            .document
            .version()
            .expect("document should have version");
        for item in section.items() {
            let name = item.name();
            // A caller-specified requirement or hint overrides the item's
            // expression
            match inputs.requirement(name.text()) {
                Some(value) => {
                    requirements.insert(name.text().to_string(), value.clone());
                    continue;
                }
                _ => {
                    if let Some(value) = inputs.hint(name.text()) {
                        hints.insert(name.text().to_string(), value.clone());
                        continue;
                    }
                }
            }

            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                state,
                &self.downloader,
                ROOT_SCOPE_INDEX,
            ));

            // Determine the item's expected types and whether it is a
            // requirement; unknown names have no expected types and are hints
            let (types, requirement) = match task_requirement_types(version, name.text()) {
                Some(types) => (Some(types), true),
                None => match task_hint_types(version, name.text(), false) {
                    Some(types) => (Some(types), false),
                    None => (None, false),
                },
            };

            // Evaluate the item's expression, coercing to the first expected
            // type that accepts the value
            let expr = item.expr();
            let mut value = evaluator.evaluate_expr(&expr).await?;
            if let Some(types) = types {
                value = types
                    .iter()
                    .find_map(|ty| value.coerce(ty).ok())
                    .ok_or_else(|| {
                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
                    })?;
            }

            if requirement {
                requirements.insert(name.text().to_string(), value);
            } else {
                hints.insert(name.text().to_string(), value);
            }
        }

        Ok((requirements, hints))
    }
1250
    /// Evaluates the task's `requirements` section.
    ///
    /// Caller-specified requirement values take precedence over the section's
    /// expressions. Returns the map of evaluated requirements.
    async fn evaluate_requirements_section(
        &self,
        id: &str,
        state: &State<'_>,
        section: &RequirementsSection<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<HashMap<String, Value>, Diagnostic> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating requirements",
        );

        let mut requirements = HashMap::new();

        let version = state
            .document
            .version()
            .expect("document should have version");
        for item in section.items() {
            let name = item.name();
            // A caller-specified requirement overrides the item's expression
            if let Some(value) = inputs.requirement(name.text()) {
                requirements.insert(name.text().to_string(), value.clone());
                continue;
            }

            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                state,
                &self.downloader,
                ROOT_SCOPE_INDEX,
            ));

            // Unlike `runtime`, analysis only permits known requirement names
            let types =
                task_requirement_types(version, name.text()).expect("requirement should be known");

            // Evaluate the expression, coercing to the first expected type
            // that accepts the value
            let expr = item.expr();
            let value = evaluator.evaluate_expr(&expr).await?;
            let value = types
                .iter()
                .find_map(|ty| value.coerce(ty).ok())
                .ok_or_else(|| {
                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
                })?;

            requirements.insert(name.text().to_string(), value);
        }

        Ok(requirements)
    }
1303
    /// Evaluates the task's `hints` section.
    ///
    /// Caller-specified hint values take precedence over the section's
    /// expressions. The `task` variable is visible while evaluating hint
    /// items. Returns the map of evaluated hints.
    async fn evaluate_hints_section(
        &self,
        id: &str,
        state: &State<'_>,
        section: &TaskHintsSection<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<HashMap<String, Value>, Diagnostic> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating hints section",
        );

        let mut hints = HashMap::new();

        for item in section.items() {
            let name = item.name();
            // A caller-specified hint overrides the item's expression
            if let Some(value) = inputs.hint(name.text()) {
                hints.insert(name.text().to_string(), value.clone());
                continue;
            }

            let mut evaluator = ExprEvaluator::new(
                TaskEvaluationContext::new(state, &self.downloader, ROOT_SCOPE_INDEX).with_task(),
            );

            let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
            hints.insert(name.text().to_string(), value);
        }

        Ok(hints)
    }
1338
    /// Evaluates the task's command section.
    ///
    /// Collects the file/directory inputs referenced by values in scope,
    /// localizes them via the backend, and then evaluates the command's
    /// placeholders. Returns the evaluated command text and the localized
    /// inputs.
    async fn evaluate_command(
        &self,
        id: &str,
        state: &State<'_>,
        section: &CommandSection<SyntaxNode>,
    ) -> EvaluationResult<(String, Vec<Input>)> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating command section",
        );

        // Gather an input for every path-typed value reachable from the task
        // scope
        let mut inputs = Vec::new();

        ScopeRef::new(&state.scopes, TASK_SCOPE_INDEX.0).for_each(|_, v| {
            v.visit_paths(false, &mut |_, value| {
                inputs.push(Input::from_primitive(value)?);
                Ok(())
            })
        })?;

        // The temp directory itself is made available to the command
        inputs.push(Input::new(
            InputKind::Directory,
            EvaluationPath::Local(state.temp_dir.to_path_buf()),
        ));

        // Let the backend localize the inputs (e.g. download remote files)
        self.backend
            .localize_inputs(&self.downloader, &mut inputs)
            .await
            .map_err(|e| {
                EvaluationError::new(
                    state.document.clone(),
                    task_localization_failed(e, state.task.name(), state.task.name_span()),
                )
            })?;

        // Log the host-to-guest path mappings when debug logging is enabled
        if enabled!(Level::DEBUG) {
            for input in inputs.iter() {
                if let Some(location) = input.location() {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        "task input `{path}` (downloaded to `{location}`) mapped to `{guest_path}`",
                        path = input.path().display(),
                        location = location.display(),
                        guest_path = input.guest_path().unwrap_or(""),
                    );
                } else {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        "task input `{path}` mapped to `{guest_path}`",
                        path = input.path().display(),
                        guest_path = input.guest_path().unwrap_or(""),
                    );
                }
            }
        }

        let mut command = String::new();
        match section.strip_whitespace() {
            Some(parts) => {
                // Evaluate the whitespace-stripped command parts
                let mut evaluator = ExprEvaluator::new(
                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                        .with_inputs(&inputs),
                );

                for part in parts {
                    match part {
                        StrippedCommandPart::Text(t) => {
                            command.push_str(t.as_str());
                        }
                        StrippedCommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                        }
                    }
                }
            }
            _ => {
                // Mixed indentation prevents whitespace stripping; evaluate
                // the raw parts instead
                warn!(
                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
                     stripping was skipped",
                    task = state.task.name(),
                    uri = state.document.uri(),
                );

                let mut evaluator = ExprEvaluator::new(
                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                        .with_inputs(&inputs),
                );

                let heredoc = section.is_heredoc();
                for part in section.parts() {
                    match part {
                        CommandPart::Text(t) => {
                            t.unescape_to(heredoc, &mut command);
                        }
                        CommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                        }
                    }
                }
            }
        }

        Ok((command, inputs))
    }
1463
1464 async fn evaluate_sections(
1472 &self,
1473 id: &str,
1474 state: &mut State<'_>,
1475 definition: &TaskDefinition<SyntaxNode>,
1476 inputs: &TaskInputs,
1477 attempt: u64,
1478 ) -> EvaluationResult<EvaluatedSections> {
1479 let (requirements, hints) = match definition.runtime() {
1481 Some(section) => self
1482 .evaluate_runtime_section(id, state, §ion, inputs)
1483 .await
1484 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1485 _ => (
1486 match definition.requirements() {
1487 Some(section) => self
1488 .evaluate_requirements_section(id, state, §ion, inputs)
1489 .await
1490 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1491 None => Default::default(),
1492 },
1493 match definition.hints() {
1494 Some(section) => self
1495 .evaluate_hints_section(id, state, §ion, inputs)
1496 .await
1497 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1498 None => Default::default(),
1499 },
1500 ),
1501 };
1502
1503 if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1507 let constraints = self
1509 .backend
1510 .constraints(&requirements, &hints)
1511 .with_context(|| {
1512 format!(
1513 "failed to get constraints for task `{task}`",
1514 task = state.task.name()
1515 )
1516 })?;
1517
1518 let task = TaskValue::new_v1(
1519 state.task.name(),
1520 id,
1521 definition,
1522 constraints,
1523 attempt.try_into().with_context(|| {
1524 format!(
1525 "too many attempts were made to run task `{task}`",
1526 task = state.task.name()
1527 )
1528 })?,
1529 );
1530
1531 let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1532 if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1533 *v = Value::Task(task);
1534 } else {
1535 scope.insert(TASK_VAR_NAME, Value::Task(task));
1536 }
1537 }
1538
1539 let (command, inputs) = self
1540 .evaluate_command(
1541 id,
1542 state,
1543 &definition.command().expect("must have command section"),
1544 )
1545 .await?;
1546
1547 Ok(EvaluatedSections {
1548 command,
1549 requirements: Arc::new(requirements),
1550 hints: Arc::new(hints),
1551 inputs,
1552 })
1553 }
1554
    /// Evaluates a single task output declaration.
    ///
    /// The output expression is evaluated with the task's work directory,
    /// stdout, and stderr available, coerced to the declared type, and every
    /// file/directory path inside the value is translated from a guest
    /// (container) path back to a host path when the backend ran in a
    /// container. The final value is stored in the output scope under the
    /// declaration's name.
    async fn evaluate_output(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
        // Output expressions may reference the work dir, stdout, and stderr
        // of the completed task, so make them available to the evaluator.
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        // Coerce the evaluated value to the declared output type; mismatches
        // are reported against both the declaration and expression spans.
        let mut value = value
            .coerce(&ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        let result = if let Some(guest_work_dir) = self.backend.guest_work_dir() {
            // The task ran in a container: map guest paths in the output
            // value back to host paths.
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                let path = match value {
                    PrimitiveValue::File(path) => path,
                    PrimitiveValue::Directory(path) => path,
                    _ => unreachable!("only file and directory values should be visited"),
                };

                // Paths already under the temp or attempt directories are
                // host-side paths and need no translation.
                if !Path::new(path.as_str()).starts_with(state.temp_dir)
                    && !Path::new(path.as_str()).starts_with(evaluated.attempt_dir())
                {
                    // Determine the guest path: `file://` URLs become local
                    // paths, other URLs pass through untouched, and bare
                    // paths are joined to the guest work directory.
                    let guest = if path::is_file_url(path) {
                        path::parse_url(path)
                            .and_then(|u| u.to_file_path().ok())
                            .ok_or_else(|| anyhow!("guest path `{path}` is not a valid file URI"))?
                    } else if path::is_url(path) {
                        // Non-file URL: keep the value as-is and continue.
                        return Ok(true);
                    } else {
                        guest_work_dir.join(path.as_str())
                    };

                    // Translate guest -> host: first try the work directory
                    // mapping, then fall back to the localized input mounts.
                    let host = if let Ok(stripped) = guest.strip_prefix(guest_work_dir) {
                        Cow::Owned(
                            evaluated.result.work_dir.join(
                                stripped.to_str().with_context(|| {
                                    format!("output path `{path}` is not UTF-8")
                                })?,
                            )?,
                        )
                    } else {
                        // Among inputs whose guest path is a prefix of
                        // `guest`, pick the one leaving the shortest
                        // remainder (i.e. the most specific mount).
                        evaluated
                            .inputs()
                            .iter()
                            .filter_map(|i| {
                                Some((i.path(), guest.strip_prefix(i.guest_path()?).ok()?))
                            })
                            .min_by(|(_, a), (_, b)| a.as_os_str().len().cmp(&b.as_os_str().len()))
                            .and_then(|(path, stripped)| {
                                // An empty remainder means the output path is
                                // the mount itself; borrow the host path.
                                if stripped.as_os_str().is_empty() {
                                    return Some(Cow::Borrowed(path));
                                }

                                Some(Cow::Owned(path.join(stripped.to_str()?).ok()?))
                            })
                            .ok_or_else(|| {
                                anyhow!("guest path `{path}` is not within a container mount")
                            })?
                    };

                    // Write the host path back into the value in place.
                    *Arc::make_mut(path) = host.into_owned().try_into()?;
                }

                value.ensure_path_exists(optional)
            })
        } else {
            // No container: resolve paths against the local work directory
            // (when it is local) and verify they exist.
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                if let Some(work_dir) = evaluated.result.work_dir.as_local() {
                    value.join_path_to(work_dir);
                }

                value.ensure_path_exists(optional)
            })
        };

        result.map_err(|e| {
            output_evaluation_failed(e, state.task.name(), true, name.text(), name.span())
        })?;

        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
1672}