1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::mem;
7use std::path::Path;
8use std::path::absolute;
9use std::str::FromStr;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::anyhow;
15use anyhow::bail;
16use bimap::BiHashMap;
17use indexmap::IndexMap;
18use petgraph::algo::toposort;
19use tokio::task::JoinSet;
20use tokio_util::sync::CancellationToken;
21use tracing::Level;
22use tracing::debug;
23use tracing::enabled;
24use tracing::info;
25use tracing::warn;
26use wdl_analysis::Document;
27use wdl_analysis::diagnostics::Io;
28use wdl_analysis::diagnostics::multiple_type_mismatch;
29use wdl_analysis::diagnostics::unknown_name;
30use wdl_analysis::document::TASK_VAR_NAME;
31use wdl_analysis::document::Task;
32use wdl_analysis::eval::v1::TaskGraphBuilder;
33use wdl_analysis::eval::v1::TaskGraphNode;
34use wdl_analysis::types::Optional;
35use wdl_analysis::types::PrimitiveType;
36use wdl_analysis::types::Type;
37use wdl_analysis::types::v1::task_hint_types;
38use wdl_analysis::types::v1::task_requirement_types;
39use wdl_ast::Ast;
40use wdl_ast::AstNode;
41use wdl_ast::AstToken;
42use wdl_ast::Diagnostic;
43use wdl_ast::Span;
44use wdl_ast::SupportedVersion;
45use wdl_ast::v1::CommandPart;
46use wdl_ast::v1::CommandSection;
47use wdl_ast::v1::Decl;
48use wdl_ast::v1::RequirementsSection;
49use wdl_ast::v1::RuntimeSection;
50use wdl_ast::v1::StrippedCommandPart;
51use wdl_ast::v1::TASK_HINT_DISKS;
52use wdl_ast::v1::TASK_HINT_MAX_CPU;
53use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
54use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
55use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
57use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
58use wdl_ast::v1::TASK_REQUIREMENT_CPU;
59use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
60use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
61use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
62use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
63use wdl_ast::v1::TaskDefinition;
64use wdl_ast::v1::TaskHintsSection;
65use wdl_ast::version::V1;
66
67use crate::Coercible;
68use crate::EvaluationContext;
69use crate::EvaluationError;
70use crate::EvaluationResult;
71use crate::Events;
72use crate::GuestPath;
73use crate::HostPath;
74use crate::Input;
75use crate::InputKind;
76use crate::ONE_GIBIBYTE;
77use crate::Outputs;
78use crate::Scope;
79use crate::ScopeIndex;
80use crate::ScopeRef;
81use crate::StorageUnit;
82use crate::TaskExecutionBackend;
83use crate::TaskInputs;
84use crate::TaskSpawnInfo;
85use crate::TaskSpawnRequest;
86use crate::TaskValue;
87use crate::Value;
88use crate::config::Config;
89use crate::config::MAX_RETRIES;
90use crate::convert_unit_string;
91use crate::diagnostics::decl_evaluation_failed;
92use crate::diagnostics::runtime_type_mismatch;
93use crate::diagnostics::task_execution_failed;
94use crate::diagnostics::task_localization_failed;
95use crate::eval::EvaluatedTask;
96use crate::eval::trie::InputTrie;
97use crate::http::HttpTransferer;
98use crate::http::Transferer;
99use crate::path::EvaluationPath;
100use crate::path::is_file_url;
101use crate::path::is_url;
102use crate::tree::SyntaxNode;
103use crate::v1::ExprEvaluator;
104use crate::v1::INPUTS_FILE;
105use crate::v1::OUTPUTS_FILE;
106use crate::v1::write_json_file;
107
/// The default container image used when a task does not specify a
/// `container` requirement (or specifies the `*` wildcard).
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The default CPU count used when a task does not specify a `cpu`
/// requirement.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default memory limit (in bytes) used when a task does not specify a
/// `memory` requirement: 2 GiB.
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default retry count used when neither the task nor the configuration
/// specifies `max_retries`.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default disk size used when a task does not specify a `disks`
/// requirement.
// NOTE(review): presumably in GiB, matching the unit conversions in `disks()`
// — confirm.
pub const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;

/// Index of the scope holding the task's inputs and private declarations.
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// Index of the scope holding the task's output declarations.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// Index of the scope holding the `task` variable (see `TASK_VAR_NAME` usage
/// in `perform_evaluation`).
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
126
127pub(crate) fn container<'a>(
129 requirements: &'a HashMap<String, Value>,
130 default: Option<&'a str>,
131) -> Cow<'a, str> {
132 requirements
133 .get(TASK_REQUIREMENT_CONTAINER)
134 .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
135 .and_then(|v| -> Option<Cow<'_, str>> {
136 if let Some(array) = v.as_array() {
140 return array.as_slice().first().map(|v| {
141 v.as_string()
142 .expect("type should be string")
143 .as_ref()
144 .into()
145 });
146 }
147
148 Some(
149 v.coerce(None, &PrimitiveType::String.into())
150 .expect("type should coerce")
151 .unwrap_string()
152 .as_ref()
153 .clone()
154 .into(),
155 )
156 })
157 .and_then(|v| {
158 if v == "*" { None } else { Some(v) }
160 })
161 .unwrap_or_else(|| {
162 default
163 .map(Into::into)
164 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
165 })
166}
167
168pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
170 requirements
171 .get(TASK_REQUIREMENT_CPU)
172 .map(|v| {
173 v.coerce(None, &PrimitiveType::Float.into())
174 .expect("type should coerce")
175 .unwrap_float()
176 })
177 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
178}
179
180pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
182 hints
183 .get(TASK_HINT_MAX_CPU)
184 .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
185 .map(|v| {
186 v.coerce(None, &PrimitiveType::Float.into())
187 .expect("type should coerce")
188 .unwrap_float()
189 })
190}
191
192pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
194 Ok(requirements
195 .get(TASK_REQUIREMENT_MEMORY)
196 .map(|v| {
197 if let Some(v) = v.as_integer() {
198 return Ok(v);
199 }
200
201 if let Some(s) = v.as_string() {
202 return convert_unit_string(s)
203 .and_then(|v| v.try_into().ok())
204 .with_context(|| {
205 format!("task specifies an invalid `memory` requirement `{s}`")
206 });
207 }
208
209 unreachable!("value should be an integer or string");
210 })
211 .transpose()?
212 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
213}
214
215pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
217 hints
218 .get(TASK_HINT_MAX_MEMORY)
219 .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
220 .map(|v| {
221 if let Some(v) = v.as_integer() {
222 return Ok(v);
223 }
224
225 if let Some(s) = v.as_string() {
226 return convert_unit_string(s)
227 .and_then(|v| v.try_into().ok())
228 .with_context(|| {
229 format!("task specifies an invalid `memory` requirement `{s}`")
230 });
231 }
232
233 unreachable!("value should be an integer or string");
234 })
235 .transpose()
236}
237
/// Represents the type of disk to allocate for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiskType {
    /// A solid-state drive.
    SSD,
    /// A hard disk drive.
    HDD,
}

