1use std::borrow::Cow;
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fs;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::anyhow;
16use anyhow::bail;
17use bimap::BiHashMap;
18use indexmap::IndexMap;
19use petgraph::algo::toposort;
20use tokio::task::JoinSet;
21use tracing::Level;
22use tracing::debug;
23use tracing::enabled;
24use tracing::error;
25use tracing::info;
26use tracing::warn;
27use wdl_analysis::Document;
28use wdl_analysis::diagnostics::Io;
29use wdl_analysis::diagnostics::multiple_type_mismatch;
30use wdl_analysis::diagnostics::unknown_name;
31use wdl_analysis::document::TASK_VAR_NAME;
32use wdl_analysis::document::Task;
33use wdl_analysis::eval::v1::TaskGraphBuilder;
34use wdl_analysis::eval::v1::TaskGraphNode;
35use wdl_analysis::types::Optional;
36use wdl_analysis::types::PrimitiveType;
37use wdl_analysis::types::Type;
38use wdl_analysis::types::v1::task_hint_types;
39use wdl_analysis::types::v1::task_requirement_types;
40use wdl_ast::Ast;
41use wdl_ast::AstNode;
42use wdl_ast::AstToken;
43use wdl_ast::Diagnostic;
44use wdl_ast::Span;
45use wdl_ast::SupportedVersion;
46use wdl_ast::v1::CommandPart;
47use wdl_ast::v1::CommandSection;
48use wdl_ast::v1::Decl;
49use wdl_ast::v1::RequirementsSection;
50use wdl_ast::v1::RuntimeSection;
51use wdl_ast::v1::StrippedCommandPart;
52use wdl_ast::v1::TASK_HINT_CACHEABLE;
53use wdl_ast::v1::TASK_HINT_DISKS;
54use wdl_ast::v1::TASK_HINT_GPU;
55use wdl_ast::v1::TASK_HINT_MAX_CPU;
56use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
57use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
58use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
59use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
60use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
61use wdl_ast::v1::TASK_REQUIREMENT_CPU;
62use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
63use wdl_ast::v1::TASK_REQUIREMENT_GPU;
64use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
65use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
66use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
67use wdl_ast::v1::TaskDefinition;
68use wdl_ast::v1::TaskHintsSection;
69use wdl_ast::version::V1;
70
71use super::TopLevelEvaluator;
72use crate::CancellationContextState;
73use crate::Coercible;
74use crate::ContentKind;
75use crate::EngineEvent;
76use crate::EvaluationContext;
77use crate::EvaluationError;
78use crate::EvaluationResult;
79use crate::GuestPath;
80use crate::HiddenValue;
81use crate::HostPath;
82use crate::Input;
83use crate::ONE_GIBIBYTE;
84use crate::Object;
85use crate::Outputs;
86use crate::Scope;
87use crate::ScopeIndex;
88use crate::ScopeRef;
89use crate::StorageUnit;
90use crate::TaskInputs;
91use crate::TaskPostEvaluationData;
92use crate::TaskPostEvaluationValue;
93use crate::TaskPreEvaluationValue;
94use crate::Value;
95use crate::backend::TaskSpawnInfo;
96use crate::backend::TaskSpawnRequest;
97use crate::cache::KeyRequest;
98use crate::config::CallCachingMode;
99use crate::config::Config;
100use crate::config::DEFAULT_TASK_SHELL;
101use crate::config::MAX_RETRIES;
102use crate::convert_unit_string;
103use crate::diagnostics::decl_evaluation_failed;
104use crate::diagnostics::runtime_type_mismatch;
105use crate::diagnostics::task_execution_failed;
106use crate::diagnostics::task_localization_failed;
107use crate::eval::EvaluatedTask;
108use crate::eval::trie::InputTrie;
109use crate::http::Transferer;
110use crate::path::EvaluationPath;
111use crate::path::is_file_url;
112use crate::path::is_supported_url;
113use crate::tree::SyntaxNode;
114use crate::v1::INPUTS_FILE;
115use crate::v1::OUTPUTS_FILE;
116use crate::v1::expr::ExprEvaluator;
117use crate::v1::write_json_file;
118
/// The container image used when a task specifies no `container` requirement
/// (and no default container is configured).
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";

/// The number of CPUs requested when the `cpu` requirement is absent.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;

/// The memory requested (in bytes) when the `memory` requirement is absent
/// (2 GiB).
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);

/// The number of retries when neither the `max_retries` requirement nor a
/// configured retry count is present.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;

/// The default disk size when the `disks` requirement is absent.
// NOTE(review): presumably in GiB, as `disks()` normalizes sizes to GiB —
// confirm.
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;

/// The GPU count used when the `gpu` requirement is `true` but no usable
/// `gpu` hint is present (see `gpu()`).
pub const DEFAULT_GPU_COUNT: u64 = 1;

/// Index of the scope holding task inputs and private declarations.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);

/// Index of the scope holding task outputs.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);

