1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::future::Future;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::anyhow;
16use anyhow::bail;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio_util::sync::CancellationToken;
20use tracing::Level;
21use tracing::debug;
22use tracing::enabled;
23use tracing::info;
24use tracing::warn;
25use wdl_analysis::Document;
26use wdl_analysis::diagnostics::multiple_type_mismatch;
27use wdl_analysis::diagnostics::unknown_name;
28use wdl_analysis::document::TASK_VAR_NAME;
29use wdl_analysis::document::Task;
30use wdl_analysis::eval::v1::TaskGraphBuilder;
31use wdl_analysis::eval::v1::TaskGraphNode;
32use wdl_analysis::types::Optional;
33use wdl_analysis::types::PrimitiveType;
34use wdl_analysis::types::Type;
35use wdl_analysis::types::v1::task_hint_types;
36use wdl_analysis::types::v1::task_requirement_types;
37use wdl_ast::Ast;
38use wdl_ast::AstNode;
39use wdl_ast::AstToken;
40use wdl_ast::Diagnostic;
41use wdl_ast::Span;
42use wdl_ast::SupportedVersion;
43use wdl_ast::v1::CommandPart;
44use wdl_ast::v1::CommandSection;
45use wdl_ast::v1::Decl;
46use wdl_ast::v1::RequirementsSection;
47use wdl_ast::v1::RuntimeSection;
48use wdl_ast::v1::StrippedCommandPart;
49use wdl_ast::v1::TASK_HINT_DISKS;
50use wdl_ast::v1::TASK_HINT_MAX_CPU;
51use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
52use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
53use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
54use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
55use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CPU;
57use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
58use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
59use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
60use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
61use wdl_ast::v1::TaskDefinition;
62use wdl_ast::v1::TaskHintsSection;
63use wdl_ast::version::V1;
64
65use super::ProgressKind;
66use crate::Coercible;
67use crate::EvaluationContext;
68use crate::EvaluationError;
69use crate::EvaluationResult;
70use crate::Input;
71use crate::InputKind;
72use crate::ONE_GIBIBYTE;
73use crate::Outputs;
74use crate::PrimitiveValue;
75use crate::Scope;
76use crate::ScopeIndex;
77use crate::ScopeRef;
78use crate::StorageUnit;
79use crate::TaskExecutionBackend;
80use crate::TaskInputs;
81use crate::TaskSpawnInfo;
82use crate::TaskSpawnRequest;
83use crate::TaskValue;
84use crate::Value;
85use crate::config::Config;
86use crate::config::MAX_RETRIES;
87use crate::convert_unit_string;
88use crate::diagnostics::output_evaluation_failed;
89use crate::diagnostics::runtime_type_mismatch;
90use crate::diagnostics::task_execution_failed;
91use crate::diagnostics::task_localization_failed;
92use crate::eval::EvaluatedTask;
93use crate::http::Downloader;
94use crate::http::HttpDownloader;
95use crate::path;
96use crate::path::EvaluationPath;
97use crate::tree::SyntaxNode;
98use crate::v1::ExprEvaluator;
99use crate::v1::INPUTS_FILE;
100use crate::v1::OUTPUTS_FILE;
101use crate::v1::write_json_file;
102
/// The default value for the `container` requirement.
///
/// Used by [`container`] when the task specifies no usable container and the
/// caller supplies no default of its own.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The default value for the `cpu` requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default value for the `memory` requirement (two times `ONE_GIBIBYTE`).
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default value for the `max_retries` requirement.
///
/// Applies only when neither the task nor the configuration specifies a retry
/// count.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default value for the `disks` requirement.
///
/// NOTE(review): presumably expressed in GiB like the sizes produced by
/// `disks`; confirm against the code that consumes this constant.
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;
113
/// The index of a task's root scope.
///
/// Inputs and private declarations are inserted into this scope.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of a task's output scope.
///
/// The evaluated outputs of the task are taken from this scope.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the scope holding the `task` variable (WDL 1.2 and later).
///
/// Command sections are evaluated in this scope.
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
121
122pub(crate) fn container<'a>(
124 requirements: &'a HashMap<String, Value>,
125 default: Option<&'a str>,
126) -> Cow<'a, str> {
127 requirements
128 .get(TASK_REQUIREMENT_CONTAINER)
129 .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
130 .and_then(|v| -> Option<Cow<'_, str>> {
131 if let Some(array) = v.as_array() {
135 return array.as_slice().first().map(|v| {
136 v.as_string()
137 .expect("type should be string")
138 .as_ref()
139 .into()
140 });
141 }
142
143 Some(
144 v.coerce(&PrimitiveType::String.into())
145 .expect("type should coerce")
146 .unwrap_string()
147 .as_ref()
148 .clone()
149 .into(),
150 )
151 })
152 .and_then(|v| {
153 if v == "*" { None } else { Some(v) }
155 })
156 .unwrap_or_else(|| {
157 default
158 .map(Into::into)
159 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
160 })
161}
162
163pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
165 requirements
166 .get(TASK_REQUIREMENT_CPU)
167 .map(|v| {
168 v.coerce(&PrimitiveType::Float.into())
169 .expect("type should coerce")
170 .unwrap_float()
171 })
172 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
173}
174
175pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
177 hints
178 .get(TASK_HINT_MAX_CPU)
179 .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
180 .map(|v| {
181 v.coerce(&PrimitiveType::Float.into())
182 .expect("type should coerce")
183 .unwrap_float()
184 })
185}
186
187pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
189 Ok(requirements
190 .get(TASK_REQUIREMENT_MEMORY)
191 .map(|v| {
192 if let Some(v) = v.as_integer() {
193 return Ok(v);
194 }
195
196 if let Some(s) = v.as_string() {
197 return convert_unit_string(s)
198 .and_then(|v| v.try_into().ok())
199 .with_context(|| {
200 format!("task specifies an invalid `memory` requirement `{s}`")
201 });
202 }
203
204 unreachable!("value should be an integer or string");
205 })
206 .transpose()?
207 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
208}
209
210pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
212 hints
213 .get(TASK_HINT_MAX_MEMORY)
214 .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
215 .map(|v| {
216 if let Some(v) = v.as_integer() {
217 return Ok(v);
218 }
219
220 if let Some(s) = v.as_string() {
221 return convert_unit_string(s)
222 .and_then(|v| v.try_into().ok())
223 .with_context(|| {
224 format!("task specifies an invalid `memory` requirement `{s}`")
225 });
226 }
227
228 unreachable!("value should be an integer or string");
229 })
230 .transpose()
231}
232
/// Represents the type of a disk.
///
/// A disk type is parsed from the exact, case-sensitive strings `SSD` and
/// `HDD`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// The disk type is a solid state drive.
    SSD,
    /// The disk type is a hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type, returning `Err(())` for any unrecognized string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "SSD" {
            Ok(Self::SSD)
        } else if s == "HDD" {
            Ok(Self::HDD)
        } else {
            Err(())
        }
    }
}
255
/// Represents a single disk requested by a task's `disks` requirement.
pub struct DiskRequirement {
    /// The requested size of the disk.
    ///
    /// Sizes given with a storage unit are converted to GiB by `disks`; sizes
    /// given without a unit are stored as written.
    pub size: i64,