impl FromStr for DiskType {
    type Err = ();

    /// Parses the exact (case-sensitive) strings `SSD` and `HDD`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "SSD" {
            Ok(Self::SSD)
        } else if s == "HDD" {
            Ok(Self::HDD)
        } else {
            Err(())
        }
    }
}
260
/// A single disk parsed from the `disks` requirement (and typed via the
/// `disks` hint).
pub struct DiskRequirement {
    // The disk size; unit-suffixed specifications are converted to GiB.
    pub size: i64,

    // The disk type from the `disks` hint, or `None` when no type was hinted.
    pub ty: Option<DiskType>,
}
269
270pub(crate) fn disks<'a>(
274 requirements: &'a HashMap<String, Value>,
275 hints: &HashMap<String, Value>,
276) -> Result<HashMap<&'a str, DiskRequirement>> {
277 fn lookup_type(mount_point: Option<&str>, hints: &HashMap<String, Value>) -> Option<DiskType> {
281 hints.get(TASK_HINT_DISKS).and_then(|v| {
282 if let Some(ty) = v.as_string() {
283 return ty.parse().ok();
284 }
285
286 if let Some(map) = v.as_map() {
287 if let Some((_, v)) = map.iter().find(|(k, _)| match (k, mount_point) {
290 (None, None) => true,
291 (None, Some(_)) | (Some(_), None) => false,
292 (Some(k), Some(mount_point)) => k
293 .as_string()
294 .map(|k| k.as_str() == mount_point)
295 .unwrap_or(false),
296 }) {
297 return v.as_string().and_then(|ty| ty.parse().ok());
298 }
299 }
300
301 None
302 })
303 }
304
305 fn parse_disk_spec(spec: &str) -> Option<(i64, Option<&str>)> {
308 let iter = spec.split_whitespace();
309 let mut first = None;
310 let mut second = None;
311 let mut third = None;
312
313 for part in iter {
314 if first.is_none() {
315 first = Some(part);
316 continue;
317 }
318
319 if second.is_none() {
320 second = Some(part);
321 continue;
322 }
323
324 if third.is_none() {
325 third = Some(part);
326 continue;
327 }
328
329 return None;
330 }
331
332 match (first, second, third) {
333 (None, None, None) => None,
334 (Some(size), None, None) => {
335 Some((size.parse().ok()?, None))
337 }
338 (Some(first), Some(second), None) => {
339 if let Ok(size) = first.parse() {
341 let unit: StorageUnit = second.parse().ok()?;
342 let size = unit.bytes(size)? / (ONE_GIBIBYTE as u64);
343 return Some((size.try_into().ok()?, None));
344 }
345
346 if !first.starts_with('/') {
349 return None;
350 }
351
352 Some((second.parse().ok()?, Some(first)))
353 }
354 (Some(mount_point), Some(size), Some(unit)) => {
355 let unit: StorageUnit = unit.parse().ok()?;
357 let size = unit.bytes(size.parse().ok()?)? / (ONE_GIBIBYTE as u64);
358
359 if !mount_point.starts_with('/') {
361 return None;
362 }
363
364 Some((size.try_into().ok()?, Some(mount_point)))
365 }
366 _ => unreachable!("should have one, two, or three values"),
367 }
368 }
369
370 fn insert_disk<'a>(
372 spec: &'a str,
373 hints: &HashMap<String, Value>,
374 disks: &mut HashMap<&'a str, DiskRequirement>,
375 ) -> Result<()> {
376 let (size, mount_point) =
377 parse_disk_spec(spec).with_context(|| format!("invalid disk specification `{spec}"))?;
378
379 let prev = disks.insert(
380 mount_point.unwrap_or("/"),
381 DiskRequirement {
382 size,
383 ty: lookup_type(mount_point, hints),
384 },
385 );
386
387 if prev.is_some() {
388 bail!(
389 "duplicate mount point `{mp}` specified in `disks` requirement",
390 mp = mount_point.unwrap_or("/")
391 );
392 }
393
394 Ok(())
395 }
396
397 let mut disks = HashMap::new();
398 if let Some(v) = requirements.get(TASK_REQUIREMENT_DISKS) {
399 if let Some(size) = v.as_integer() {
400 if size < 0 {
402 bail!("task requirement `disks` cannot be less than zero");
403 }
404
405 disks.insert(
406 "/",
407 DiskRequirement {
408 size,
409 ty: lookup_type(None, hints),
410 },
411 );
412 } else if let Some(spec) = v.as_string() {
413 insert_disk(spec, hints, &mut disks)?;
414 } else if let Some(v) = v.as_array() {
415 for spec in v.as_slice() {
416 insert_disk(
417 spec.as_string().expect("spec should be a string"),
418 hints,
419 &mut disks,
420 )?;
421 }
422 } else {
423 unreachable!("value should be an integer, string, or array");
424 }
425 }
426
427 Ok(disks)
428}
429
430pub(crate) fn preemptible(hints: &HashMap<String, Value>) -> i64 {
436 const TASK_HINT_PREEMPTIBLE: &str = "preemptible";
437 const DEFAULT_TASK_HINT_PREEMPTIBLE: i64 = 0;
438
439 hints
440 .get(TASK_HINT_PREEMPTIBLE)
441 .and_then(|v| {
442 Some(
443 v.coerce(None, &PrimitiveType::Integer.into())
444 .ok()?
445 .unwrap_integer(),
446 )
447 })
448 .unwrap_or(DEFAULT_TASK_HINT_PREEMPTIBLE)
449}
450
// Expression evaluation context used throughout task evaluation.
struct TaskEvaluationContext<'a, 'b> {
    // The shared task evaluation state.
    state: &'a mut State<'b>,
    // The transferer used for remote file access during evaluation.
    transferer: &'a dyn Transferer,
    // The scope in which names are resolved.
    scope: ScopeIndex,
    // The task working directory; when set, it overrides `state.base_dir`
    // as the base directory (see `base_dir`).
    work_dir: Option<&'a EvaluationPath>,
    // The task's stdout value; only set after the command has executed.
    stdout: Option<&'a Value>,
    // The task's stderr value; only set after the command has executed.
    stderr: Option<&'a Value>,
    // Whether the `task` value may be resolved via `EvaluationContext::task`.
    task: bool,
}
476
477impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
478 pub fn new(
480 state: &'a mut State<'b>,
481 transferer: &'a dyn Transferer,
482 scope: ScopeIndex,
483 ) -> Self {
484 Self {
485 state,
486 transferer,
487 scope,
488 work_dir: None,
489 stdout: None,
490 stderr: None,
491 task: false,
492 }
493 }
494
495 pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
497 self.work_dir = Some(work_dir);
498 self
499 }
500
501 pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
503 self.stdout = Some(stdout);
504 self
505 }
506
507 pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
509 self.stderr = Some(stderr);
510 self
511 }
512
513 pub fn with_task(mut self) -> Self {
517 self.task = true;
518 self
519 }
520}
521
impl EvaluationContext for TaskEvaluationContext<'_, '_> {
    fn version(&self) -> SupportedVersion {
        self.state
            .document
            .version()
            .expect("document should have a version")
    }

    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
        // Resolve the name in this context's scope, producing an "unknown
        // name" diagnostic when it isn't bound.
        ScopeRef::new(&self.state.scopes, self.scope)
            .lookup(name)
            .cloned()
            .ok_or_else(|| unknown_name(name, span))
    }

    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
        crate::resolve_type_name(self.state.document, name, span)
    }

    fn base_dir(&self) -> &EvaluationPath {
        // Prefer the task's working directory when one has been set.
        self.work_dir.unwrap_or(&self.state.base_dir)
    }

    fn temp_dir(&self) -> &Path {
        self.state.temp_dir
    }

    fn stdout(&self) -> Option<&Value> {
        self.stdout
    }

    fn stderr(&self) -> Option<&Value> {
        self.stderr
    }

    fn task(&self) -> Option<&Task> {
        // The task is only exposed when explicitly enabled via `with_task`.
        if self.task {
            Some(self.state.task)
        } else {
            None
        }
    }

    fn transferer(&self) -> &dyn Transferer {
        self.transferer
    }

    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
        self.state.path_map.get_by_right(path).cloned()
    }

    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
        self.state.path_map.get_by_left(path).cloned()
    }

    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
        // Files created during evaluation are recorded as backend inputs so
        // the execution backend can make them available.
        self.state.insert_backend_input(InputKind::File, path)?;
        Ok(())
    }
}
582
// Shared mutable state for evaluating a single task.
struct State<'a> {
    // The temporary directory for evaluation artifacts.
    temp_dir: &'a Path,
    // The base directory for resolving relative paths: the document's parent
    // directory, made absolute (see `State::new`).
    base_dir: EvaluationPath,
    // The document containing the task being evaluated.
    document: &'a Document,
    // The analyzed task being evaluated.
    task: &'a Task,
    // The evaluation scopes; indexed by the `*_SCOPE_INDEX` constants.
    scopes: [Scope; 3],
    // Environment variables from `env` declarations (name -> raw value).
    env: IndexMap<String, String>,
    // The inputs to present to the execution backend.
    backend_inputs: InputTrie,
    // Bidirectional map between host paths and guest (in-container) paths.
    path_map: BiHashMap<HostPath, GuestPath>,
}
613
impl<'a> State<'a> {
    /// Creates new task evaluation state.
    ///
    /// `guest_inputs_dir` is the backend-provided directory under which
    /// inputs are mapped inside the execution environment, when any.
    ///
    /// Fails when the document path has no parent directory.
    fn new(
        document: &'a Document,
        task: &'a Task,
        temp_dir: &'a Path,
        guest_inputs_dir: Option<&'static str>,
    ) -> Result<Self> {
        // NOTE(review): the `Scope::new` arguments appear to be parent scope
        // indexes, forming a root -> output -> task chain — confirm against
        // `Scope::new`'s definition.
        let scopes = [
            Scope::default(),
            Scope::new(ROOT_SCOPE_INDEX),
            Scope::new(OUTPUT_SCOPE_INDEX),
        ];

        let backend_inputs = if let Some(guest_inputs_dir) = guest_inputs_dir {
            InputTrie::new_with_guest_dir(guest_inputs_dir)
        } else {
            InputTrie::new()
        };

        // Relative paths are resolved against the document's parent
        // directory.
        let document_path = document.path();
        let mut base_dir = EvaluationPath::parent_of(&document_path).with_context(|| {
            format!("document `{document_path}` does not have a parent directory")
        })?;

        base_dir.make_absolute();

        Ok(Self {
            temp_dir,
            base_dir,
            document,
            task,
            scopes,
            env: Default::default(),
            backend_inputs,
            path_map: Default::default(),
        })
    }