/// Index of the scope holding the `task` variable (WDL v1.2+).
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
140
141pub(crate) fn container<'a>(
143 requirements: &'a HashMap<String, Value>,
144 default: Option<&'a str>,
145) -> Cow<'a, str> {
146 requirements
147 .get(TASK_REQUIREMENT_CONTAINER)
148 .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
149 .and_then(|v| -> Option<Cow<'_, str>> {
150 if let Some(array) = v.as_array() {
154 return array.as_slice().first().map(|v| {
155 v.as_string()
156 .expect("type should be string")
157 .as_ref()
158 .into()
159 });
160 }
161
162 Some(
163 v.coerce(None, &PrimitiveType::String.into())
164 .expect("type should coerce")
165 .unwrap_string()
166 .as_ref()
167 .clone()
168 .into(),
169 )
170 })
171 .and_then(|v| {
172 if v == "*" { None } else { Some(v) }
174 })
175 .unwrap_or_else(|| {
176 default
177 .map(Into::into)
178 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
179 })
180}
181
182pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
184 requirements
185 .get(TASK_REQUIREMENT_CPU)
186 .map(|v| {
187 v.coerce(None, &PrimitiveType::Float.into())
188 .expect("type should coerce")
189 .unwrap_float()
190 })
191 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
192}
193
194pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
196 hints
197 .get(TASK_HINT_MAX_CPU)
198 .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
199 .map(|v| {
200 v.coerce(None, &PrimitiveType::Float.into())
201 .expect("type should coerce")
202 .unwrap_float()
203 })
204}
205
206pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
208 Ok(requirements
209 .get(TASK_REQUIREMENT_MEMORY)
210 .map(|v| {
211 if let Some(v) = v.as_integer() {
212 return Ok(v);
213 }
214
215 if let Some(s) = v.as_string() {
216 return convert_unit_string(s)
217 .and_then(|v| v.try_into().ok())
218 .with_context(|| {
219 format!("task specifies an invalid `memory` requirement `{s}`")
220 });
221 }
222
223 unreachable!("value should be an integer or string");
224 })
225 .transpose()?
226 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
227}
228
229pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
231 hints
232 .get(TASK_HINT_MAX_MEMORY)
233 .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
234 .map(|v| {
235 if let Some(v) = v.as_integer() {
236 return Ok(v);
237 }
238
239 if let Some(s) = v.as_string() {
240 return convert_unit_string(s)
241 .and_then(|v| v.try_into().ok())
242 .with_context(|| {
243 format!("task specifies an invalid `memory` requirement `{s}`")
244 });
245 }
246
247 unreachable!("value should be an integer or string");
248 })
249 .transpose()
250}
251
252pub(crate) fn gpu(
254 requirements: &HashMap<String, Value>,
255 hints: &HashMap<String, Value>,
256) -> Option<u64> {
257 let Some(true) = requirements
260 .get(TASK_REQUIREMENT_GPU)
261 .and_then(|v| v.as_boolean())
262 else {
263 return None;
264 };
265
266 let Some(hint) = hints.get(TASK_HINT_GPU) else {
269 return Some(DEFAULT_GPU_COUNT);
270 };
271
272 if let Some(hint) = hint.as_string() {
277 warn!(
278 %hint,
279 "string `gpu` hints are not supported; falling back to {DEFAULT_GPU_COUNT} GPU(s)"
280 );
281 return Some(DEFAULT_GPU_COUNT);
282 }
283
284 match hint.as_integer() {
285 Some(count) if count >= 1 => Some(count as u64),
286 Some(count) => {
290 warn!(
291 %count,
292 "`gpu` hint specified {count} GPU(s); no GPUs will be requested for execution"
293 );
294 None
295 }
296 None => {
297 unreachable!("`gpu` hint must be an integer or string")
300 }
301 }
302}
303
/// The type of a disk requested for task execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// A solid-state drive.
    SSD,
    /// A hard-disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses a disk type; only the exact spellings `SSD` and `HDD` are
    /// accepted.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "SSD" {
            Ok(Self::SSD)
        } else if s == "HDD" {
            Ok(Self::HDD)
        } else {
            Err(())
        }
    }
}
326
327pub struct DiskRequirement {
329 pub size: i64,
331
332 pub ty: Option<DiskType>,
334}
335
336pub(crate) fn disks<'a>(
340 requirements: &'a HashMap<String, Value>,
341 hints: &HashMap<String, Value>,
342) -> Result<HashMap<&'a str, DiskRequirement>> {
343 fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
347 hints.get(TASK_HINT_DISKS).and_then(|v| {
348 if let Some(ty) = v.as_string() {
349 return ty.parse().ok();
350 }
351
352 if let Some(map) = v.as_map() {
353 if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
356 (None, None) => true,
357 (None, Some(_)) | (Some(_), None) => false,
358 (Some(k), Some(mount_point)) => k
359 .as_string()
360 .map(|k| k.as_str() == mount_point)
361 .unwrap_or(false),
362 }) {
363 return v.as_string().and_then(|ty| ty.parse().ok());
364 }
365 }
366
367 None
368 })
369 }
370
371 fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
374 let iter = spec.split_whitespace();
375 let mut first = None;
376 let mut second = None;
377 let mut third = None;
378
379 for part in iter {
380 if first.is_none() {
381 first = Some(part);
382 continue;
383 }
384
385 if second.is_none() {
386 second = Some(part);
387 continue;
388 }
389
390 if third.is_none() {
391 third = Some(part);
392 continue;
393 }
394
395 return None;
396 }
397
398 match (first, second, third) {
399 (None, None, None) => None,
400 (Some(size), None, None) => {
401 Some((size.parse().ok()?, None))
403 }
404 (Some(first), Some(second), None) => {
405 if let Ok(size) = first.parse() {
407 let unit: StorageUnit = second.parse().ok()?;
408 let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
409 return Some((size.try_into().ok()?, None));
410 }
411
412 if !first.starts_with('/') {
415 return None;
416 }
417
418 Some((second.parse().ok()?, Some(first)))
419 }
420 (Some(mount_point), Some(size), Some(unit)) => {
421 let unit: StorageUnit = unit.parse().ok()?;
423 let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
424
425 if !mount_point.starts_with('/') {
427 return None;
428 }
429
430 Some((size.try_into().ok()?, Some(mount_point)))
431 }
432 _ => unreachable!("should have one, two, or three values"),
433 }
434 }
435
436 fn insert_disk<'a>(
438 spec: &'a str,
439 hints: &HashMap<String, Value>,
440 disks: &mut HashMap<&'a str, DiskRequirement>,
441 ) -> Result<()> {
442 let (size, mount_point) =
443 parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
444
445 let prev = disks.insert(
446 mount_point.unwrap_or("/"),
447 DiskRequirement {
448 size,
449 ty: lookup_type(mount_point, hints),
450 },
451 );
452
453 if prev.is_some() {
454 bail!(
455 "duplicate mount point `{mp}` specified in `disks` requirement",
456 mp = mount_point.unwrap_or("/")
457 );
458 }
459
460 Ok(())
461 }
462
463 let mut disks = HashMap::new();
464 if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
465 if let Some(size) = v.as_integer() {
466 if size < 0 {
468 bail!("task requirement `disks` cannot be less than zero");
469 }
470
471 disks.insert(
472 "/",
473 DiskRequirement {
474 size,
475 ty: lookup_type(None, hints),
476 },
477 );
478 } else if let Some(spec) = v.as_string() {
479 insert_disk(spec, hints, &mut disks)?;
480 } else if let Some(v) = v.as_array() {
481 for spec in v.as_slice() {
482 insert_disk(
483 spec.as_string().expect("spec should be a string"),
484 hints,
485 &mut disks,
486 )?;
487 }
488 } else {
489 unreachable!("value should be an integer, string, or array");
490 }
491 }
492
493 Ok(disks)
494}
495
496pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
502 const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
503 const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
504
505 hints
506 .get(TASK_HINT_PREEMPTIBLE)
507 .and_then(|v| {
508 Some(
509 v.coerce(None, &PrimitiveType::Integer.into())
510 .ok()?
511 .unwrap_integer(),
512 )
513 })
514 .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
515}
516
517pub(crate) fn max_retries(requirements: &HashMap<String, Value>, config: &Config) -> u64 {
520 requirements
521 .get(TASK_REQUIREMENT_MAX_RETRIES)
522 .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
523 .and_then(|v| v.as_integer())
524 .map(|v| v as u64)
525 .or(config.task.retries)
526 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES)
527}
528
529pub(crate) fn cacheable(hints: &HashMap<String, Value>, config: &Config) -> bool {
531 hints
532 .get(TASK_HINT_CACHEABLE)
533 .and_then(|v| v.as_boolean())
534 .unwrap_or(match config.task.cache {
535 CallCachingMode::Off | CallCachingMode::Explicit => false,
536 CallCachingMode::On => true,
537 })
538}
539
/// Context used for evaluating expressions during task evaluation.
struct TaskEvaluationContext<'a, 'b> {
    /// The task evaluation state being operated on.
    state: &'a mut State<'b>,
    /// The scope in which names are resolved.
    scope: ScopeIndex,
    /// The working directory to resolve paths against, if set; otherwise the
    /// state's base directory is used (see `base_dir()`).
    work_dir: Option<&'a EvaluationPath>,
    /// The value returned from `stdout()`, if available.
    stdout: Option<&'a Value>,
    /// The value returned from `stderr()`, if available.
    stderr: Option<&'a Value>,
    /// Whether `task()` resolves to the task (i.e. the `task` variable is in
    /// scope).
    task: bool,
}
563
564impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
565 pub fn new(state: &'a mut State<'b>, scope: ScopeIndex) -> Self {
567 Self {
568 state,
569 scope,
570 work_dir: None,
571 stdout: None,
572 stderr: None,
573 task: false,
574 }
575 }
576
577 pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
579 self.work_dir = Some(work_dir);
580 self
581 }
582
583 pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
585 self.stdout = Some(stdout);
586 self
587 }
588
589 pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
591 self.stderr = Some(stderr);
592 self
593 }
594
595 pub fn with_task(mut self) -> Self {
599 self.task = true;
600 self
601 }
602}
603
impl EvaluationContext for TaskEvaluationContext<'_, '_> {
    /// Gets the WDL version of the document being evaluated.
    fn version(&self) -> SupportedVersion {
        self.state
            .document
            .version()
            .expect("document should have a version")
    }