    /// The type of the disk, if one could be determined from the task's
    /// hints.
    pub ty: Option<DiskType>,
}
264
265pub(crate) fn disks<'a>(
269 requirements: &'a HashMap<String, Value>,
270 hints: &HashMap<String, Value>,
271) -> Result<HashMap<&'a str, DiskRequirement>> {
272 fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
276 hints.get(TASK_HINT_DISKS).and_then(|v| {
277 if let Some(ty) = v.as_string() {
278 return ty.parse().ok();
279 }
280
281 if let Some(map) = v.as_map() {
282 if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
285 (None, None) => true,
286 (None, Some(_)) | (Some(_), None) => false,
287 (Some(k), Some(mount_point)) => k
288 .as_string()
289 .map(|k| k.as_str() == mount_point)
290 .unwrap_or(false),
291 }) {
292 return v.as_string().and_then(|ty| ty.parse().ok());
293 }
294 }
295
296 None
297 })
298 }
299
300 fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
303 let iter = spec.split_whitespace();
304 let mut first = None;
305 let mut second = None;
306 let mut third = None;
307
308 for part in iter {
309 if first.is_none() {
310 first = Some(part);
311 continue;
312 }
313
314 if second.is_none() {
315 second = Some(part);
316 continue;
317 }
318
319 if third.is_none() {
320 third = Some(part);
321 continue;
322 }
323
324 return None;
325 }
326
327 match (first, second, third) {
328 (None, None, None) => None,
329 (Some(size), None, None) => {
330 Some((size.parse().ok()?, None))
332 }
333 (Some(first), Some(second), None) => {
334 if let Ok(size) = first.parse() {
336 let unit: StorageUnit = second.parse().ok()?;
337 let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
338 return Some((size.try_into().ok()?, None));
339 }
340
341 if !first.starts_with('/') {
344 return None;
345 }
346
347 Some((second.parse().ok()?, Some(first)))
348 }
349 (Some(mount_point), Some(size), Some(unit)) => {
350 let unit: StorageUnit = unit.parse().ok()?;
352 let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
353
354 if !mount_point.starts_with('/') {
356 return None;
357 }
358
359 Some((size.try_into().ok()?, Some(mount_point)))
360 }
361 _ => unreachable!("should have one, two, or three values"),
362 }
363 }
364
365 fn insert_disk<'a>(
367 spec: &'a str,
368 hints: &HashMap<String, Value>,
369 disks: &mut HashMap<&'a str, DiskRequirement>,
370 ) -> Result<()> {
371 let (size, mount_point) =
372 parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
373
374 let prev = disks.insert(
375 mount_point.unwrap_or("/"),
376 DiskRequirement {
377 size,
378 ty: lookup_type(mount_point, hints),
379 },
380 );
381
382 if prev.is_some() {
383 bail!(
384 "duplicate mount point `{mp}` specified in `disks` requirement",
385 mp = mount_point.unwrap_or("/")
386 );
387 }
388
389 Ok(())
390 }
391
392 let mut disks = HashMap::new();
393 if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
394 if let Some(size) = v.as_integer() {
395 if size < 0 {
397 bail!("task requirement `disks` cannot be less than zero");
398 }
399
400 disks.insert(
401 "/",
402 DiskRequirement {
403 size,
404 ty: lookup_type(None, hints),
405 },
406 );
407 } else if let Some(spec) = v.as_string() {
408 insert_disk(spec, hints, &mut disks)?;
409 } else if let Some(v) = v.as_array() {
410 for spec in v.as_slice() {
411 insert_disk(
412 spec.as_string().expect("spec should be a string"),
413 hints,
414 &mut disks,
415 )?;
416 }
417 } else {
418 unreachable!("value should be an integer, string, or array");
419 }
420 }
421
422 Ok(disks)
423}
424
425pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
431 const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
432 const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
433
434 hints
435 .get(TASK_HINT_PREEMPTIBLE)
436 .and_then(|v| {
437 Some(
438 v.coerce(&PrimitiveType::Integer.into())
439 .ok()?
440 .unwrap_integer(),
441 )
442 })
443 .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
444}
445
/// The evaluation context used when evaluating expressions of a task.
struct TaskEvaluationContext<'a, 'b> {
    /// The associated evaluation state.
    state: &'a State<'b>,
    /// The downloader exposed to expression evaluation.
    downloader: &'a HttpDownloader,
    /// The index of the scope used to resolve names.
    scope: ScopeIndex,
    /// The task's working directory, if one has been set on the context.
    work_dir: Option<&'a EvaluationPath>,
    /// The standard output value, if one has been set on the context.
    stdout: Option<&'a Value>,
    /// The standard error value, if one has been set on the context.
    stderr: Option<&'a Value>,
    /// The task inputs used to translate host paths to guest paths.
    ///
    /// Path translation is unavailable when this is `None`.
    inputs: Option<&'a [Input]>,
    /// Whether or not the context exposes the task being evaluated.
    task: bool,
}
473
474impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
475 pub fn new(state: &'a State<'b>, downloader: &'a HttpDownloader, scope: ScopeIndex) -> Self {
477 Self {
478 state,
479 downloader,
480 scope,
481 work_dir: None,
482 stdout: None,
483 stderr: None,
484 inputs: None,
485 task: false,
486 }
487 }
488
489 pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
491 self.work_dir = Some(work_dir);
492 self
493 }
494
495 pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
497 self.stdout = Some(stdout);
498 self
499 }
500
501 pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
503 self.stderr = Some(stderr);
504 self
505 }
506
507 pub fn with_inputs(mut self, inputs: &'a [Input]) -> Self {
509 self.inputs = Some(inputs);
510 self
511 }
512
513 pub fn with_task(mut self) -> Self {
517 self.task = true;
518 self
519 }
520}
521
522impl EvaluationContext for TaskEvaluationContext<'_, '_> {
523 fn version(&self) -> SupportedVersion {
524 self.state
525 .document
526 .version()
527 .expect("document should have a version")
528 }
529
530 fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
531 ScopeRef::new(&self.state.scopes, self.scope)
532 .lookup(name)
533 .cloned()
534 .ok_or_else(|| unknown_name(name, span))
535 }
536
537 fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
538 crate::resolve_type_name(self.state.document, name, span)
539 }
540
541 fn work_dir(&self) -> Option<&EvaluationPath> {
542 self.work_dir
543 }
544
545 fn temp_dir(&self) -> &Path {
546 self.state.temp_dir
547 }
548
549 fn stdout(&self) -> Option<&Value> {
550 self.stdout
551 }
552
553 fn stderr(&self) -> Option<&Value> {
554 self.stderr
555 }
556
557 fn task(&self) -> Option<&Task> {
558 if self.task {
559 Some(self.state.task)
560 } else {
561 None
562 }
563 }
564
565 fn translate_path(&self, path: &str) -> Option<Cow<'_, Path>> {
566 let inputs = self.inputs?;
567 let is_url = path::is_url(path);
568
569 if !is_url && Path::new(path).is_relative() {
571 return None;
572 }
573
574 let (guest_path, stripped) = inputs
576 .iter()
577 .filter_map(|i| {
578 match i.path() {
579 EvaluationPath::Local(base) if !is_url => {
580 let stripped = Path::new(path).strip_prefix(base).ok()?;
581 Some((i.guest_path()?, stripped.to_str()?))
582 }
583 EvaluationPath::Remote(url) if is_url => {
584 let url = url.as_str();
585 let stripped = path.strip_prefix(url.strip_suffix('/').unwrap_or(url))?;
586
587 let stripped = if let Some(pos) = stripped.find('?') {
589 &stripped[..pos]
590 } else if let Some(pos) = stripped.find('#') {
591 &stripped[..pos]
592 } else {
593 stripped.strip_prefix('/').unwrap_or(stripped)
594 };
595
596 Some((i.guest_path()?, stripped))
597 }
598 _ => None,
599 }
600 })
601 .min_by(|(_, a), (_, b)| a.len().cmp(&b.len()))?;
602
603 if stripped.is_empty() {
604 return Some(Path::new(guest_path).into());
605 }
606
607 Some(Path::new(guest_path).join(stripped).into())
608 }
609
610 fn downloader(&self) -> &dyn Downloader {
611 self.downloader
612 }
613}
614
/// The state of a single task evaluation.
struct State<'a> {
    /// The temporary directory of the evaluation.
    temp_dir: &'a Path,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The scopes of the task, indexed by `ROOT_SCOPE_INDEX`,
    /// `OUTPUT_SCOPE_INDEX`, and `TASK_SCOPE_INDEX`.
    scopes: [Scope; 3],
    /// The environment variables collected from declarations marked as
    /// environment variables, keyed by declaration name.
    ///
    /// The map is moved out of the state before the task is spawned.
    env: IndexMap<String, String>,
}
634
635impl<'a> State<'a> {
636 fn new(temp_dir: &'a Path, document: &'a Document, task: &'a Task) -> Result<Self> {
638 let scopes = [
647 Scope::default(),
648 Scope::new(ROOT_SCOPE_INDEX),
649 Scope::new(OUTPUT_SCOPE_INDEX),
650 ];
651
652 Ok(Self {
653 temp_dir,
654 document,
655 task,
656 scopes,
657 env: Default::default(),
658 })
659 }
660}
661
/// The result of evaluating the sections of a task that are needed to spawn
/// it.
struct EvaluatedSections {
    /// The evaluated command.
    command: String,
    /// The evaluated requirements.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints.
    hints: Arc<HashMap<String, Value>>,
    /// The inputs to the task.
    inputs: Vec<Input>,
}
673
/// Represents a WDL V1 task evaluator.
pub struct TaskEvaluator {
    /// The associated evaluation configuration.
    config: Arc<Config>,
    /// The backend used to localize inputs and spawn tasks.
    backend: Arc<dyn TaskExecutionBackend>,
    /// The cancellation token passed to the backend when spawning tasks.
    token: CancellationToken,
    /// The downloader used for expression evaluation and input localization.
    downloader: HttpDownloader,
}
685
686impl TaskEvaluator {
687 pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
694 config.validate()?;
695
696 let config = Arc::new(config);
697 let backend = config.create_backend().await?;
698 let downloader = HttpDownloader::new(config.clone())?;
699
700 Ok(Self {
701 config,
702 backend,
703 token,
704 downloader,
705 })
706 }
707
708 pub(crate) fn new_unchecked(
713 config: Arc<Config>,
714 backend: Arc<dyn TaskExecutionBackend>,
715 token: CancellationToken,
716 downloader: HttpDownloader,
717 ) -> Self {
718 Self {
719 config,
720 backend,
721 token,
722 downloader,
723 }
724 }
725
726 pub async fn evaluate<P, R>(
730 &self,
731 document: &Document,
732 task: &Task,
733 inputs: &TaskInputs,
734 root: impl AsRef<Path>,
735 progress: P,
736 ) -> EvaluationResult<EvaluatedTask>
737 where
738 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
739 R: Future<Output = ()> + Send,
740 {
741 self.evaluate_with_progress(
742 document,
743 task,
744 inputs,
745 root.as_ref(),
746 task.name(),
747 Arc::new(progress),
748 )
749 .await
750 }
751
752 pub(crate) async fn evaluate_with_progress<P, R>(
754 &self,
755 document: &Document,
756 task: &Task,
757 inputs: &TaskInputs,
758 root: &Path,
759 id: &str,
760 progress: Arc<P>,
761 ) -> EvaluationResult<EvaluatedTask>
762 where
763 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
764 R: Future<Output = ()> + Send,
765 {
766 if document.has_errors() {
768 return Err(anyhow!("cannot evaluate a document with errors").into());
769 }
770
771 progress(ProgressKind::TaskStarted { id }).await;
772
773 let result = self
774 .perform_evaluation(document, task, inputs, root, id, progress.clone())
775 .await;
776
777 progress(ProgressKind::TaskCompleted {
778 id,
779 result: &result,
780 })
781 .await;
782
783 result
784 }
785
786 async fn perform_evaluation<P, R>(
788 &self,
789 document: &Document,
790 task: &Task,
791 inputs: &TaskInputs,
792 root: &Path,
793 id: &str,
794 progress: Arc<P>,
795 ) -> EvaluationResult<EvaluatedTask>
796 where
797 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
798 R: Future<Output = ()> + Send,
799 {
800 inputs.validate(document, task, None).with_context(|| {
801 format!(
802 "failed to validate the inputs to task `{task}`",
803 task = task.name()
804 )
805 })?;
806
807 let ast = match document.root().morph().ast() {
808 Ast::V1(ast) => ast,
809 _ => {
810 return Err(
811 anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
812 );
813 }
814 };
815
816 let definition = ast
818 .tasks()
819 .find(|t| t.name().text() == task.name())
820 .expect("task should exist in the AST");
821
822 let version = document.version().expect("document should have version");
823
824 let mut diagnostics = Vec::new();
826 let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
827 assert!(
828 diagnostics.is_empty(),
829 "task evaluation graph should have no diagnostics"
830 );
831
832 debug!(
833 task_id = id,
834 task_name = task.name(),
835 document = document.uri().as_str(),
836 "evaluating task"
837 );
838
839 let root_dir = absolute(root).with_context(|| {
840 format!(
841 "failed to determine absolute path of `{path}`",
842 path = root.display()
843 )
844 })?;
845
846 let temp_dir = root_dir.join("tmp");
848 fs::create_dir_all(&temp_dir).with_context(|| {
849 format!(
850 "failed to create directory `{path}`",
851 path = temp_dir.display()
852 )
853 })?;
854
855 write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
857
858 let mut state = State::new(&temp_dir, document, task)?;
859 let nodes = toposort(&graph, None).expect("graph should be acyclic");
860 let mut current = 0;
861 while current < nodes.len() {
862 match &graph[nodes[current]] {
863 TaskGraphNode::Input(decl) => {
864 self.evaluate_input(id, &mut state, decl, inputs)
865 .await
866 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
867 }
868 TaskGraphNode::Decl(decl) => {
869 self.evaluate_decl(id, &mut state, decl)
870 .await
871 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
872 }
873 TaskGraphNode::Output(_) => {
874 break;
876 }
877 TaskGraphNode::Command(_)
878 | TaskGraphNode::Runtime(_)
879 | TaskGraphNode::Requirements(_)
880 | TaskGraphNode::Hints(_) => {
881 }
884 }
885
886 current += 1;
887 }
888
889 let env = Arc::new(mem::take(&mut state.env));
893
894 let mut attempt = 0;
896 let mut evaluated = loop {
897 let EvaluatedSections {
898 command,
899 requirements,
900 hints,
901 inputs,
902 } = self
903 .evaluate_sections(id, &mut state, &definition, inputs, attempt)
904 .await?;
905
906 let max_retries = requirements
909 .get(TASK_REQUIREMENT_MAX_RETRIES)
910 .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
911 .cloned()
912 .map(|v| v.unwrap_integer() as u64)
913 .or_else(|| self.config.task.retries)
914 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
915
916 if max_retries > MAX_RETRIES {
917 return Err(anyhow!(
918 "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
919 )
920 .into());
921 }
922
923 let mut attempt_dir = root_dir.clone();
924 attempt_dir.push("attempts");
925 attempt_dir.push(attempt.to_string());
926
927 let request = TaskSpawnRequest::new(
928 id.to_string(),
929 TaskSpawnInfo::new(
930 command,
931 inputs,
932 requirements.clone(),
933 hints.clone(),
934 env.clone(),
935 ),
936 attempt,
937 attempt_dir.clone(),
938 );
939
940 let events = self
941 .backend
942 .spawn(request, self.token.clone())
943 .with_context(|| {
944 format!(
945 "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
946 name = task.name(),
947 path = document.path(),
948 )
949 })?;
950
951 if attempt > 0 {
952 progress(ProgressKind::TaskRetried {
953 id,
954 retry: attempt - 1,
955 });
956 }
957
958 events.spawned.await.ok();
960
961 progress(ProgressKind::TaskExecutionStarted { id });
962
963 let result = events
964 .completed
965 .await
966 .expect("failed to receive response from spawned task");
967
968 progress(ProgressKind::TaskExecutionCompleted {
969 id,
970 result: &result,
971 });
972
973 let result = result.map_err(|e| {
974 EvaluationError::new(
975 state.document.clone(),
976 task_execution_failed(e, task.name(), id, task.name_span()),
977 )
978 })?;
979
980 let evaluated = EvaluatedTask::new(attempt_dir, result)?;
982 if version >= SupportedVersion::V1(V1::Two) {
983 let task = state.scopes[TASK_SCOPE_INDEX.0]
984 .get_mut(TASK_VAR_NAME)
985 .unwrap()
986 .as_task_mut()
987 .unwrap();
988
989 task.set_attempt(attempt.try_into().with_context(|| {
990 format!(
991 "too many attempts were made to run task `{task}`",
992 task = state.task.name()
993 )
994 })?);
995 task.set_return_code(evaluated.result.exit_code);
996 }
997
998 if let Err(e) = evaluated.handle_exit(&requirements, &self.downloader).await {
999 if attempt >= max_retries {
1000 return Err(EvaluationError::new(
1001 state.document.clone(),
1002 task_execution_failed(e, task.name(), id, task.name_span()),
1003 ));
1004 }
1005
1006 attempt += 1;
1007
1008 info!(
1009 "retrying execution of task `{name}` (retry {attempt})",
1010 name = state.task.name()
1011 );
1012 continue;
1013 }
1014
1015 break evaluated;
1016 };
1017
1018 for index in &nodes[current..] {
1020 match &graph[*index] {
1021 TaskGraphNode::Decl(decl) => {
1022 self.evaluate_decl(id, &mut state, decl)
1023 .await
1024 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1025 }
1026 TaskGraphNode::Output(decl) => {
1027 self.evaluate_output(id, &mut state, decl, &evaluated)
1028 .await
1029 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1030 }
1031 _ => {
1032 unreachable!(
1033 "only declarations and outputs should be evaluated after the command"
1034 )
1035 }
1036 }
1037 }
1038
1039 let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
1041 drop(state);
1042 if let Some(section) = definition.output() {
1043 let indexes: HashMap<_, _> = section
1044 .declarations()
1045 .enumerate()
1046 .map(|(i, d)| (d.name().hashable(), i))
1047 .collect();
1048 outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
1049 }
1050
1051 write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
1053
1054 evaluated.outputs = Ok(outputs);
1055 Ok(evaluated)
1056 }
1057
1058 async fn evaluate_input(
1060 &self,
1061 id: &str,
1062 state: &mut State<'_>,
1063 decl: &Decl<SyntaxNode>,
1064 inputs: &TaskInputs,
1065 ) -> Result<(), Diagnostic> {
1066 let name = decl.name();
1067 let decl_ty = decl.ty();
1068 let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1069
1070 let (value, span) = match inputs.get(name.text()) {
1071 Some(input) => (input.clone(), name.span()),
1072 None => match decl.expr() {
1073 Some(expr) => {
1074 debug!(
1075 task_id = id,
1076 task_name = state.task.name(),
1077 document = state.document.uri().as_str(),
1078 input_name = name.text(),
1079 "evaluating input"
1080 );
1081
1082 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1083 state,
1084 &self.downloader,
1085 ROOT_SCOPE_INDEX,
1086 ));
1087 let value = evaluator.evaluate_expr(&expr).await?;
1088 (value, expr.span())
1089 }
1090 _ => {
1091 assert!(decl.ty().is_optional(), "type should be optional");
1092 (Value::None, name.span())
1093 }
1094 },
1095 };
1096
1097 let value = value
1098 .coerce(&ty)
1099 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
1100 state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1101
1102 if decl.env().is_some() {
1104 state.env.insert(
1105 name.text().to_string(),
1106 value
1107 .as_primitive()
1108 .expect("value should be primitive")
1109 .raw(None)
1110 .to_string(),
1111 );
1112 }
1113
1114 Ok(())
1115 }
1116
1117 async fn evaluate_decl(
1119 &self,
1120 id: &str,
1121 state: &mut State<'_>,
1122 decl: &Decl<SyntaxNode>,
1123 ) -> Result<(), Diagnostic> {
1124 let name = decl.name();
1125 debug!(
1126 task_id = id,
1127 task_name = state.task.name(),
1128 document = state.document.uri().as_str(),
1129 decl_name = name.text(),
1130 "evaluating private declaration",
1131 );
1132
1133 let decl_ty = decl.ty();
1134 let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1135
1136 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1137 state,
1138 &self.downloader,
1139 ROOT_SCOPE_INDEX,
1140 ));
1141
1142 let expr = decl.expr().expect("private decls should have expressions");
1143 let value = evaluator.evaluate_expr(&expr).await?;
1144 let value = value
1145 .coerce(&ty)
1146 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1147 state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1148
1149 if decl.env().is_some() {
1151 state.env.insert(
1152 name.text().to_string(),
1153 value
1154 .as_primitive()
1155 .expect("value should be primitive")
1156 .raw(None)
1157 .to_string(),
1158 );
1159 }
1160
1161 Ok(())
1162 }
1163
1164 async fn evaluate_runtime_section(
1168 &self,
1169 id: &str,
1170 state: &State<'_>,
1171 section: &RuntimeSection<SyntaxNode>,
1172 inputs: &TaskInputs,
1173 ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1174 debug!(
1175 task_id = id,
1176 task_name = state.task.name(),
1177 document = state.document.uri().as_str(),
1178 "evaluating runtimes section",
1179 );
1180
1181 let mut requirements = HashMap::new();
1182 let mut hints = HashMap::new();
1183
1184 let version = state
1185 .document
1186 .version()
1187 .expect("document should have version");
1188 for item in section.items() {
1189 let name = item.name();
1190 match inputs.requirement(name.text()) {
1191 Some(value) => {
1192 requirements.insert(name.text().to_string(), value.clone());
1193 continue;
1194 }
1195 _ => {
1196 if let Some(value) = inputs.hint(name.text()) {
1197 hints.insert(name.text().to_string(), value.clone());
1198 continue;
1199 }
1200 }
1201 }
1202
1203 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1204 state,
1205 &self.downloader,
1206 ROOT_SCOPE_INDEX,
1207 ));
1208
1209 let (types, requirement) = match task_requirement_types(version, name.text()) {
1210 Some(types) => (Some(types), true),
1211 None => match task_hint_types(version, name.text(), false) {
1212 Some(types) => (Some(types), false),
1213 None => (None, false),
1214 },
1215 };
1216
1217 let expr = item.expr();
1219 let mut value = evaluator.evaluate_expr(&expr).await?;
1220 if let Some(types) = types {
1221 value = types
1222 .iter()
1223 .find_map(|ty| value.coerce(ty).ok())
1224 .ok_or_else(|| {
1225 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1226 })?;
1227 }
1228
1229 if requirement {
1230 requirements.insert(name.text().to_string(), value);
1231 } else {
1232 hints.insert(name.text().to_string(), value);
1233 }
1234 }
1235
1236 Ok((requirements, hints))
1237 }
1238
1239 async fn evaluate_requirements_section(
1241 &self,
1242 id: &str,
1243 state: &State<'_>,
1244 section: &RequirementsSection<SyntaxNode>,
1245 inputs: &TaskInputs,
1246 ) -> Result<HashMap<String, Value>, Diagnostic> {
1247 debug!(
1248 task_id = id,
1249 task_name = state.task.name(),
1250 document = state.document.uri().as_str(),
1251 "evaluating requirements",
1252 );
1253
1254 let mut requirements = HashMap::new();
1255
1256 let version = state
1257 .document
1258 .version()
1259 .expect("document should have version");
1260 for item in section.items() {
1261 let name = item.name();
1262 if let Some(value) = inputs.requirement(name.text()) {
1263 requirements.insert(name.text().to_string(), value.clone());
1264 continue;
1265 }
1266
1267 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1268 state,
1269 &self.downloader,
1270 ROOT_SCOPE_INDEX,
1271 ));
1272
1273 let types =
1274 task_requirement_types(version, name.text()).expect("requirement should be known");
1275
1276 let expr = item.expr();
1278 let value = evaluator.evaluate_expr(&expr).await?;
1279 let value = types
1280 .iter()
1281 .find_map(|ty| value.coerce(ty).ok())
1282 .ok_or_else(|| {
1283 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1284 })?;
1285
1286 requirements.insert(name.text().to_string(), value);
1287 }
1288
1289 Ok(requirements)
1290 }
1291
1292 async fn evaluate_hints_section(
1294 &self,
1295 id: &str,
1296 state: &State<'_>,
1297 section: &TaskHintsSection<SyntaxNode>,
1298 inputs: &TaskInputs,
1299 ) -> Result<HashMap<String, Value>, Diagnostic> {
1300 debug!(
1301 task_id = id,
1302 task_name = state.task.name(),
1303 document = state.document.uri().as_str(),
1304 "evaluating hints section",
1305 );
1306
1307 let mut hints = HashMap::new();
1308
1309 for item in section.items() {
1310 let name = item.name();
1311 if let Some(value) = inputs.hint(name.text()) {
1312 hints.insert(name.text().to_string(), value.clone());
1313 continue;
1314 }
1315
1316 let mut evaluator = ExprEvaluator::new(
1317 TaskEvaluationContext::new(state, &self.downloader, ROOT_SCOPE_INDEX).with_task(),
1318 );
1319
1320 let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1321 hints.insert(name.text().to_string(), value);
1322 }
1323
1324 Ok(hints)
1325 }
1326
1327 async fn evaluate_command(
1332 &self,
1333 id: &str,
1334 state: &State<'_>,
1335 section: &CommandSection<SyntaxNode>,
1336 ) -> EvaluationResult<(String, Vec<Input>)> {
1337 debug!(
1338 task_id = id,
1339 task_name = state.task.name(),
1340 document = state.document.uri().as_str(),
1341 "evaluating command section",
1342 );
1343
1344 let mut inputs = Vec::new();
1346
1347 ScopeRef::new(&state.scopes, TASK_SCOPE_INDEX.0).for_each(|_, v| {
1349 v.visit_paths(false, &mut |_, value| {
1350 inputs.push(Input::from_primitive(value)?);
1351 Ok(())
1352 })
1353 })?;
1354
1355 inputs.push(Input::new(
1357 InputKind::Directory,
1358 EvaluationPath::Local(state.temp_dir.to_path_buf()),
1359 ));
1360
1361 self.backend
1363 .localize_inputs(&self.downloader, &mut inputs)
1364 .await
1365 .map_err(|e| {
1366 EvaluationError::new(
1367 state.document.clone(),
1368 task_localization_failed(e, state.task.name(), state.task.name_span()),
1369 )
1370 })?;
1371
1372 if enabled!(Level::DEBUG) {
1373 for input in inputs.iter() {
1374 if let Some(location) = input.location() {
1375 debug!(
1376 task_id = id,
1377 task_name = state.task.name(),
1378 document = state.document.uri().as_str(),
1379 "task input `{path}` (downloaded to `{location}`) mapped to `{guest_path}`",
1380 path = input.path().display(),
1381 location = location.display(),
1382 guest_path = input.guest_path().unwrap_or(""),
1383 );
1384 } else {
1385 debug!(
1386 task_id = id,
1387 task_name = state.task.name(),
1388 document = state.document.uri().as_str(),
1389 "task input `{path}` mapped to `{guest_path}`",
1390 path = input.path().display(),
1391 guest_path = input.guest_path().unwrap_or(""),
1392 );
1393 }
1394 }
1395 }
1396
1397 let mut command = String::new();
1398 match section.strip_whitespace() {
1399 Some(parts) => {
1400 let mut evaluator = ExprEvaluator::new(
1401 TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1402 .with_inputs(&inputs),
1403 );
1404
1405 for part in parts {
1406 match part {
1407 StrippedCommandPart::Text(t) => {
1408 command.push_str(t.as_str());
1409 }
1410 StrippedCommandPart::Placeholder(placeholder) => {
1411 evaluator
1412 .evaluate_placeholder(&placeholder, &mut command)
1413 .await
1414 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1415 }
1416 }
1417 }
1418 }
1419 _ => {
1420 warn!(
1421 "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1422 stripping was skipped",
1423 task = state.task.name(),
1424 uri = state.document.uri(),
1425 );
1426
1427 let mut evaluator = ExprEvaluator::new(
1428 TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1429 .with_inputs(&inputs),
1430 );
1431
1432 let heredoc = section.is_heredoc();
1433 for part in section.parts() {
1434 match part {
1435 CommandPart::Text(t) => {
1436 t.unescape_to(heredoc, &mut command);
1437 }
1438 CommandPart::Placeholder(placeholder) => {
1439 evaluator
1440 .evaluate_placeholder(&placeholder, &mut command)
1441 .await
1442 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1443 }
1444 }
1445 }
1446 }
1447 }
1448
1449 Ok((command, inputs))
1450 }
1451
1452 async fn evaluate_sections(
1460 &self,
1461 id: &str,
1462 state: &mut State<'_>,
1463 definition: &TaskDefinition<SyntaxNode>,
1464 inputs: &TaskInputs,
1465 attempt: u64,
1466 ) -> EvaluationResult<EvaluatedSections> {
1467 let (requirements, hints) = match definition.runtime() {
1469 Some(section) => self
1470 .evaluate_runtime_section(id, state, §ion, inputs)
1471 .await
1472 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1473 _ => (
1474 match definition.requirements() {
1475 Some(section) => self
1476 .evaluate_requirements_section(id, state, §ion, inputs)
1477 .await
1478 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1479 None => Default::default(),
1480 },
1481 match definition.hints() {
1482 Some(section) => self
1483 .evaluate_hints_section(id, state, §ion, inputs)
1484 .await
1485 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1486 None => Default::default(),
1487 },
1488 ),
1489 };
1490
1491 if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1495 let constraints = self
1497 .backend
1498 .constraints(&requirements, &hints)
1499 .with_context(|| {
1500 format!(
1501 "failed to get constraints for task `{task}`",
1502 task = state.task.name()
1503 )
1504 })?;
1505
1506 let task = TaskValue::new_v1(
1507 state.task.name(),
1508 id,
1509 definition,
1510 constraints,
1511 attempt.try_into().with_context(|| {
1512 format!(
1513 "too many attempts were made to run task `{task}`",
1514 task = state.task.name()
1515 )
1516 })?,
1517 );
1518
1519 let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1520 if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1521 *v = Value::Task(task);
1522 } else {
1523 scope.insert(TASK_VAR_NAME, Value::Task(task));
1524 }
1525 }
1526
1527 let (command, inputs) = self
1528 .evaluate_command(
1529 id,
1530 state,
1531 &definition.command().expect("must have command section"),
1532 )
1533 .await?;
1534
1535 Ok(EvaluatedSections {
1536 command,
1537 requirements: Arc::new(requirements),
1538 hints: Arc::new(hints),
1539 inputs,
1540 })
1541 }
1542
1543 async fn evaluate_output(
1545 &self,
1546 id: &str,
1547 state: &mut State<'_>,
1548 decl: &Decl<SyntaxNode>,
1549 evaluated: &EvaluatedTask,
1550 ) -> Result<(), Diagnostic> {
1551 let name = decl.name();
1552 debug!(
1553 task_id = id,
1554 task_name = state.task.name(),
1555 document = state.document.uri().as_str(),
1556 output_name = name.text(),
1557 "evaluating output",
1558 );
1559
1560 let decl_ty = decl.ty();
1561 let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
1562 let mut evaluator = ExprEvaluator::new(
1563 TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
1564 .with_work_dir(&evaluated.result.work_dir)
1565 .with_stdout(&evaluated.result.stdout)
1566 .with_stderr(&evaluated.result.stderr),
1567 );
1568
1569 let expr = decl.expr().expect("outputs should have expressions");
1570 let value = evaluator.evaluate_expr(&expr).await?;
1571
1572 let mut value = value
1574 .coerce(&ty)
1575 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1576
1577 let result = if let Some(guest_work_dir) = self.backend.guest_work_dir() {
1578 value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
1580 let path = match value {
1581 PrimitiveValue::File(path) => path,
1582 PrimitiveValue::Directory(path) => path,
1583 _ => unreachable!("only file and directory values should be visited"),
1584 };
1585
1586 if !Path::new(path.as_str()).starts_with(state.temp_dir)
1589 && !Path::new(path.as_str()).starts_with(evaluated.attempt_dir())
1590 {
1591 let guest = if path::is_file_url(path) {
1593 path::parse_url(path)
1594 .and_then(|u| u.to_file_path().ok())
1595 .ok_or_else(|| anyhow!("guest path `{path}` is not a valid file URI"))?
1596 } else if path::is_url(path) {
1597 return Ok(true);
1600 } else {
1601 guest_work_dir.join(path.as_str())
1603 };
1604
1605 let host = if let Ok(stripped) = guest.strip_prefix(guest_work_dir) {
1608 Cow::Owned(
1609 evaluated.result.work_dir.join(
1610 stripped.to_str().with_context(|| {
1611 format!("output path `{path}` is not UTF-8")
1612 })?,
1613 )?,
1614 )
1615 } else {
1616 evaluated
1617 .inputs()
1618 .iter()
1619 .filter_map(|i| {
1620 Some((i.path(), guest.strip_prefix(i.guest_path()?).ok()?))
1621 })
1622 .min_by(|(_, a), (_, b)| a.as_os_str().len().cmp(&b.as_os_str().len()))
1623 .and_then(|(path, stripped)| {
1624 if stripped.as_os_str().is_empty() {
1625 return Some(Cow::Borrowed(path));
1626 }
1627
1628 Some(Cow::Owned(path.join(stripped.to_str()?).ok()?))
1629 })
1630 .ok_or_else(|| {
1631 anyhow!("guest path `{path}` is not within a container mount")
1632 })?
1633 };
1634
1635 *Arc::make_mut(path) = host.into_owned().try_into()?;
1637 }
1638
1639 value.ensure_path_exists(optional)
1641 })
1642 } else {
1643 value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
1645 if let Some(work_dir) = evaluated.result.work_dir.as_local() {
1646 value.join_path_to(work_dir);
1647 }
1648
1649 value.ensure_path_exists(optional)
1650 })
1651 };
1652
1653 result.map_err(|e| {
1654 output_evaluation_failed(e, state.task.name(), true, name.text(), name.span())
1655 })?;
1656
1657 state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
1658 Ok(())
1659 }
1660}