    /// Records every path contained in `value` as a backend input.
    ///
    /// For WDL >= 1.2 documents, paths are first checked for existence
    /// (`is_optional` is forwarded to that check).
    ///
    /// When `needs_local_inputs` is set, remote (non-`file://`) URLs that
    /// have no guest mapping are downloaded concurrently and their local
    /// locations recorded on the corresponding inputs.
    async fn add_backend_inputs(
        &mut self,
        is_optional: bool,
        value: &mut Value,
        transferer: &Arc<dyn Transferer>,
        needs_local_inputs: bool,
    ) -> Result<()> {
        // WDL >= 1.2 requires referenced paths to exist.
        if self
            .document
            .version()
            .expect("document should have a version")
            >= SupportedVersion::V1(V1::Two)
        {
            value
                .ensure_paths_exist(
                    is_optional,
                    self.base_dir.as_local(),
                    Some(transferer.as_ref()),
                    &|_| Ok(()),
                )
                .await?;
        }

        // Insert every path in the value and collect the remote URLs that
        // must be localized for the backend.
        let mut urls = Vec::new();
        value.visit_paths(&mut |is_file, path| {
            if let Some(index) = self.insert_backend_input(
                if is_file {
                    InputKind::File
                } else {
                    InputKind::Directory
                },
                path,
            )? {
                // Download only when the backend needs local inputs, the
                // input has no guest mapping, and the path is a remote URL.
                if needs_local_inputs
                    && self.backend_inputs.as_slice()[index].guest_path.is_none()
                    && is_url(path.as_str())
                    && !is_file_url(path.as_str())
                {
                    urls.push((path.clone(), index));
                }
            }

            Ok(())
        })?;

        if urls.is_empty() {
            return Ok(());
        }

        // Download all remote URLs concurrently.
        let mut downloads = JoinSet::new();
        for (url, index) in urls {
            let transferer = transferer.clone();
            downloads.spawn(async move {
                transferer
                    .download(
                        &url.as_str()
                            .parse()
                            .with_context(|| format!("invalid URL `{url}`"))?,
                    )
                    .await
                    .with_context(|| anyhow!("failed to localize `{url}`"))
                    .map(|l| (url, l, index))
            });
        }

        while let Some(result) = downloads.join_next().await {
            let (url, location, index) =
                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;

            let guest_path = GuestPath::new(location.to_str().with_context(|| {
                format!(
                    "download location `{location}` is not UTF-8",
                    location = location.display()
                )
            })?);

            // Map the original URL to its downloaded location so path
            // translation resolves it going forward.
            self.path_map.insert(url, guest_path);

            self.backend_inputs.as_slice_mut()[index].set_location(location);
        }

        Ok(())
    }