    /// Resolves a name to its value by looking it up in the context's scope
    /// (and, via `ScopeRef`, presumably its ancestors — confirm).
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
        ScopeRef::new(&self.state.scopes, self.scope)
            .lookup(name)
            .cloned()
            .ok_or_else(|| unknown_name(name, span))
    }

    /// Resolves a type name against the document's declared types.
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
        crate::resolve_type_name(self.state.document, name, span)
    }

    /// Gets the directory for resolving relative paths: the working
    /// directory when set, otherwise the document's base directory.
    fn base_dir(&self) -> &EvaluationPath {
        self.work_dir.unwrap_or(&self.state.base_dir)
    }

    /// Gets the temporary directory for the task evaluation.
    fn temp_dir(&self) -> &Path {
        self.state.temp_dir
    }

    /// Gets the stdout value, if the command has produced one.
    fn stdout(&self) -> Option<&Value> {
        self.stdout
    }

    /// Gets the stderr value, if the command has produced one.
    fn stderr(&self) -> Option<&Value> {
        self.stderr
    }

    /// Gets the task, but only when the `task` variable is in scope.
    fn task(&self) -> Option<&Task> {
        if self.task {
            Some(self.state.task)
        } else {
            None
        }
    }

    /// Gets the transferer used for file transfers.
    fn transferer(&self) -> &dyn Transferer {
        self.state.transferer().as_ref()
    }

    /// Translates a guest path back to its host path, if mapped.
    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
        self.state.path_map.get_by_right(path).cloned()
    }

    /// Translates a host path to its guest path, if mapped.
    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
        self.state.path_map.get_by_left(path).cloned()
    }

    /// Registers a newly created file as a backend input.
    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
        self.state.insert_backend_input(ContentKind::File, path)?;
        Ok(())
    }
}
664
/// Mutable state used throughout the evaluation of a single task.
struct State<'a> {
    /// The top-level evaluator owning the backend, cache, transferer, and
    /// configuration.
    top_level: &'a TopLevelEvaluator,
    /// The temporary directory for the task evaluation.
    temp_dir: &'a Path,
    /// The parent directory of the task's document; used as the default base
    /// for resolving relative paths.
    base_dir: EvaluationPath,
    /// The analyzed document containing the task.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The evaluation scopes, indexed by the `*_SCOPE_INDEX` constants
    /// (root, output, and `task` variable scopes).
    scopes: [Scope; 3],
    /// Environment variables from `env` declarations, passed along when
    /// spawning the task.
    env: IndexMap<String, String>,
    /// The task's evaluated input values, keyed by input name (used for
    /// cache key calculation).
    inputs: BTreeMap<String, Value>,
    /// The trie of file/directory inputs to provide to the backend.
    backend_inputs: InputTrie,
    /// Bidirectional mapping between host paths and guest paths.
    path_map: BiHashMap<HostPath, GuestPath>,
}
701
impl<'a> State<'a> {
    /// Gets the transferer used for file transfers.
    fn transferer(&self) -> &Arc<dyn Transferer> {
        &self.top_level.transferer
    }

    /// Creates a new evaluation state for the given task.
    ///
    /// Fails when the document's URI has no parent directory.
    fn new(
        top_level: &'a TopLevelEvaluator,
        document: &'a Document,
        task: &'a Task,
        temp_dir: &'a Path,
    ) -> Result<Self> {
        // The array layout must match the `*_SCOPE_INDEX` constants; each
        // non-root scope is created with (presumably) its parent's index —
        // confirm `Scope::new`'s parameter semantics.
        let scopes = [
            Scope::default(),
            Scope::new(ROOT_SCOPE_INDEX),
            Scope::new(OUTPUT_SCOPE_INDEX),
        ];

        // Backends that map inputs into a guest directory get a trie that
        // assigns guest paths; otherwise a plain trie is used.
        let backend_inputs = if let Some(guest_inputs_dir) = top_level.backend.guest_inputs_dir() {
            InputTrie::new_with_guest_dir(guest_inputs_dir)
        } else {
            InputTrie::new()
        };

        let document_path = document.uri();
        let base_dir = EvaluationPath::parent_of(document_path.as_str()).with_context(|| {
            format!(
                "document `{path}` does not have a parent directory",
                path = document.path()
            )
        })?;

        Ok(Self {
            top_level,
            temp_dir,
            base_dir,
            document,
            task,
            scopes,
            env: Default::default(),
            inputs: Default::default(),
            backend_inputs,
            path_map: Default::default(),
        })
    }

    /// Registers every path contained in `value` as a backend input and,
    /// when the backend needs local inputs, downloads any remote URLs.
    ///
    /// For WDL v1.2+ documents, paths in the value are first resolved
    /// against the base directory.
    async fn add_backend_inputs(
        &mut self,
        is_optional: bool,
        value: &mut Value,
        transferer: Arc<dyn Transferer>,
        needs_local_inputs: bool,
    ) -> Result<()> {
        if self
            .document
            .version()
            .expect("document should have a version")
            >= SupportedVersion::V1(V1::Two)
        {
            *value = value
                .resolve_paths(
                    is_optional,
                    self.base_dir.as_local(),
                    Some(transferer.as_ref()),
                    &|path| Ok(path.clone()),
                )
                .await?;
        }

        // Collect remote URLs that must be downloaded: only those without an
        // assigned guest path, and only for backends needing local inputs.
        let mut urls = Vec::new();
        value.visit_paths(&mut |is_file, path| {
            if let Some(index) = self.insert_backend_input(
                if is_file {
                    ContentKind::File
                } else {
                    ContentKind::Directory
                },
                path,
            )? {
                if needs_local_inputs
                    && self.backend_inputs.as_slice()[index].guest_path.is_none()
                    && is_supported_url(path.as_str())
                    && !is_file_url(path.as_str())
                {
                    urls.push((path.clone(), index));
                }
            }

            Ok(())
        })?;

        if urls.is_empty() {
            return Ok(());
        }

        // Download all URLs concurrently; each task carries its input index
        // so results can be matched back up.
        let mut downloads = JoinSet::new();
        for (url, index) in urls {
            let transferer = transferer.clone();
            downloads.spawn(async move {
                transferer
                    .download(
                        &url.as_str()
                            .parse()
                            .with_context(|| format!("invalid URL `{url}`"))?,
                    )
                    .await
                    .with_context(|| anyhow!("failed to localize `{url}`"))
                    .map(|l| (url, l, index))
            });
        }

        while let Some(result) = downloads.join_next().await {
            let (url, location, index) =
                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;

            let guest_path = GuestPath::new(location.to_str().with_context(|| {
                format!(
                    "download location `{location}` is not UTF-8",
                    location = location.display()
                )
            })?);

            // Map the original URL to its downloaded location for later path
            // translation.
            self.path_map.insert(url, guest_path);

            self.backend_inputs.as_slice_mut()[index].set_location(location);
        }

        Ok(())
    }