    /// Inserts a single path into the backend inputs trie.
    ///
    /// On `Some(index)`, any guest path the trie assigned to the input is
    /// recorded in the host<->guest path map.
    fn insert_backend_input(&mut self, kind: InputKind, path: &HostPath) -> Result<Option<usize>> {
        if let Some(index) = self
            .backend_inputs
            .insert(kind, path.as_str(), &self.base_dir)?
        {
            let input = &self.backend_inputs.as_slice()[index];
            if let Some(guest_path) = &input.guest_path {
                self.path_map.insert(path.clone(), guest_path.clone());
            }

            return Ok(Some(index));
        }

        Ok(None)
    }
}
787
// The result of evaluating a task's command, requirements, and hints
// sections for a single attempt.
struct EvaluatedSections {
    // The evaluated command text.
    command: String,
    // The evaluated requirements, by name.
    requirements: Arc<HashMap<String, Value>>,
    // The evaluated hints, by name.
    hints: Arc<HashMap<String, Value>>,
}
797
// Evaluates WDL tasks to completion against an execution backend.
pub struct TaskEvaluator {
    // The evaluation configuration.
    config: Arc<Config>,
    // The backend used to execute task commands.
    backend: Arc<dyn TaskExecutionBackend>,
    // The token used to cancel execution and transfers.
    token: CancellationToken,
    // The transferer used to localize remote files.
    transferer: Arc<dyn Transferer>,
}
809
810impl TaskEvaluator {
    /// Creates a new task evaluator from the given configuration.
    ///
    /// Validates the configuration, creates the configured execution
    /// backend, and constructs an HTTP transferer for remote files.
    ///
    /// The token cancels task execution and transfers.
    pub async fn new(config: Config, token: CancellationToken, events: Events) -> Result<Self> {
        config.validate().await?;

        let config = Arc::new(config);
        let backend = config.create_backend(events.crankshaft().clone()).await?;
        let transferer =
            HttpTransferer::new(config.clone(), token.clone(), events.transfer().clone())?;

        Ok(Self {
            config,
            backend,
            token,
            transferer: Arc::new(transferer),
        })
    }
830
    /// Creates a new task evaluator from existing parts without validating
    /// the configuration; the caller is responsible for having already
    /// validated it.
    pub(crate) fn new_unchecked(
        config: Arc<Config>,
        backend: Arc<dyn TaskExecutionBackend>,
        token: CancellationToken,
        transferer: Arc<dyn Transferer>,
    ) -> Self {
        Self {
            config,
            backend,
            token,
            transferer,
        }
    }
848
    /// Evaluates the given task from the given analyzed document.
    ///
    /// `root` is the root directory for evaluation artifacts; the task's
    /// name is used as the evaluation identifier.
    ///
    /// Fails immediately when the document has analysis errors.
    pub async fn evaluate(
        &self,
        document: &Document,
        task: &Task,
        inputs: &TaskInputs,
        root: impl AsRef<Path>,
    ) -> EvaluationResult<EvaluatedTask> {
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }

        self.perform_evaluation(document, task, inputs, root.as_ref(), task.name())
            .await
    }