    /// Inserts a path as a backend input, returning its index in the input
    /// trie when it was newly accepted.
    fn insert_backend_input(
        &mut self,
        kind: ContentKind,
        path: &HostPath,
    ) -> Result<Option<usize>> {
        if let Some(index) = self
            .backend_inputs
            .insert(kind, path.as_str(), &self.base_dir)?
        {
            // Record the host -> guest association when the trie assigned a
            // guest path to the input.
            let input = &self.backend_inputs.as_slice()[index];
            if let Some(guest_path) = &input.guest_path {
                self.path_map.insert(path.clone(), guest_path.clone());
            }

            return Ok(Some(index));
        }

        Ok(None)
    }
}
887
/// The result of evaluating a task's command, requirements, and hints
/// sections for a single attempt.
struct EvaluatedSections {
    /// The evaluated command text.
    command: String,
    /// The evaluated requirements.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints.
    hints: Arc<HashMap<String, Value>>,
}
897
impl TopLevelEvaluator {
    /// Evaluates the given task with the given inputs.
    ///
    /// Rejects documents with analysis errors up front, and maps a
    /// user-requested cancellation to `EvaluationError::Canceled` regardless
    /// of the inner evaluation result.
    pub async fn evaluate_task(
        &self,
        document: &Document,
        task: &Task,
        inputs: &TaskInputs,
        task_eval_root: impl AsRef<Path>,
    ) -> EvaluationResult<EvaluatedTask> {
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }

        let result = self
            .perform_task_evaluation(document, task, inputs, task_eval_root.as_ref(), task.name())
            .await;

        if self.cancellation.user_canceled() {
            return Err(EvaluationError::Canceled);
        }