867
    /// Performs the full evaluation of a task.
    ///
    /// Validates the inputs, evaluates inputs and pre-command declarations
    /// in topological order, spawns the command on the backend (re-evaluating
    /// the sections and retrying up to the `max_retries` requirement), then
    /// evaluates the remaining declarations and outputs.
    ///
    /// `root` is the directory under which attempt and temporary directories
    /// are created; `id` identifies this invocation in logs and errors.
    pub(crate) async fn perform_evaluation(
        &self,
        document: &Document,
        task: &Task,
        inputs: &TaskInputs,
        root: &Path,
        id: &str,
    ) -> EvaluationResult<EvaluatedTask> {
        inputs.validate(document, task, None).with_context(|| {
            format!(
                "failed to validate the inputs to task `{task}`",
                task = task.name()
            )
        })?;

        // Only WDL 1.x documents can be evaluated.
        let ast = match document.root().morph().ast() {
            Ast::V1(ast) => ast,
            _ => {
                return Err(
                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
                );
            }
        };

        let definition = ast
            .tasks()
            .find(|t| t.name().text() == task.name())
            .expect("task should exist in the AST");

        let version = document.version().expect("document should have version");

        // Build the evaluation graph; analysis has already run, so no new
        // diagnostics are expected here.
        let mut diagnostics = Vec::new();
        let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
        assert!(
            diagnostics.is_empty(),
            "task evaluation graph should have no diagnostics"
        );

        debug!(
            task_id = id,
            task_name = task.name(),
            document = document.uri().as_str(),
            "evaluating task"
        );

        let root_dir = absolute(root).with_context(|| {
            format!(
                "failed to determine absolute path of `{path}`",
                path = root.display()
            )
        })?;

        let temp_dir = root_dir.join("tmp");
        fs::create_dir_all(&temp_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = temp_dir.display()
            )
        })?;

        // Persist the provided inputs alongside the evaluation artifacts.
        write_json_file(root_dir.join(INPUTS_FILE), inputs)?;

        let mut state = State::new(document, task, &temp_dir, self.backend.guest_inputs_dir())?;
        let nodes = toposort(&graph, None).expect("graph should be acyclic");
        // Evaluate inputs and declarations up to the first output node; the
        // nodes from `current` onward are evaluated after the command runs.
        let mut current = 0;
        while current < nodes.len() {
            match &graph[nodes[current]] {
                TaskGraphNode::Input(decl) => {
                    self.evaluate_input(id, &mut state, decl, inputs)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Decl(decl) => {
                    self.evaluate_decl(id, &mut state, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Output(_) => {
                    break;
                }
                TaskGraphNode::Command(_)
                | TaskGraphNode::Runtime(_)
                | TaskGraphNode::Requirements(_)
                | TaskGraphNode::Hints(_) => {
                    // These sections are evaluated per attempt in the loop
                    // below (via `evaluate_sections`).
                }
            }

            current += 1;
        }

        let env = Arc::new(mem::take(&mut state.env));
        // Execute the command, retrying failed attempts up to `max_retries`.
        let mut attempt = 0;
        let mut evaluated = loop {
            let EvaluatedSections {
                command,
                requirements,
                hints,
            } = self
                .evaluate_sections(id, &mut state, &definition, inputs, attempt)
                .await?;

            // The retry count comes from the requirements (or alias), then
            // the engine configuration, then the default.
            let max_retries = requirements
                .get(TASK_REQUIREMENT_MAX_RETRIES)
                .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
                .cloned()
                .map(|v| v.unwrap_integer() as u64)
                .or_else(|| self.config.task.retries)
                .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);

            if max_retries > MAX_RETRIES {
                return Err(anyhow!(
                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
                )
                .into());
            }

            // Each attempt gets its own directory under `attempts/`.
            let mut attempt_dir = root_dir.clone();
            attempt_dir.push("attempts");
            attempt_dir.push(attempt.to_string());

            let request = TaskSpawnRequest::new(
                id.to_string(),
                TaskSpawnInfo::new(
                    command,
                    self.localize_inputs(id, &mut state).await?,
                    requirements.clone(),
                    hints.clone(),
                    env.clone(),
                    self.transferer.clone(),
                ),
                attempt,
                attempt_dir.clone(),
                root_dir.clone(),
                temp_dir.clone(),
            );

            let result = self
                .backend
                .spawn(request, self.token.clone())
                .with_context(|| {
                    format!(
                        "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
                        name = task.name(),
                        path = document.path(),
                    )
                })?
                .await
                .expect("failed to receive response from spawned task")
                .map_err(|e| {
                    EvaluationError::new(
                        state.document.clone(),
                        task_execution_failed(e, task.name(), id, task.name_span()),
                    )
                })?;

            let evaluated = EvaluatedTask::new(attempt_dir, result)?;
            // For WDL >= 1.2, reflect the attempt number and return code in
            // the `task` variable before outputs are evaluated.
            if version >= SupportedVersion::V1(V1::Two) {
                let task = state.scopes[TASK_SCOPE_INDEX.0]
                    .get_mut(TASK_VAR_NAME)
                    .unwrap()
                    .as_task_mut()
                    .unwrap();

                task.set_attempt(attempt.try_into().with_context(|| {
                    format!(
                        "too many attempts were made to run task `{task}`",
                        task = state.task.name()
                    )
                })?);
                task.set_return_code(evaluated.result.exit_code);
            }

            // On a failed exit, retry when retries remain; otherwise fail
            // evaluation.
            if let Err(e) = evaluated
                .handle_exit(&requirements, self.transferer.as_ref())
                .await
            {
                if attempt >= max_retries {
                    return Err(EvaluationError::new(
                        state.document.clone(),
                        task_execution_failed(e, task.name(), id, task.name_span()),
                    ));
                }

                attempt += 1;

                info!(
                    "retrying execution of task `{name}` (retry {attempt})",
                    name = state.task.name()
                );
                continue;
            }

            break evaluated;
        };

        // Give the backend a chance to clean up the work directory.
        if let Some(cleanup) = self
            .backend
            .cleanup(&evaluated.result.work_dir, self.token.clone())
        {
            cleanup.await;
        }

        // Evaluate the remaining nodes: outputs and any declarations that
        // depend on the command having executed.
        for index in &nodes[current..] {
            match &graph[*index] {
                TaskGraphNode::Decl(decl) => {
                    self.evaluate_decl(id, &mut state, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Output(decl) => {
                    self.evaluate_output(id, &mut state, decl, &evaluated)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                _ => {
                    unreachable!(
                        "only declarations and outputs should be evaluated after the command"
                    )
                }
            }
        }

        // Order the outputs to match their declaration order in the source.
        let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
        if let Some(section) = definition.output() {
            let indexes: HashMap<_, _> = section
                .declarations()
                .enumerate()
                .map(|(i, d)| (d.name().hashable(), i))
                .collect();
            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
        }

        // Persist the outputs alongside the evaluation artifacts.
        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;

        evaluated.outputs = Ok(outputs);
        Ok(evaluated)
    }
1124
    /// Evaluates a task input declaration into the root scope.
    ///
    /// The value comes from the caller-supplied inputs when present,
    /// otherwise from the declaration's default expression; with neither, the
    /// declaration must be optional and the value is `None`.
    async fn evaluate_input(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let decl_ty = decl.ty();
        let expected_ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;

        let (value, span) = match inputs.get(name.text()) {
            Some(input) => {
                // In WDL >= 1.2, a `None` input for a non-optional
                // declaration with a default falls back to evaluating the
                // default expression.
                if input.is_none()
                    && !expected_ty.is_optional()
                    && state
                        .document
                        .version()
                        .map(|v| v >= SupportedVersion::V1(V1::Two))
                        .unwrap_or(false)
                    && let Some(expr) = decl.expr()
                {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );

                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                        state,
                        self.transferer.as_ref(),
                        ROOT_SCOPE_INDEX,
                    ));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                } else {
                    (input.clone(), name.span())
                }
            }
            None => match decl.expr() {
                Some(expr) => {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );

                    let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                        state,
                        self.transferer.as_ref(),
                        ROOT_SCOPE_INDEX,
                    ));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                }
                _ => {
                    // No input and no default: only valid for optional types.
                    assert!(expected_ty.is_optional(), "type should be optional");
                    (Value::new_none(expected_ty.clone()), name.span())
                }
            },
        };

        // Coerce to the declared type, reporting a type mismatch diagnostic
        // on failure.
        let mut value = value
            .coerce(
                Some(&TaskEvaluationContext::new(
                    state,
                    self.transferer.as_ref(),
                    ROOT_SCOPE_INDEX,
                )),
                &expected_ty,
            )
            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;

        // Record any file/directory paths in the value as backend inputs.
        state
            .add_backend_inputs(
                decl_ty.is_optional(),
                &mut value,
                &self.transferer,
                self.backend.needs_local_inputs(),
            )
            .await
            .map_err(|e| {
                decl_evaluation_failed(
                    e,
                    state.task.name(),
                    true,
                    name.text(),
                    Some(Io::Input),
                    name.span(),
                )
            })?;

        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());

        // `env` declarations are additionally exported as environment
        // variables using the primitive value's raw form.
        if decl.env().is_some() {
            let value = value
                .as_primitive()
                .expect("value should be primitive")
                .raw(Some(&TaskEvaluationContext::new(
                    state,
                    self.transferer.as_ref(),
                    ROOT_SCOPE_INDEX,
                )))
                .to_string();
            state.env.insert(name.text().to_string(), value);
        }

        Ok(())
    }