        result
    }

    /// Performs the evaluation of a task: validates inputs, evaluates input
    /// and private declarations, evaluates the command/requirements/hints
    /// per attempt, executes via the backend (or the call cache), retries on
    /// failure up to `max_retries`, and finally evaluates outputs.
    pub(crate) async fn perform_task_evaluation(
        &self,
        document: &Document,
        task: &Task,
        inputs: &TaskInputs,
        task_eval_root: &Path,
        id: &str,
    ) -> EvaluationResult<EvaluatedTask> {
        inputs.validate(document, task, None).with_context(|| {
            format!(
                "failed to validate the inputs to task `{task}`",
                task = task.name()
            )
        })?;

        let ast = match document.root().morph().ast() {
            Ast::V1(ast) => ast,
            _ => {
                return Err(
                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
                );
            }
        };

        // Locate the task's AST definition by name.
        let definition = ast
            .tasks()
            .find(|t| t.name().text() == task.name())
            .expect("task should exist in the AST");

        let version = document.version().expect("document should have version");

        let mut diagnostics = Vec::new();
        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
        assert!(
            diagnostics.is_empty(),
            "task evaluation graph should have no diagnostics"
        );

        debug!(
            task_id = id,
            task_name = task.name(),
            document = document.uri().as_str(),
            "evaluating task"
        );

        let task_eval_root = absolute(task_eval_root).with_context(|| {
            format!(
                "failed to determine absolute path of `{path}`",
                path = task_eval_root.display()
            )
        })?;

        let temp_dir = task_eval_root.join("tmp");
        fs::create_dir_all(&temp_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = temp_dir.display()
            )
        })?;

        write_json_file(task_eval_root.join(INPUTS_FILE), inputs)?;

        let mut state = State::new(self, document, task, &temp_dir)?;
        // Evaluate graph nodes in topological order up to (but not
        // including) the first output; outputs and later declarations are
        // handled after the command completes. Command/runtime/requirements/
        // hints nodes are skipped here and re-evaluated per attempt below.
        let nodes = toposort(&graph, None).expect("graph should be acyclic");
        let mut current = 0;
        while current < nodes.len() {
            match &graph[nodes[current]] {
                TaskGraphNode::Input(decl) => {
                    state
                        .evaluate_input(id, decl, inputs)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Decl(decl) => {
                    state
                        .evaluate_decl(id, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Output(_) => {
                    break;
                }
                TaskGraphNode::Command(_)
                | TaskGraphNode::Runtime(_)
                | TaskGraphNode::Requirements(_)
                | TaskGraphNode::Hints(_) => {
                }
            }

            current += 1;
        }

        let mut cached;
        let env = Arc::new(mem::take(&mut state.env));
        let mut attempt = 0;
        let mut previous_task_data: Option<Arc<TaskPostEvaluationData>> = None;
        // Execution loop: each iteration is one attempt; a failed attempt
        // within the retry budget re-enters the loop via `continue`.
        let mut evaluated = loop {
            if self.cancellation.state() != CancellationContextState::NotCanceled {
                return Err(EvaluationError::Canceled);
            }

            let EvaluatedSections {
                command,
                requirements,
                hints,
            } = state
                .evaluate_sections(id, &definition, inputs, attempt, previous_task_data.clone())
                .await?;

            let max_retries = max_retries(&requirements, &self.config);

            if max_retries > MAX_RETRIES {
                return Err(anyhow!(
                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
                )
                .into());
            }

            let backend_inputs = state.localize_inputs(id).await?;

            // Only the first attempt consults the call cache; later attempts
            // always execute.
            let mut key = if attempt == 0
                && let Some(cache) = &self.cache
            {
                if cacheable(&hints, &self.config) {
                    let request = KeyRequest {
                        document_uri: state.document.uri().as_ref(),
                        task_name: task.name(),
                        inputs: &state.inputs,
                        command: &command,
                        requirements: requirements.as_ref(),
                        hints: hints.as_ref(),
                        container: &container(&requirements, self.config.task.container.as_deref()),
                        shell: self
                            .config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                        backend_inputs: &backend_inputs,
                    };

                    match cache.key(request).await {
                        Ok(key) => {
                            debug!(
                                task_id = id,
                                task_name = state.task.name(),
                                document = state.document.uri().as_str(),
                                "task cache key is `{key}`"
                            );
                            Some(key)
                        }
                        Err(e) => {
                            // A key failure disables caching for this run
                            // rather than failing the task.
                            warn!(
                                task_id = id,
                                task_name = state.task.name(),
                                document = state.document.uri().as_str(),
                                "call caching disabled due to cache key calculation failure: {e:#}"
                            );
                            None
                        }
                    }
                } else {
                    match self.config.task.cache {
                        CallCachingMode::Off => {
                            unreachable!("cache was used despite not being enabled")
                        }
                        CallCachingMode::On => debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task is not cacheable due to `cacheable` hint being set to `false`"
                        ),
                        CallCachingMode::Explicit => debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task is not cacheable due to `cacheable` hint not being explicitly \
                             set to `true`"
                        ),
                    }

                    None
                }
            } else {
                None
            };

            cached = false;
            // Attempt a cache lookup when a key was computed.
            let result = if let Some(cache_key) = &key {
                match self
                    .cache
                    .as_ref()
                    .expect("should have cache")
                    .get(cache_key)
                    .await
                {
                    Ok(Some(results)) => {
                        info!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task execution was skipped due to previous result being present in \
                             the call cache"
                        );

                        cached = true;
                        if let Some(sender) = &self.events {
                            let _ = sender.send(EngineEvent::ReusedCachedExecutionResult {
                                id: id.to_string(),
                            });
                        }

                        // Don't re-store a result that came from the cache.
                        key = None;
                        Some(results)
                    }
                    Ok(None) => {
                        debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "call cache miss for key `{cache_key}`"
                        );
                        None
                    }
                    Err(e) => {
                        // A bad cache entry is ignored, not fatal.
                        info!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "ignoring call cache entry: {e:#}"
                        );
                        None
                    }
                }
            } else {
                None
            };

            // On a cache miss, spawn the task on the backend and wait for
            // its execution result.
            let result = match result {
                Some(result) => result,
                None => {
                    let mut attempt_dir = task_eval_root.clone();
                    attempt_dir.push("attempts");
                    attempt_dir.push(attempt.to_string());
                    let request = TaskSpawnRequest::new(
                        id.to_string(),
                        TaskSpawnInfo::new(
                            command,
                            backend_inputs,
                            requirements.clone(),
                            hints.clone(),
                            env.clone(),
                            self.transferer.clone(),
                        ),
                        attempt,
                        attempt_dir.clone(),
                        task_eval_root.clone(),
                        temp_dir.clone(),
                    );

                    self.backend
                        .spawn(request, self.cancellation.token())
                        .with_context(|| {
                            format!(
                                "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
                                name = task.name(),
                                path = document.path(),
                            )
                        })?
                        .await
                        .expect("failed to receive response from spawned task")
                        .map_err(|e| {
                            EvaluationError::new(
                                state.document.clone(),
                                task_execution_failed(e, task.name(), id, task.name_span()),
                            )
                        })?
                }
            };

            let evaluated = EvaluatedTask::new(cached, result);
            // For WDL v1.2+, update the `task` variable's attempt number and
            // return code before outputs are evaluated.
            if version >= SupportedVersion::V1(V1::Two) {
                let task = state.scopes[TASK_SCOPE_INDEX.0]
                    .get_mut(TASK_VAR_NAME)
                    .expect("task variable should exist in scope for WDL v1.2+")
                    .as_task_post_evaluation_mut()
                    .expect("task should be a post evaluation task at this point");

                task.set_attempt(attempt.try_into().with_context(|| {
                    format!(
                        "too many attempts were made to run task `{task}`",
                        task = state.task.name()
                    )
                })?);
                task.set_return_code(evaluated.result.exit_code);
            }

            if let Err(e) = evaluated
                .handle_exit(&requirements, state.transferer().as_ref())
                .await
            {
                // Out of retries: fail the evaluation.
                if attempt >= max_retries {
                    return Err(EvaluationError::new(
                        state.document.clone(),
                        task_execution_failed(e, task.name(), id, task.name_span()),
                    ));
                }

                attempt += 1;

                // Carry the current `task` data into the next attempt.
                if let Some(task) = state.scopes[TASK_SCOPE_INDEX.0].names.get(TASK_VAR_NAME) {
                    let task = task.as_task_post_evaluation().unwrap();
                    previous_task_data = Some(task.data().clone());
                }

                info!(
                    "retrying execution of task `{name}` (retry {attempt})",
                    name = state.task.name()
                );
                continue;
            }

            // Store the successful result in the call cache when a key was
            // computed (and the result did not itself come from the cache).
            if let Some(key) = key {
                match self
                    .cache
                    .as_ref()
                    .expect("should have cache")
                    .put(key, &evaluated.result)
                    .await
                {
                    Ok(key) => {
                        debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "updated call cache entry for key `{key}`"
                        );
                    }
                    Err(e) => {
                        // A put failure is logged but does not fail the task.
                        // NOTE(review): "has been discard" should read "has
                        // been discarded".
                        error!(
                            "failed to update call cache entry for task `{name}` (task id \
                             `{id}`): cache entry has been discard: {e:#}",
                            name = task.name()
                        );
                    }
                }
            }

            break evaluated;
        };

        // Let the backend clean up execution artifacts, unless the result
        // came from the cache (nothing was executed).
        if !cached
            && let Some(cleanup) = self
                .backend
                .cleanup(&evaluated.result.work_dir, self.cancellation.token())
        {
            cleanup.await;
        }

        // Evaluate the remaining graph nodes: declarations that depend on
        // the command's results, and the outputs themselves.
        for index in &nodes[current..] {
            match &graph[*index] {
                TaskGraphNode::Decl(decl) => {
                    state
                        .evaluate_decl(id, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Output(decl) => {
                    state
                        .evaluate_output(id, decl, &evaluated)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                _ => {
                    unreachable!(
                        "only declarations and outputs should be evaluated after the command"
                    )
                }
            }
        }

        // Sort the outputs into declaration order (graph evaluation order
        // may differ).
        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
        if let Some(section) = definition.output() {
            let indexes: HashMap<_, _> = section
                .declarations()
                .enumerate()
                .map(|(i, d)| (d.name().hashable(), i))
                .collect();
            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
        }

        write_json_file(task_eval_root.join(OUTPUTS_FILE), &outputs)?;

        evaluated.outputs = Ok(outputs);
        Ok(evaluated)
    }
}
1349
1350impl<'a> State<'a> {
    /// Evaluates a task input declaration and stores the result in the root
    /// scope, the inputs map, and (for `env` declarations) the environment.
    ///
    /// The value comes from the caller-supplied inputs when present;
    /// otherwise from the declaration's default expression; otherwise it is
    /// `None` (which asserts the type is optional).
    async fn evaluate_input(
        &mut self,
        id: &str,
        decl: &Decl<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let decl_ty = decl.ty();
        let expected_ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;

        let (value, span) = match inputs.get(name.text()) {
            Some(input) => {
                // In WDL v1.2+, an explicit `None` supplied for a
                // non-optional input that has a default expression falls
                // back to evaluating that default.
                if input.is_none()
                    && !expected_ty.is_optional()
                    && self
                        .document
                        .version()
                        .map(|v| v >= SupportedVersion::V1(V1::Two))
                        .unwrap_or(false)
                    && let Some(expr) = decl.expr()
                {
                    debug!(
                        task_id = id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );

                    let mut evaluator =
                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                } else {
                    (input.clone(), name.span())
                }
            }
            None => match decl.expr() {
                // No input supplied: evaluate the default expression.
                Some(expr) => {
                    debug!(
                        task_id = id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );

                    let mut evaluator =
                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                }
                // No input and no default: only valid for optional types.
                _ => {
                    assert!(expected_ty.is_optional(), "type should be optional");
                    (Value::new_none(expected_ty.clone()), name.span())
                }
            },
        };

        // Coerce the value to the declared type; failure is reported against
        // the value's source span.
        let mut value = value
            .coerce(
                Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
                &expected_ty,
            )
            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;

        // Register any file/directory paths in the value as backend inputs
        // (localizing them if the backend requires local inputs).
        self.add_backend_inputs(
            decl_ty.is_optional(),
            &mut value,
            self.transferer().clone(),
            self.top_level.backend.needs_local_inputs(),
        )
        .await
        .map_err(|e| {
            decl_evaluation_failed(
                e,
                self.task.name(),
                true,
                name.text(),
                Some(Io::Input),
                name.span(),
            )
        })?;

        self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
        self.inputs.insert(name.text().to_string(), value.clone());

        // `env` declarations are additionally exported as environment
        // variables using the raw primitive representation.
        if decl.env().is_some() {
            let value = value
                .as_primitive()
                .expect("value should be primitive")
                .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
                .to_string();
            self.env.insert(name.text().to_string(), value);
        }

        Ok(())
    }
1455
1456 async fn evaluate_decl(&mut self, id: &str, decl: &Decl<SyntaxNode>) -> Result<(), Diagnostic> {
1458 let name = decl.name();
1459 debug!(
1460 task_id = id,
1461 task_name = self.task.name(),
1462 document = self.document.uri().as_str(),
1463 decl_name = name.text(),
1464 "evaluating private declaration",
1465 );
1466
1467 let decl_ty = decl.ty();
1468 let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
1469
1470 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
1471
1472 let expr = decl.expr().expect("private decls should have expressions");
1473 let value = evaluator.evaluate_expr(&expr).await?;
1474 let mut value = value
1475 .coerce(
1476 Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
1477 &ty,
1478 )
1479 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
1480
1481 self.add_backend_inputs(
1483 decl_ty.is_optional(),
1484 &mut value,
1485 self.transferer().clone(),
1486 self.top_level.backend.needs_local_inputs(),
1487 )
1488 .await
1489 .map_err(|e| {
1490 decl_evaluation_failed(e, self.task.name(), true, name.text(), None, name.span())
1491 })?;
1492
1493 self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
1494
1495 if decl.env().is_some() {
1497 let value = value
1498 .as_primitive()
1499 .expect("value should be primitive")
1500 .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
1501 .to_string();
1502 self.env.insert(name.text().to_string(), value);
1503 }
1504
1505 Ok(())
1506 }
1507
1508 async fn evaluate_runtime_section(
1512 &mut self,
1513 id: &str,
1514 section: &RuntimeSection<SyntaxNode>,
1515 inputs: &TaskInputs,
1516 ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
1517 debug!(
1518 task_id = id,
1519 task_name = self.task.name(),
1520 document = self.document.uri().as_str(),
1521 "evaluating runtimes section",
1522 );
1523
1524 let mut requirements = HashMap::new();
1525 let mut hints = HashMap::new();
1526
1527 let version = self
1528 .document
1529 .version()
1530 .expect("document should have version");
1531
1532 let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1534 TASK_SCOPE_INDEX
1535 } else {
1536 ROOT_SCOPE_INDEX
1537 };
1538
1539 for item in section.items() {
1540 let name = item.name();
1541 match inputs.requirement(name.text()) {
1542 Some(value) => {
1543 requirements.insert(name.text().to_string(), value.clone());
1544 continue;
1545 }
1546 _ => {
1547 if let Some(value) = inputs.hint(name.text()) {
1548 hints.insert(name.text().to_string(), value.clone());
1549 continue;
1550 }
1551 }
1552 }
1553
1554 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
1555
1556 let (types, requirement) = match task_requirement_types(version, name.text()) {
1557 Some(types) => (Some(types), true),
1558 None => match task_hint_types(version, name.text(), false) {
1559 Some(types) => (Some(types), false),
1560 None => (None, false),
1561 },
1562 };
1563
1564 let expr = item.expr();
1566 let mut value = evaluator.evaluate_expr(&expr).await?;
1567 if let Some(types) = types {
1568 value = types
1569 .iter()
1570 .find_map(|ty| {
1571 value
1572 .coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
1573 .ok()
1574 })
1575 .ok_or_else(|| {
1576 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1577 })?;
1578 }
1579
1580 if requirement {
1581 requirements.insert(name.text().to_string(), value);
1582 } else {
1583 hints.insert(name.text().to_string(), value);
1584 }
1585 }
1586
1587 Ok((requirements, hints))
1588 }
1589
1590 async fn evaluate_requirements_section(
1592 &mut self,
1593 id: &str,
1594 section: &RequirementsSection<SyntaxNode>,
1595 inputs: &TaskInputs,
1596 ) -> Result<HashMap<String, Value>, Diagnostic> {
1597 debug!(
1598 task_id = id,
1599 task_name = self.task.name(),
1600 document = self.document.uri().as_str(),
1601 "evaluating requirements",
1602 );
1603
1604 let mut requirements = HashMap::new();
1605
1606 let version = self
1607 .document
1608 .version()
1609 .expect("document should have version");
1610
1611 let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1613 TASK_SCOPE_INDEX
1614 } else {
1615 ROOT_SCOPE_INDEX
1616 };
1617
1618 for item in section.items() {
1619 let name = item.name();
1620 if let Some(value) = inputs.requirement(name.text()) {
1621 requirements.insert(name.text().to_string(), value.clone());
1622 continue;
1623 }
1624
1625 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
1626
1627 let types =
1628 task_requirement_types(version, name.text()).expect("requirement should be known");
1629
1630 let expr = item.expr();
1632 let value = evaluator.evaluate_expr(&expr).await?;
1633 let value = types
1634 .iter()
1635 .find_map(|ty| {
1636 value
1637 .coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
1638 .ok()
1639 })
1640 .ok_or_else(|| {
1641 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1642 })?;
1643
1644 requirements.insert(name.text().to_string(), value);
1645 }
1646
1647 Ok(requirements)
1648 }
1649
1650 async fn evaluate_hints_section(
1652 &mut self,
1653 id: &str,
1654 section: &TaskHintsSection<SyntaxNode>,
1655 inputs: &TaskInputs,
1656 ) -> Result<HashMap<String, Value>, Diagnostic> {
1657 debug!(
1658 task_id = id,
1659 task_name = self.task.name(),
1660 document = self.document.uri().as_str(),
1661 "evaluating hints section",
1662 );
1663
1664 let mut hints = HashMap::new();
1665
1666 let version = self
1667 .document
1668 .version()
1669 .expect("document should have version");
1670
1671 let scope_index = if version >= SupportedVersion::V1(V1::Three) {
1673 TASK_SCOPE_INDEX
1674 } else {
1675 ROOT_SCOPE_INDEX
1676 };
1677
1678 for item in section.items() {
1679 let name = item.name();
1680 if let Some(value) = inputs.hint(name.text()) {
1681 hints.insert(name.text().to_string(), value.clone());
1682 continue;
1683 }
1684
1685 let mut evaluator =
1686 ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index).with_task());
1687
1688 let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1689 hints.insert(name.text().to_string(), value);
1690 }
1691
1692 Ok(hints)
1693 }
1694
1695 async fn evaluate_command(
1699 &mut self,
1700 id: &str,
1701 section: &CommandSection<SyntaxNode>,
1702 ) -> EvaluationResult<String> {
1703 debug!(
1704 task_id = id,
1705 task_name = self.task.name(),
1706 document = self.document.uri().as_str(),
1707 "evaluating command section",
1708 );
1709
1710 let document = self.document.clone();
1711 let mut command = String::new();
1712 match section.strip_whitespace() {
1713 Some(parts) => {
1714 let mut evaluator =
1715 ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
1716
1717 for part in parts {
1718 match part {
1719 StrippedCommandPart::Text(t) => {
1720 command.push_str(t.as_str());
1721 }
1722 StrippedCommandPart::Placeholder(placeholder) => {
1723 evaluator
1724 .evaluate_placeholder(&placeholder, &mut command)
1725 .await
1726 .map_err(|d| EvaluationError::new(document.clone(), d))?;
1727 }
1728 }
1729 }
1730 }
1731 _ => {
1732 warn!(
1733 "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1734 stripping was skipped",
1735 task = self.task.name(),
1736 uri = self.document.uri(),
1737 );
1738
1739 let mut evaluator =
1740 ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
1741
1742 let heredoc = section.is_heredoc();
1743 for part in section.parts() {
1744 match part {
1745 CommandPart::Text(t) => {
1746 t.unescape_to(heredoc, &mut command);
1747 }
1748 CommandPart::Placeholder(placeholder) => {
1749 evaluator
1750 .evaluate_placeholder(&placeholder, &mut command)
1751 .await
1752 .map_err(|d| EvaluationError::new(document.clone(), d))?;
1753 }
1754 }
1755 }
1756 }
1757 }
1758
1759 Ok(command)
1760 }
1761
    /// Evaluates the task's non-command sections for a single attempt, then
    /// renders the command.
    ///
    /// This sets up the version-appropriate `task` variable, evaluates
    /// `runtime` (or `requirements`/`hints`), and returns the rendered
    /// command text along with the evaluated requirements and hints.
    async fn evaluate_sections(
        &mut self,
        id: &str,
        definition: &TaskDefinition<SyntaxNode>,
        inputs: &TaskInputs,
        attempt: u64,
        previous_task_data: Option<Arc<TaskPostEvaluationData>>,
    ) -> EvaluationResult<EvaluatedSections> {
        let version = self.document.version();

        let task_meta = definition
            .metadata()
            .map(|s| Object::from_v1_metadata(s.items()))
            .unwrap_or_else(Object::empty);
        let task_parameter_meta = definition
            .parameter_metadata()
            .map(|s| Object::from_v1_metadata(s.items()))
            .unwrap_or_else(Object::empty);
        let task_ext = Object::empty();

        // For WDL >= 1.3, a "pre-evaluation" `task` value is made visible in
        // the task scope while the runtime/requirements/hints sections are
        // themselves being evaluated.
        if version >= Some(SupportedVersion::V1(V1::Three)) {
            let mut task = TaskPreEvaluationValue::new(
                self.task.name(),
                id,
                attempt.try_into().expect("attempt should fit in i64"),
                task_meta.clone(),
                task_parameter_meta.clone(),
                task_ext.clone(),
            );

            if let Some(prev_data) = &previous_task_data {
                task.set_previous(prev_data.clone());
            }

            // Replace an existing `task` value in place, or insert the
            // first one.
            let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
                *v = HiddenValue::TaskPreEvaluation(task).into();
            } else {
                scope.insert(TASK_VAR_NAME, HiddenValue::TaskPreEvaluation(task));
            }
        }

        // A `runtime` section takes precedence; otherwise evaluate the
        // `requirements` and `hints` sections (either may be absent).
        let (requirements, hints) = match definition.runtime() {
            Some(section) => self
                .evaluate_runtime_section(id, &section, inputs)
                .await
                .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
            _ => (
                match definition.requirements() {
                    Some(section) => self
                        .evaluate_requirements_section(id, &section, inputs)
                        .await
                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
                    None => Default::default(),
                },
                match definition.hints() {
                    Some(section) => self
                        .evaluate_hints_section(id, &section, inputs)
                        .await
                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
                    None => Default::default(),
                },
            ),
        };

        // For WDL >= 1.2, the `task` value visible to the command (and
        // later, outputs) is the "post-evaluation" value, which carries
        // backend constraints and retry information derived from the
        // evaluated requirements.
        if version >= Some(SupportedVersion::V1(V1::Two)) {
            let constraints = self
                .top_level
                .backend
                .constraints(&requirements, &hints)
                .with_context(|| {
                    format!(
                        "failed to get constraints for task `{task}`",
                        task = self.task.name()
                    )
                })?;

            let max_retries = max_retries(&requirements, &self.top_level.config);

            let mut task = TaskPostEvaluationValue::new(
                self.task.name(),
                id,
                constraints,
                max_retries.try_into().with_context(|| {
                    format!(
                        "the number of max retries is too large to run task `{task}`",
                        task = self.task.name()
                    )
                })?,
                attempt.try_into().with_context(|| {
                    format!(
                        "too many attempts were made to run task `{task}`",
                        task = self.task.name()
                    )
                })?,
                task_meta,
                task_parameter_meta,
                task_ext,
            );

            // `task.previous` is only available starting with WDL 1.3.
            if let Some(version) = version
                && version >= SupportedVersion::V1(V1::Three)
                && let Some(prev_data) = &previous_task_data
            {
                task.set_previous(prev_data.clone());
            }

            let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
            if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
                *v = HiddenValue::TaskPostEvaluation(task).into();
            } else {
                scope.insert(TASK_VAR_NAME, HiddenValue::TaskPostEvaluation(task));
            }
        }

        let command = self
            .evaluate_command(
                id,
                &definition.command().expect("must have command section"),
            )
            .await?;

        Ok(EvaluatedSections {
            command,
            requirements: Arc::new(requirements),
            hints: Arc::new(hints),
        })
    }
1908
    /// Evaluates a single task output declaration after the task has run.
    ///
    /// The output expression is evaluated with access to the task's working
    /// directory, stdout, and stderr; the result is coerced to the declared
    /// type and any paths within it are resolved to host paths before being
    /// stored in the output scope.
    async fn evaluate_output(
        &mut self,
        id: &str,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = self.task.name(),
            document = self.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(self, TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        // Outputs are guaranteed by analysis to carry expressions.
        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        let mut value = value
            .coerce(Some(evaluator.context()), &ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
        // Resolve path values to host paths. The closure maps each path as
        // follows:
        //   * a local path under the local working directory (or equal to
        //     the task's stdout/stderr file) is used after joining against
        //     the working directory;
        //   * any other local path must be a known input in `path_map`
        //     (guest -> host); otherwise it is an error;
        //   * a guest path cannot be resolved against a remote working
        //     directory;
        //   * a path that joined to a remote path is used as-is.
        value = value
            .resolve_paths(
                ty.is_optional(),
                self.base_dir.as_local(),
                Some(self.transferer().as_ref()),
                &|path| {
                    let mut output_path = evaluated.result.work_dir.join(path.as_str())?;

                    let output_path = match (&mut output_path, &evaluated.result.work_dir) {
                        (EvaluationPath::Local(joined), EvaluationPath::Local(base))
                            if joined.starts_with(base)
                                || joined == evaluated.stdout().as_file().unwrap().as_str()
                                || joined == evaluated.stderr().as_file().unwrap().as_str() =>
                        {
                            HostPath::new(String::try_from(output_path)?)
                        }
                        (EvaluationPath::Local(_), EvaluationPath::Local(_)) => {
                            self.path_map
                                .get_by_left(path)
                                .ok_or_else(|| {
                                    anyhow!(
                                        "guest path `{path}` is not an input or within the task's \
                                         working directory"
                                    )
                                })?
                                .0
                                .clone()
                                .into()
                        }
                        (EvaluationPath::Local(_), EvaluationPath::Remote(_)) => {
                            bail!(
                                "cannot access guest path `{path}` from a remotely executing task"
                            )
                        }
                        (EvaluationPath::Remote(_), _) => {
                            HostPath::new(String::try_from(output_path)?)
                        }
                    };

                    Ok(output_path)
                },
            )
            .await
            .map_err(|e| {
                decl_evaluation_failed(
                    e,
                    self.task.name(),
                    true,
                    name.text(),
                    Some(Io::Output),
                    name.span(),
                )
            })?;

        self.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
2005
    /// Downloads any remote inputs that the backend needs locally and
    /// returns the final set of backend inputs.
    ///
    /// Downloads run concurrently on a `JoinSet`; each completed download
    /// records its local location back on the corresponding input (matched
    /// by index). When `DEBUG` logging is enabled, every input's
    /// local/guest mapping is logged.
    async fn localize_inputs(&mut self, task_id: &str) -> EvaluationResult<Vec<Input>> {
        if self.top_level.backend.needs_local_inputs() {
            let mut downloads = JoinSet::new();

            for (idx, input) in self.backend_inputs.as_slice_mut().iter_mut().enumerate() {
                // Skip inputs that already have a local path.
                if input.local_path().is_some() {
                    continue;
                }

                if let EvaluationPath::Remote(url) = input.path() {
                    let transferer = self.top_level.transferer.clone();
                    let url = url.clone();
                    // Tag each download with the input's index so the result
                    // can be written back to the right input on completion.
                    downloads.spawn(async move {
                        transferer
                            .download(&url)
                            .await
                            .map(|l| (idx, l))
                            .with_context(|| anyhow!("failed to localize `{url}`"))
                    });
                }
            }

            while let Some(result) = downloads.join_next().await {
                // Flatten join errors (panic/cancellation of a download
                // task) into the download result itself.
                match result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}"))) {
                    Ok((idx, location)) => {
                        self.backend_inputs.as_slice_mut()[idx].set_location(location);
                    }
                    Err(e) => {
                        // First failure wins; remaining downloads are
                        // dropped along with the `JoinSet`.
                        return Err(EvaluationError::new(
                            self.document.clone(),
                            task_localization_failed(e, self.task.name(), self.task.name_span()),
                        ));
                    }
                }
            }
        }

        if enabled!(Level::DEBUG) {
            for input in self.backend_inputs.as_slice() {
                // Match on (path is local, downloaded-to path, guest path)
                // to pick the most informative log line.
                match (
                    input.path().as_local().is_some(),
                    input.local_path(),
                    input.guest_path(),
                ) {
                    // Nothing to report: no guest mapping and nothing was
                    // downloaded.
                    (true, _, None) | (false, None, None) => {}
                    (true, _, Some(guest_path)) => {
                        debug!(
                            task_id,
                            task_name = self.task.name(),
                            document = self.document.uri().as_str(),
                            "task input `{path}` mapped to `{guest_path}`",
                            path = input.path().display(),
                        );
                    }
                    (false, Some(local_path), None) => {
                        debug!(
                            task_id,
                            task_name = self.task.name(),
                            document = self.document.uri().as_str(),
                            "task input `{path}` downloaded to `{local_path}`",
                            path = input.path().display(),
                            local_path = local_path.display()
                        );
                    }
                    (false, None, Some(guest_path)) => {
                        debug!(
                            task_id,
                            task_name = self.task.name(),
                            document = self.document.uri().as_str(),
                            "task input `{path}` mapped to `{guest_path}`",
                            path = input.path().display(),
                        );
                    }
                    (false, Some(local_path), Some(guest_path)) => {
                        debug!(
                            task_id,
                            task_name = self.task.name(),
                            document = self.document.uri().as_str(),
                            "task input `{path}` downloaded to `{local_path}` and mapped to \
                             `{guest_path}`",
                            path = input.path().display(),
                            local_path = local_path.display(),
                        );
                    }
                }
            }
        }

        Ok(self.backend_inputs.as_slice().into())
    }
2107}
2108
#[cfg(test)]
mod test {
    use std::fs;
    use std::path::Path;