1244
    /// Evaluates a private declaration into the root scope.
    ///
    /// The declaration's expression is evaluated and coerced to the declared
    /// type; any paths in the value are recorded as backend inputs, and
    /// `env` declarations are additionally exported as environment variables.
    async fn evaluate_decl(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            decl_name = name.text(),
            "evaluating private declaration",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;

        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
            state,
            self.transferer.as_ref(),
            ROOT_SCOPE_INDEX,
        ));

        let expr = decl.expr().expect("private decls should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;
        // Coerce to the declared type, reporting a type mismatch diagnostic
        // on failure.
        let mut value = value
            .coerce(
                Some(&TaskEvaluationContext::new(
                    state,
                    self.transferer.as_ref(),
                    ROOT_SCOPE_INDEX,
                )),
                &ty,
            )
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        // Record any file/directory paths in the value as backend inputs.
        state
            .add_backend_inputs(
                decl_ty.is_optional(),
                &mut value,
                &self.transferer,
                self.backend.needs_local_inputs(),
            )
            .await
            .map_err(|e| {
                decl_evaluation_failed(e, state.task.name(), true, name.text(), None, name.span())
            })?;

        state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());

        // `env` declarations are additionally exported as environment
        // variables using the primitive value's raw form.
        if decl.env().is_some() {
            let value = value
                .as_primitive()
                .expect("value should be primitive")
                .raw(Some(&TaskEvaluationContext::new(
                    state,
                    self.transferer.as_ref(),
                    ROOT_SCOPE_INDEX,
                )))
                .to_string();
            state.env.insert(name.text().to_string(), value);
        }

        Ok(())
    }
1314
    /// Evaluates the task's `runtime` section.
    ///
    /// Each item is classified as a requirement or a hint based on its name;
    /// caller-supplied overrides take precedence over evaluating the item's
    /// expression.
    ///
    /// Returns the evaluated requirements and hints, respectively.
    async fn evaluate_runtime_section(
        &self,
        id: &str,
        state: &mut State<'_>,
        section: &RuntimeSection<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating runtimes section",
        );

        let mut requirements = HashMap::new();
        let mut hints = HashMap::new();

        let version = state
            .document
            .version()
            .expect("document should have version");
        for item in section.items() {
            let name = item.name();
            // Caller-supplied overrides take precedence over the expression.
            match inputs.requirement(name.text()) {
                Some(value) => {
                    requirements.insert(name.text().to_string(), value.clone());
                    continue;
                }
                _ => {
                    if let Some(value) = inputs.hint(name.text()) {
                        hints.insert(name.text().to_string(), value.clone());
                        continue;
                    }
                }
            }

            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                state,
                self.transferer.as_ref(),
                ROOT_SCOPE_INDEX,
            ));

            // Classify the item: a known requirement, a known hint, or an
            // unknown name (treated as a hint with no type constraint).
            let (types, requirement) = match task_requirement_types(version, name.text()) {
                Some(types) => (Some(types), true),
                None => match task_hint_types(version, name.text(), false) {
                    Some(types) => (Some(types), false),
                    None => (None, false),
                },
            };

            // Evaluate the expression and coerce to the first accepted type
            // when the item's name constrains its type.
            let expr = item.expr();
            let mut value = evaluator.evaluate_expr(&expr).await?;
            if let Some(types) = types {
                value = types
                    .iter()
                    .find_map(|ty| {
                        value
                            .coerce(
                                Some(&TaskEvaluationContext::new(
                                    state,
                                    self.transferer.as_ref(),
                                    ROOT_SCOPE_INDEX,
                                )),
                                ty,
                            )
                            .ok()
                    })
                    .ok_or_else(|| {
                        multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
                    })?;
            }

            if requirement {
                requirements.insert(name.text().to_string(), value);
            } else {
                hints.insert(name.text().to_string(), value);
            }
        }

        Ok((requirements, hints))
    }