    use pretty_assertions::assert_eq;
    use tempfile::tempdir;
    use tracing_test::traced_test;
    use wdl_analysis::Analyzer;
    use wdl_analysis::Config as AnalysisConfig;
    use wdl_analysis::DiagnosticsConfig;

    use crate::CancellationContext;
    use crate::EvaluatedTask;
    use crate::Events;
    use crate::TaskInputs;
    use crate::config::BackendConfig;
    use crate::config::CallCachingMode;
    use crate::config::Config;
    use crate::v1::TopLevelEvaluator;

    /// Renders the "hello, friend!" test task source, optionally inserting
    /// a single `hints` section item (e.g. `cacheable: true`).
    fn source(hint: Option<&str>) -> String {
        let hints = hint
            .map(|h| format!("\n    hints {{\n        {h}\n    }}\n"))
            .unwrap_or_default();
        format!(
            r#"
version 1.2

task test {{
    input {{
        String name = "friend"
    }}

    command <<<echo "hello, ~{{name}}!">>>
{hints}
    output {{
        String message = read_string(stdout())
    }}
}}
"#
        )
    }

    /// Asserts the common successful outcome of the test task: exit code
    /// zero, the expected greeting on stdout, and an empty stderr.
    fn assert_hello_output(evaluated: &EvaluatedTask) {
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
    }