1400
    /// Evaluates the task's `requirements` section.
    ///
    /// Caller-supplied overrides take precedence over evaluating each item's
    /// expression; evaluated values are coerced to one of the requirement's
    /// accepted types.
    async fn evaluate_requirements_section(
        &self,
        id: &str,
        state: &mut State<'_>,
        section: &RequirementsSection<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<HashMap<String, Value>, Diagnostic> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating requirements",
        );

        let mut requirements = HashMap::new();

        let version = state
            .document
            .version()
            .expect("document should have version");
        for item in section.items() {
            let name = item.name();
            // Caller-supplied overrides take precedence over the expression.
            if let Some(value) = inputs.requirement(name.text()) {
                requirements.insert(name.text().to_string(), value.clone());
                continue;
            }

            let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
                state,
                self.transferer.as_ref(),
                ROOT_SCOPE_INDEX,
            ));

            // Unlike `runtime`, only known requirement names may appear here.
            let types =
                task_requirement_types(version, name.text()).expect("requirement should be known");

            // Evaluate the expression and coerce to the first accepted type.
            let expr = item.expr();
            let value = evaluator.evaluate_expr(&expr).await?;
            let value = types
                .iter()
                .find_map(|ty| {
                    value
                        .coerce(
                            Some(&TaskEvaluationContext::new(
                                state,
                                self.transferer.as_ref(),
                                ROOT_SCOPE_INDEX,
                            )),
                            ty,
                        )
                        .ok()
                })
                .ok_or_else(|| {
                    multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
                })?;

            requirements.insert(name.text().to_string(), value);
        }

        Ok(requirements)
    }
1464
1465 async fn evaluate_hints_section(
1467 &self,
1468 id: &str,
1469 state: &mut State<'_>,
1470 section: &TaskHintsSection<SyntaxNode>,
1471 inputs: &TaskInputs,
1472 ) -> Result<HashMap<String, Value>, Diagnostic> {
1473 debug!(
1474 task_id = id,
1475 task_name = state.task.name(),
1476 document = state.document.uri().as_str(),
1477 "evaluating hints section",
1478 );
1479
1480 let mut hints = HashMap::new();
1481
1482 for item in section.items() {
1483 let name = item.name();
1484 if let Some(value) = inputs.hint(name.text()) {
1485 hints.insert(name.text().to_string(), value.clone());
1486 continue;
1487 }
1488
1489 let mut evaluator = ExprEvaluator::new(
1490 TaskEvaluationContext::new(state, self.transferer.as_ref(), ROOT_SCOPE_INDEX)
1491 .with_task(),
1492 );
1493
1494 let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1495 hints.insert(name.text().to_string(), value);
1496 }
1497
1498 Ok(hints)
1499 }
1500
1501 async fn evaluate_command(
1505 &self,
1506 id: &str,
1507 state: &mut State<'_>,
1508 section: &CommandSection<SyntaxNode>,
1509 ) -> EvaluationResult<String> {
1510 debug!(
1511 task_id = id,
1512 task_name = state.task.name(),
1513 document = state.document.uri().as_str(),
1514 "evaluating command section",
1515 );
1516
1517 let document = state.document.clone();
1518 let mut command = String::new();
1519 match section.strip_whitespace() {
1520 Some(parts) => {
1521 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1522 state,
1523 self.transferer.as_ref(),
1524 TASK_SCOPE_INDEX,
1525 ));
1526
1527 for part in parts {
1528 match part {
1529 StrippedCommandPart::Text(t) => {
1530 command.push_str(t.as_str());
1531 }
1532 StrippedCommandPart::Placeholder(placeholder) => {
1533 evaluator
1534 .evaluate_placeholder(&placeholder, &mut command)
1535 .await
1536 .map_err(|d| EvaluationError::new(document.clone(), d))?;
1537 }
1538 }
1539 }
1540 }
1541 _ => {
1542 warn!(
1543 "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
1544 stripping was skipped",
1545 task = state.task.name(),
1546 uri = state.document.uri(),
1547 );
1548
1549 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1550 state,
1551 self.transferer.as_ref(),
1552 TASK_SCOPE_INDEX,
1553 ));
1554
1555 let heredoc = section.is_heredoc();
1556 for part in section.parts() {
1557 match part {
1558 CommandPart::Text(t) => {
1559 t.unescape_to(heredoc, &mut command);
1560 }
1561 CommandPart::Placeholder(placeholder) => {
1562 evaluator
1563 .evaluate_placeholder(&placeholder, &mut command)
1564 .await
1565 .map_err(|d| EvaluationError::new(document.clone(), d))?;
1566 }
1567 }
1568 }
1569 }
1570 }
1571
1572 Ok(command)
1573 }
1574
1575 async fn evaluate_sections(
1583 &self,
1584 id: &str,
1585 state: &mut State<'_>,
1586 definition: &TaskDefinition<SyntaxNode>,
1587 inputs: &TaskInputs,
1588 attempt: u64,
1589 ) -> EvaluationResult<EvaluatedSections> {
1590 let (requirements, hints) = match definition.runtime() {
1592 Some(section) => self
1593 .evaluate_runtime_section(id, state, §ion, inputs)
1594 .await
1595 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1596 _ => (
1597 match definition.requirements() {
1598 Some(section) => self
1599 .evaluate_requirements_section(id, state, §ion, inputs)
1600 .await
1601 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1602 None => Default::default(),
1603 },
1604 match definition.hints() {
1605 Some(section) => self
1606 .evaluate_hints_section(id, state, §ion, inputs)
1607 .await
1608 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1609 None => Default::default(),
1610 },
1611 ),
1612 };
1613
1614 if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1618 let constraints = self
1620 .backend
1621 .constraints(&requirements, &hints)
1622 .with_context(|| {
1623 format!(
1624 "failed to get constraints for task `{task}`",
1625 task = state.task.name()
1626 )
1627 })?;
1628
1629 let task = TaskValue::new_v1(
1630 state.task.name(),
1631 id,
1632 definition,
1633 constraints,
1634 attempt.try_into().with_context(|| {
1635 format!(
1636 "too many attempts were made to run task `{task}`",
1637 task = state.task.name()
1638 )
1639 })?,
1640 );
1641
1642 let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1643 if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1644 *v = Value::Task(task);
1645 } else {
1646 scope.insert(TASK_VAR_NAME, Value::Task(task));
1647 }
1648 }
1649
1650 let command = self
1651 .evaluate_command(
1652 id,
1653 state,
1654 &definition.command().expect("must have command section"),
1655 )
1656 .await?;
1657
1658 Ok(EvaluatedSections {
1659 command,
1660 requirements: Arc::new(requirements),
1661 hints: Arc::new(hints),
1662 })
1663 }
1664
    /// Evaluates a single task output declaration and stores the result in
    /// the output scope.
    ///
    /// The output expression is evaluated with access to the task's working
    /// directory and captured stdout/stderr, coerced to the declared type,
    /// and any paths inside the value are verified to exist and remapped
    /// from guest paths back to host paths.
    async fn evaluate_output(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
        // Output expressions may call `stdout()`/`stderr()` and reference the
        // working directory, so the context is extended with all three.
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(state, self.transferer.as_ref(), TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        // Coerce the evaluated value to the declared output type.
        let mut value = value
            .coerce(Some(evaluator.context()), &ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        value
            .ensure_paths_exist(
                ty.is_optional(),
                state.base_dir.as_local(),
                Some(self.transferer.as_ref()),
                // Remaps each path in the value from a guest path (as seen by
                // the executing task) to a host path.
                &|path| {
                    // Join relative paths against the task's working
                    // directory.
                    let mut output_path = evaluated.result.work_dir.join(path.as_str())?;

                    let output_path = match (&mut output_path, &evaluated.result.work_dir) {
                        // Local path within the working directory or the
                        // attempt directory: usable as-is.
                        (EvaluationPath::Local(joined), EvaluationPath::Local(base))
                            if joined.starts_with(base)
                                || joined.starts_with(&evaluated.attempt_dir) =>
                        {
                            HostPath::new(String::try_from(output_path)?)
                        }
                        // Local path outside the working directory: it must
                        // correspond to a mapped task input; translate it
                        // back to the host side of the mapping.
                        (EvaluationPath::Local(_), EvaluationPath::Local(_)) => {
                            state
                                .path_map
                                .get_by_left(path)
                                .ok_or_else(|| {
                                    anyhow!(
                                        "guest path `{path}` is not an input or within the task's \
                                         working directory"
                                    )
                                })?
                                .0
                                .clone()
                                .into()
                        }
                        // A local guest path cannot be resolved against a
                        // remote working directory.
                        (EvaluationPath::Local(_), EvaluationPath::Remote(_)) => {
                            bail!(
                                "cannot access guest path `{path}` from a remotely executing task"
                            )
                        }
                        // Remote paths are used as-is.
                        (EvaluationPath::Remote(_), _) => {
                            HostPath::new(String::try_from(output_path)?)
                        }
                    };

                    *path = output_path;
                    Ok(())
                },
            )
            .await
            .map_err(|e| {
                decl_evaluation_failed(
                    e,
                    state.task.name(),
                    true,
                    name.text(),
                    Some(Io::Output),
                    name.span(),
                )
            })?;

        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
1764
1765 async fn localize_inputs(
1769 &self,
1770 task_id: &str,
1771 state: &mut State<'_>,
1772 ) -> EvaluationResult<Vec<Input>> {
1773 if self.backend.needs_local_inputs() {
1775 let mut downloads = JoinSet::new();
1776
1777 for (idx, input) in state.backend_inputs.as_slice_mut().iter_mut().enumerate() {
1779 if input.local_path().is_some() {
1780 continue;
1781 }
1782
1783 if let EvaluationPath::Remote(url) = input.path() {
1784 let transferer = self.transferer.clone();
1785 let url = url.clone();
1786 downloads.spawn(async move {
1787 transferer
1788 .download(&url)
1789 .await
1790 .map(|l| (idx, l))
1791 .with_context(|| anyhow!("failed to localize `{url}`"))
1792 });
1793 }
1794 }
1795
1796 while let Some(result) = downloads.join_next().await {
1798 match result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}"))) {
1799 Ok((idx, location)) => {
1800 state.backend_inputs.as_slice_mut()[idx].set_location(location);
1801 }
1802 Err(e) => {
1803 return Err(EvaluationError::new(
1804 state.document.clone(),
1805 task_localization_failed(e, state.task.name(), state.task.name_span()),
1806 ));
1807 }
1808 }
1809 }
1810 }
1811
1812 if enabled!(Level::DEBUG) {
1813 for input in state.backend_inputs.as_slice() {
1814 match (
1815 input.path().as_local().is_some(),
1816 input.local_path(),
1817 input.guest_path(),
1818 ) {
1819 (true, _, None) | (false, None, None) => {}
1821 (true, _, Some(guest_path)) => {
1823 debug!(
1824 task_id,
1825 task_name = state.task.name(),
1826 document = state.document.uri().as_str(),
1827 "task input `{path}` mapped to `{guest_path}`",
1828 path = input.path().display(),
1829 );
1830 }
1831 (false, Some(local_path), None) => {
1833 debug!(
1834 task_id,
1835 task_name = state.task.name(),
1836 document = state.document.uri().as_str(),
1837 "task input `{path}` downloaded to `{local_path}`",
1838 path = input.path().display(),
1839 local_path = local_path.display()
1840 );
1841 }
1842 (false, None, Some(guest_path)) => {
1844 debug!(
1845 task_id,
1846 task_name = state.task.name(),
1847 document = state.document.uri().as_str(),
1848 "task input `{path}` mapped to `{guest_path}`",
1849 path = input.path().display(),
1850 );
1851 }
1852 (false, Some(local_path), Some(guest_path)) => {
1854 debug!(
1855 task_id,
1856 task_name = state.task.name(),
1857 document = state.document.uri().as_str(),
1858 "task input `{path}` downloaded to `{local_path}` and mapped to \
1859 `{guest_path}`",
1860 path = input.path().display(),
1861 local_path = local_path.display(),
1862 );
1863 }
1864 }
1865 }
1866 }
1867
1868 Ok(state.backend_inputs.as_slice().into())
1869 }
1870}