    /// Analyzes and evaluates the `test` task from `source` with the given
    /// call caching mode, using `root_dir` to hold the source file, the
    /// cache directory, and the runs directory.
    async fn evaluate_task(mode: CallCachingMode, root_dir: &Path, source: &str) -> EvaluatedTask {
        fs::write(root_dir.join("source.wdl"), source).expect("failed to write WDL source file");

        let analyzer = Analyzer::new(
            AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
            |(), _, _, _| async {},
        );
        analyzer
            .add_directory(root_dir)
            .await
            .expect("failed to add directory");
        let results = analyzer
            .analyze(())
            .await
            .expect("failed to analyze document");
        assert_eq!(results.len(), 1, "expected only one result");

        let document = results.first().expect("should have result").document();

        let mut config = Config::default();
        config.task.cache = mode;
        config.task.cache_dir = Some(root_dir.join("cache"));
        config
            .backends
            .insert("default".into(), BackendConfig::Local(Default::default()));

        let evaluator = TopLevelEvaluator::new(
            &root_dir.join("runs"),
            config,
            CancellationContext::default(),
            Events::disabled(),
        )
        .await
        .unwrap();

        let runs_dir = root_dir.join("runs");
        evaluator
            .evaluate_task(
                document,
                document.task_by_name("test").expect("should have task"),
                &TaskInputs::default(),
                &runs_dir,
            )
            .await
            .unwrap()
    }

    /// With caching off, the task executes and nothing is cached.
    #[tokio::test]
    #[traced_test]
    async fn cache_off() {
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::Off, root_dir.path(), &source(None)).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(
            logs_contain("call caching is disabled"),
            "expected cache to be off"
        );
    }

    /// With caching on, the first run misses the cache and executes; the
    /// second run is served from the cache and skips execution.
    #[tokio::test]
    #[traced_test]
    async fn cache_on() {
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), &source(None)).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("call cache miss"),
            "expected first run to miss the cache"
        );
        assert!(logs_contain("spawning task"), "expected the task to spawn");

        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), &source(None)).await;
        assert!(evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(
            logs_contain("task execution was skipped"),
            "expected second run to skip execution"
        );
    }

    /// With caching on but `cacheable: false`, the task is never cached and
    /// both runs execute.
    #[tokio::test]
    #[traced_test]
    async fn cache_on_not_cacheable() {
        let src = source(Some("cacheable: false"));

        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), &src).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("task is not cacheable due to `cacheable` hint being set to `false`"),
            "expected task to not be cacheable"
        );

        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), &src).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(
            !logs_contain("task execution was skipped"),
            "expected second run to not skip execution"
        );
    }

    /// In explicit mode, a task without `cacheable: true` is never cached.
    #[tokio::test]
    #[traced_test]
    async fn cache_explicit() {
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated =
            evaluate_task(CallCachingMode::Explicit, root_dir.path(), &source(None)).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain(
                "task is not cacheable due to `cacheable` hint not being explicitly set to `true`"
            ),
            "expected task to not be cacheable"
        );

        let evaluated =
            evaluate_task(CallCachingMode::Explicit, root_dir.path(), &source(None)).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(
            !logs_contain("task execution was skipped"),
            "expected second run to not skip execution"
        );
    }

    /// In explicit mode, a task with `cacheable: true` behaves like normal
    /// caching: first run misses, second run hits.
    #[tokio::test]
    #[traced_test]
    async fn cache_explicit_cacheable() {
        let src = source(Some("cacheable: true"));

        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), &src).await;
        assert!(!evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("call cache miss"),
            "expected first run to miss the cache"
        );
        assert!(logs_contain("spawning task"), "expected the task to spawn");

        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), &src).await;
        assert!(evaluated.cached());
        assert_hello_output(&evaluated);
        assert!(
            logs_contain("task execution was skipped"),
            "expected second run to skip execution"
        );
    }
}