1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::future::Future;
7use std::mem;
8use std::path::Path;
9use std::path::absolute;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::anyhow;
15use indexmap::IndexMap;
16use petgraph::algo::toposort;
17use tokio_util::sync::CancellationToken;
18use tracing::Level;
19use tracing::debug;
20use tracing::enabled;
21use tracing::info;
22use tracing::warn;
23use wdl_analysis::Document;
24use wdl_analysis::diagnostics::multiple_type_mismatch;
25use wdl_analysis::diagnostics::unknown_name;
26use wdl_analysis::document::TASK_VAR_NAME;
27use wdl_analysis::document::Task;
28use wdl_analysis::eval::v1::TaskGraphBuilder;
29use wdl_analysis::eval::v1::TaskGraphNode;
30use wdl_analysis::types::Optional;
31use wdl_analysis::types::PrimitiveType;
32use wdl_analysis::types::Type;
33use wdl_analysis::types::v1::task_hint_types;
34use wdl_analysis::types::v1::task_requirement_types;
35use wdl_ast::Ast;
36use wdl_ast::AstNode;
37use wdl_ast::AstToken;
38use wdl_ast::Diagnostic;
39use wdl_ast::Span;
40use wdl_ast::SupportedVersion;
41use wdl_ast::v1::CommandPart;
42use wdl_ast::v1::CommandSection;
43use wdl_ast::v1::Decl;
44use wdl_ast::v1::RequirementsSection;
45use wdl_ast::v1::RuntimeSection;
46use wdl_ast::v1::StrippedCommandPart;
47use wdl_ast::v1::TASK_HINT_MAX_CPU;
48use wdl_ast::v1::TASK_HINT_MAX_CPU_ALIAS;
49use wdl_ast::v1::TASK_HINT_MAX_MEMORY;
50use wdl_ast::v1::TASK_HINT_MAX_MEMORY_ALIAS;
51use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER;
52use wdl_ast::v1::TASK_REQUIREMENT_CONTAINER_ALIAS;
53use wdl_ast::v1::TASK_REQUIREMENT_CPU;
54use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES;
55use wdl_ast::v1::TASK_REQUIREMENT_MAX_RETRIES_ALIAS;
56use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
57use wdl_ast::v1::TaskDefinition;
58use wdl_ast::v1::TaskHintsSection;
59use wdl_ast::version::V1;
60
61use super::ProgressKind;
62use crate::Coercible;
63use crate::EvaluationContext;
64use crate::EvaluationError;
65use crate::EvaluationResult;
66use crate::Input;
67use crate::InputKind;
68use crate::Outputs;
69use crate::PrimitiveValue;
70use crate::Scope;
71use crate::ScopeIndex;
72use crate::ScopeRef;
73use crate::TaskExecutionBackend;
74use crate::TaskInputs;
75use crate::TaskSpawnInfo;
76use crate::TaskSpawnRequest;
77use crate::TaskValue;
78use crate::Value;
79use crate::config::Config;
80use crate::config::MAX_RETRIES;
81use crate::convert_unit_string;
82use crate::diagnostics::output_evaluation_failed;
83use crate::diagnostics::runtime_type_mismatch;
84use crate::diagnostics::task_execution_failed;
85use crate::diagnostics::task_localization_failed;
86use crate::eval::EvaluatedTask;
87use crate::http::Downloader;
88use crate::http::HttpDownloader;
89use crate::path;
90use crate::path::EvaluationPath;
91use crate::tree::SyntaxNode;
92use crate::v1::ExprEvaluator;
93use crate::v1::INPUTS_FILE;
94use crate::v1::OUTPUTS_FILE;
95use crate::v1::write_json_file;
96
/// The container used when a task does not specify a container requirement
/// (or specifies `*`) and the caller supplies no other default.
pub const DEFAULT_TASK_REQUIREMENT_CONTAINER: &str = "ubuntu:latest";
/// The value used for the `cpu` requirement when a task does not specify one.
pub const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The value used for the `memory` requirement when a task does not specify one.
///
/// The value is `2 * 1024^3`; presumably a byte count (i.e. 2 GiB) — confirm
/// against the execution backends.
pub const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * 1024 * 1024 * 1024;
/// The maximum number of retries used when neither the task nor the
/// configuration specifies one.
pub const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;

/// The index of the scope that receives evaluated inputs and private
/// declarations (the first element of `State::scopes`).
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of the scope that receives evaluated outputs (the second element
/// of `State::scopes`).
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the scope that holds the `task` variable and is used when
/// evaluating the command and the outputs (the third element of
/// `State::scopes`).
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
113
114pub(crate) fn container<'a>(
116 requirements: &'a HashMap<String, Value>,
117 default: Option<&'a str>,
118) -> Cow<'a, str> {
119 requirements
120 .get(TASK_REQUIREMENT_CONTAINER)
121 .or_else(|| requirements.get(TASK_REQUIREMENT_CONTAINER_ALIAS))
122 .and_then(|v| -> Option<Cow<'_, str>> {
123 if let Some(array) = v.as_array() {
127 return array.as_slice().first().map(|v| {
128 v.as_string()
129 .expect("type should be string")
130 .as_ref()
131 .into()
132 });
133 }
134
135 Some(
136 v.coerce(&PrimitiveType::String.into())
137 .expect("type should coerce")
138 .unwrap_string()
139 .as_ref()
140 .clone()
141 .into(),
142 )
143 })
144 .and_then(|v| {
145 if v == "*" { None } else { Some(v) }
147 })
148 .unwrap_or_else(|| {
149 default
150 .map(Into::into)
151 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CONTAINER.into())
152 })
153}
154
155pub(crate) fn cpu(requirements: &HashMap<String, Value>) -> f64 {
157 requirements
158 .get(TASK_REQUIREMENT_CPU)
159 .map(|v| {
160 v.coerce(&PrimitiveType::Float.into())
161 .expect("type should coerce")
162 .unwrap_float()
163 })
164 .unwrap_or(DEFAULT_TASK_REQUIREMENT_CPU)
165}
166
167pub(crate) fn max_cpu(hints: &HashMap<String, Value>) -> Option<f64> {
169 hints
170 .get(TASK_HINT_MAX_CPU)
171 .or_else(|| hints.get(TASK_HINT_MAX_CPU_ALIAS))
172 .map(|v| {
173 v.coerce(&PrimitiveType::Float.into())
174 .expect("type should coerce")
175 .unwrap_float()
176 })
177}
178
179pub(crate) fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
181 Ok(requirements
182 .get(TASK_REQUIREMENT_MEMORY)
183 .map(|v| {
184 if let Some(v) = v.as_integer() {
185 return Ok(v);
186 }
187
188 if let Some(s) = v.as_string() {
189 return convert_unit_string(s)
190 .and_then(|v| v.try_into().ok())
191 .with_context(|| {
192 format!("task specifies an invalid `memory` requirement `{s}`")
193 });
194 }
195
196 unreachable!("value should be an integer or string");
197 })
198 .transpose()?
199 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MEMORY))
200}
201
202pub(crate) fn max_memory(hints: &HashMap<String, Value>) -> Result<Option<i64>> {
204 hints
205 .get(TASK_HINT_MAX_MEMORY)
206 .or_else(|| hints.get(TASK_HINT_MAX_MEMORY_ALIAS))
207 .map(|v| {
208 if let Some(v) = v.as_integer() {
209 return Ok(v);
210 }
211
212 if let Some(s) = v.as_string() {
213 return convert_unit_string(s)
214 .and_then(|v| v.try_into().ok())
215 .with_context(|| {
216 format!("task specifies an invalid `memory` requirement `{s}`")
217 });
218 }
219
220 unreachable!("value should be an integer or string");
221 })
222 .transpose()
223}
224
/// The evaluation context used when evaluating expressions of a task.
struct TaskEvaluationContext<'a, 'b> {
    /// The task evaluation state (document, task, scopes, temp directory).
    state: &'a State<'b>,
    /// The downloader exposed through `EvaluationContext::downloader`.
    downloader: &'a HttpDownloader,
    /// The index of the scope in which names are resolved.
    scope: ScopeIndex,
    /// The working directory exposed to expressions, if any.
    ///
    /// Set via `with_work_dir`; `None` by default.
    work_dir: Option<&'a EvaluationPath>,
    /// The value exposed as the task's standard output, if any.
    ///
    /// Set via `with_stdout`; `None` by default.
    stdout: Option<&'a Value>,
    /// The value exposed as the task's standard error, if any.
    ///
    /// Set via `with_stderr`; `None` by default.
    stderr: Option<&'a Value>,
    /// The task inputs used to translate host paths into guest paths.
    ///
    /// Set via `with_inputs`; when `None`, no path translation is performed.
    inputs: Option<&'a [Input]>,
    /// Whether the context exposes the task being evaluated via
    /// `EvaluationContext::task`.
    ///
    /// Set via `with_task`; `false` by default.
    task: bool,
}
252
253impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
254 pub fn new(state: &'a State<'b>, downloader: &'a HttpDownloader, scope: ScopeIndex) -> Self {
256 Self {
257 state,
258 downloader,
259 scope,
260 work_dir: None,
261 stdout: None,
262 stderr: None,
263 inputs: None,
264 task: false,
265 }
266 }
267
268 pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
270 self.work_dir = Some(work_dir);
271 self
272 }
273
274 pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
276 self.stdout = Some(stdout);
277 self
278 }
279
280 pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
282 self.stderr = Some(stderr);
283 self
284 }
285
286 pub fn with_inputs(mut self, inputs: &'a [Input]) -> Self {
288 self.inputs = Some(inputs);
289 self
290 }
291
292 pub fn with_task(mut self) -> Self {
296 self.task = true;
297 self
298 }
299}
300
301impl EvaluationContext for TaskEvaluationContext<'_, '_> {
302 fn version(&self) -> SupportedVersion {
303 self.state
304 .document
305 .version()
306 .expect("document should have a version")
307 }
308
309 fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
310 ScopeRef::new(&self.state.scopes, self.scope)
311 .lookup(name)
312 .cloned()
313 .ok_or_else(|| unknown_name(name, span))
314 }
315
316 fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
317 crate::resolve_type_name(self.state.document, name, span)
318 }
319
320 fn work_dir(&self) -> Option<&EvaluationPath> {
321 self.work_dir
322 }
323
324 fn temp_dir(&self) -> &Path {
325 self.state.temp_dir
326 }
327
328 fn stdout(&self) -> Option<&Value> {
329 self.stdout
330 }
331
332 fn stderr(&self) -> Option<&Value> {
333 self.stderr
334 }
335
336 fn task(&self) -> Option<&Task> {
337 if self.task {
338 Some(self.state.task)
339 } else {
340 None
341 }
342 }
343
344 fn translate_path(&self, path: &str) -> Option<Cow<'_, Path>> {
345 let inputs = self.inputs?;
346 let is_url = path::is_url(path);
347
348 if !is_url && Path::new(path).is_relative() {
350 return None;
351 }
352
353 let (guest_path, stripped) = inputs
355 .iter()
356 .filter_map(|i| {
357 match i.path() {
358 EvaluationPath::Local(base) if !is_url => {
359 let stripped = Path::new(path).strip_prefix(base).ok()?;
360 Some((i.guest_path()?, stripped.to_str()?))
361 }
362 EvaluationPath::Remote(url) if is_url => {
363 let url = url.as_str();
364 let stripped = path.strip_prefix(url.strip_suffix('/').unwrap_or(url))?;
365
366 let stripped = if let Some(pos) = stripped.find('?') {
368 &stripped[..pos]
369 } else if let Some(pos) = stripped.find('#') {
370 &stripped[..pos]
371 } else {
372 stripped.strip_prefix('/').unwrap_or(stripped)
373 };
374
375 Some((i.guest_path()?, stripped))
376 }
377 _ => None,
378 }
379 })
380 .min_by(|(_, a), (_, b)| a.len().cmp(&b.len()))?;
381
382 if stripped.is_empty() {
383 return Some(Path::new(guest_path).into());
384 }
385
386 Some(Path::new(guest_path).join(stripped).into())
387 }
388
389 fn downloader(&self) -> &dyn Downloader {
390 self.downloader
391 }
392}
393
/// The state kept while evaluating a single task.
struct State<'a> {
    /// The temporary directory of the evaluation.
    temp_dir: &'a Path,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The scopes of the evaluation.
    ///
    /// Indexed by `ROOT_SCOPE_INDEX` (inputs and private declarations),
    /// `OUTPUT_SCOPE_INDEX` (outputs), and `TASK_SCOPE_INDEX` (the `task`
    /// variable), in that order.
    scopes: [Scope; 3],
    /// The environment variables collected from inputs and private
    /// declarations that are marked `env`.
    ///
    /// Maps the declaration name to the string form of its value.
    env: IndexMap<String, String>,
}
413
414impl<'a> State<'a> {
415 fn new(temp_dir: &'a Path, document: &'a Document, task: &'a Task) -> Result<Self> {
417 let scopes = [
426 Scope::default(),
427 Scope::new(ROOT_SCOPE_INDEX),
428 Scope::new(OUTPUT_SCOPE_INDEX),
429 ];
430
431 Ok(Self {
432 temp_dir,
433 document,
434 task,
435 scopes,
436 env: Default::default(),
437 })
438 }
439}
440
/// The result of evaluating the sections of a task that are needed to execute
/// it.
struct EvaluatedSections {
    /// The evaluated command.
    command: String,
    /// The evaluated requirements, keyed by requirement name.
    requirements: Arc<HashMap<String, Value>>,
    /// The evaluated hints, keyed by hint name.
    hints: Arc<HashMap<String, Value>>,
    /// The inputs discovered (and localized) while evaluating the command.
    inputs: Vec<Input>,
}
452
/// An evaluator for WDL 1.x tasks.
pub struct TaskEvaluator {
    /// The configuration used for evaluation.
    config: Arc<Config>,
    /// The backend used to localize inputs and to spawn the task.
    backend: Arc<dyn TaskExecutionBackend>,
    /// The cancellation token handed to the backend when spawning a task.
    token: CancellationToken,
    /// The downloader handed to evaluation contexts and to the backend.
    downloader: HttpDownloader,
}
464
465impl TaskEvaluator {
466 pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
473 let backend = config.create_backend().await?;
474 Self::new_with_backend(config, backend, token)
475 }
476
477 pub fn new_with_backend(
482 config: Config,
483 backend: Arc<dyn TaskExecutionBackend>,
484 token: CancellationToken,
485 ) -> Result<Self> {
486 config.validate()?;
487
488 let config = Arc::new(config);
489 let downloader = HttpDownloader::new(config.clone())?;
490
491 Ok(Self {
492 config,
493 backend,
494 token,
495 downloader,
496 })
497 }
498
    /// Constructs a new task evaluator from already-constructed parts.
    ///
    /// Unlike [`Self::new_with_backend`], this does not validate the
    /// configuration; the caller is expected to have done so.
    pub(crate) fn new_unchecked(
        config: Arc<Config>,
        backend: Arc<dyn TaskExecutionBackend>,
        token: CancellationToken,
        downloader: HttpDownloader,
    ) -> Self {
        Self {
            config,
            backend,
            token,
            downloader,
        }
    }
516
517 pub async fn evaluate<P, R>(
521 &self,
522 document: &Document,
523 task: &Task,
524 inputs: &TaskInputs,
525 root: impl AsRef<Path>,
526 progress: P,
527 ) -> EvaluationResult<EvaluatedTask>
528 where
529 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
530 R: Future<Output = ()> + Send,
531 {
532 self.evaluate_with_progress(
533 document,
534 task,
535 inputs,
536 root.as_ref(),
537 task.name(),
538 Arc::new(progress),
539 )
540 .await
541 }
542
543 pub(crate) async fn evaluate_with_progress<P, R>(
545 &self,
546 document: &Document,
547 task: &Task,
548 inputs: &TaskInputs,
549 root: &Path,
550 id: &str,
551 progress: Arc<P>,
552 ) -> EvaluationResult<EvaluatedTask>
553 where
554 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
555 R: Future<Output = ()> + Send,
556 {
557 if document.has_errors() {
559 return Err(anyhow!("cannot evaluate a document with errors").into());
560 }
561
562 progress(ProgressKind::TaskStarted { id }).await;
563
564 let result = self
565 .perform_evaluation(document, task, inputs, root, id, progress.clone())
566 .await;
567
568 progress(ProgressKind::TaskCompleted {
569 id,
570 result: &result,
571 })
572 .await;
573
574 result
575 }
576
577 async fn perform_evaluation<P, R>(
579 &self,
580 document: &Document,
581 task: &Task,
582 inputs: &TaskInputs,
583 root: &Path,
584 id: &str,
585 progress: Arc<P>,
586 ) -> EvaluationResult<EvaluatedTask>
587 where
588 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
589 R: Future<Output = ()> + Send,
590 {
591 inputs.validate(document, task, None).with_context(|| {
592 format!(
593 "failed to validate the inputs to task `{task}`",
594 task = task.name()
595 )
596 })?;
597
598 let ast = match document.root().morph().ast() {
599 Ast::V1(ast) => ast,
600 _ => {
601 return Err(
602 anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
603 );
604 }
605 };
606
607 let definition = ast
609 .tasks()
610 .find(|t| t.name().text() == task.name())
611 .expect("task should exist in the AST");
612
613 let version = document.version().expect("document should have version");
614
615 let mut diagnostics = Vec::new();
617 let graph = TaskGraphBuilder::default().build(version, &definition, &mut diagnostics);
618 assert!(
619 diagnostics.is_empty(),
620 "task evaluation graph should have no diagnostics"
621 );
622
623 debug!(
624 task_id = id,
625 task_name = task.name(),
626 document = document.uri().as_str(),
627 "evaluating task"
628 );
629
630 let root_dir = absolute(root).with_context(|| {
631 format!(
632 "failed to determine absolute path of `{path}`",
633 path = root.display()
634 )
635 })?;
636
637 let temp_dir = root_dir.join("tmp");
639 fs::create_dir_all(&temp_dir).with_context(|| {
640 format!(
641 "failed to create directory `{path}`",
642 path = temp_dir.display()
643 )
644 })?;
645
646 write_json_file(root_dir.join(INPUTS_FILE), inputs)?;
648
649 let mut state = State::new(&temp_dir, document, task)?;
650 let nodes = toposort(&graph, None).expect("graph should be acyclic");
651 let mut current = 0;
652 while current < nodes.len() {
653 match &graph[nodes[current]] {
654 TaskGraphNode::Input(decl) => {
655 self.evaluate_input(id, &mut state, decl, inputs)
656 .await
657 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
658 }
659 TaskGraphNode::Decl(decl) => {
660 self.evaluate_decl(id, &mut state, decl)
661 .await
662 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
663 }
664 TaskGraphNode::Output(_) => {
665 break;
667 }
668 TaskGraphNode::Command(_)
669 | TaskGraphNode::Runtime(_)
670 | TaskGraphNode::Requirements(_)
671 | TaskGraphNode::Hints(_) => {
672 }
675 }
676
677 current += 1;
678 }
679
680 let env = Arc::new(mem::take(&mut state.env));
684
685 let mut attempt = 0;
687 let mut evaluated = loop {
688 let EvaluatedSections {
689 command,
690 requirements,
691 hints,
692 inputs,
693 } = self
694 .evaluate_sections(id, &mut state, &definition, inputs, attempt)
695 .await?;
696
697 let max_retries = requirements
700 .get(TASK_REQUIREMENT_MAX_RETRIES)
701 .or_else(|| requirements.get(TASK_REQUIREMENT_MAX_RETRIES_ALIAS))
702 .cloned()
703 .map(|v| v.unwrap_integer() as u64)
704 .or_else(|| self.config.task.retries)
705 .unwrap_or(DEFAULT_TASK_REQUIREMENT_MAX_RETRIES);
706
707 if max_retries > MAX_RETRIES {
708 return Err(anyhow!(
709 "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
710 )
711 .into());
712 }
713
714 let mut attempt_dir = root_dir.clone();
715 attempt_dir.push("attempts");
716 attempt_dir.push(attempt.to_string());
717
718 let request = TaskSpawnRequest::new(
719 id.to_string(),
720 TaskSpawnInfo::new(
721 command,
722 inputs,
723 requirements.clone(),
724 hints.clone(),
725 env.clone(),
726 ),
727 attempt,
728 attempt_dir.clone(),
729 );
730
731 let events = self
732 .backend
733 .spawn(request, self.token.clone())
734 .with_context(|| {
735 format!(
736 "failed to spawn task `{name}` in `{path}` (task id `{id}`)",
737 name = task.name(),
738 path = document.path(),
739 )
740 })?;
741
742 if attempt > 0 {
743 progress(ProgressKind::TaskRetried {
744 id,
745 retry: attempt - 1,
746 });
747 }
748
749 events.spawned.await.ok();
751
752 progress(ProgressKind::TaskExecutionStarted { id });
753
754 let result = events
755 .completed
756 .await
757 .expect("failed to receive response from spawned task");
758
759 progress(ProgressKind::TaskExecutionCompleted {
760 id,
761 result: &result,
762 });
763
764 let result = result.map_err(|e| {
765 EvaluationError::new(
766 state.document.clone(),
767 task_execution_failed(e, task.name(), id, task.name_span()),
768 )
769 })?;
770
771 let evaluated = EvaluatedTask::new(attempt_dir, result)?;
773 if version >= SupportedVersion::V1(V1::Two) {
774 let task = state.scopes[TASK_SCOPE_INDEX.0]
775 .get_mut(TASK_VAR_NAME)
776 .unwrap()
777 .as_task_mut()
778 .unwrap();
779
780 task.set_attempt(attempt.try_into().with_context(|| {
781 format!(
782 "too many attempts were made to run task `{task}`",
783 task = state.task.name()
784 )
785 })?);
786 task.set_return_code(evaluated.result.exit_code);
787 }
788
789 if let Err(e) = evaluated.handle_exit(&requirements, &self.downloader).await {
790 if attempt >= max_retries {
791 return Err(EvaluationError::new(
792 state.document.clone(),
793 task_execution_failed(e, task.name(), id, task.name_span()),
794 ));
795 }
796
797 attempt += 1;
798
799 info!(
800 "retrying execution of task `{name}` (retry {attempt})",
801 name = state.task.name()
802 );
803 continue;
804 }
805
806 break evaluated;
807 };
808
809 for index in &nodes[current..] {
811 match &graph[*index] {
812 TaskGraphNode::Decl(decl) => {
813 self.evaluate_decl(id, &mut state, decl)
814 .await
815 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
816 }
817 TaskGraphNode::Output(decl) => {
818 self.evaluate_output(id, &mut state, decl, &evaluated)
819 .await
820 .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
821 }
822 _ => {
823 unreachable!(
824 "only declarations and outputs should be evaluated after the command"
825 )
826 }
827 }
828 }
829
830 let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
832 drop(state);
833 if let Some(section) = definition.output() {
834 let indexes: HashMap<_, _> = section
835 .declarations()
836 .enumerate()
837 .map(|(i, d)| (d.name().hashable(), i))
838 .collect();
839 outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
840 }
841
842 write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
844
845 evaluated.outputs = Ok(outputs);
846 Ok(evaluated)
847 }
848
849 async fn evaluate_input(
851 &self,
852 id: &str,
853 state: &mut State<'_>,
854 decl: &Decl<SyntaxNode>,
855 inputs: &TaskInputs,
856 ) -> Result<(), Diagnostic> {
857 let name = decl.name();
858 let decl_ty = decl.ty();
859 let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
860
861 let (value, span) = match inputs.get(name.text()) {
862 Some(input) => (input.clone(), name.span()),
863 None => match decl.expr() {
864 Some(expr) => {
865 debug!(
866 task_id = id,
867 task_name = state.task.name(),
868 document = state.document.uri().as_str(),
869 input_name = name.text(),
870 "evaluating input"
871 );
872
873 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
874 state,
875 &self.downloader,
876 ROOT_SCOPE_INDEX,
877 ));
878 let value = evaluator.evaluate_expr(&expr).await?;
879 (value, expr.span())
880 }
881 _ => {
882 assert!(decl.ty().is_optional(), "type should be optional");
883 (Value::None, name.span())
884 }
885 },
886 };
887
888 let value = value
889 .coerce(&ty)
890 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), span))?;
891 state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
892
893 if decl.env().is_some() {
895 state.env.insert(
896 name.text().to_string(),
897 value
898 .as_primitive()
899 .expect("value should be primitive")
900 .raw(None)
901 .to_string(),
902 );
903 }
904
905 Ok(())
906 }
907
908 async fn evaluate_decl(
910 &self,
911 id: &str,
912 state: &mut State<'_>,
913 decl: &Decl<SyntaxNode>,
914 ) -> Result<(), Diagnostic> {
915 let name = decl.name();
916 debug!(
917 task_id = id,
918 task_name = state.task.name(),
919 document = state.document.uri().as_str(),
920 decl_name = name.text(),
921 "evaluating private declaration",
922 );
923
924 let decl_ty = decl.ty();
925 let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
926
927 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
928 state,
929 &self.downloader,
930 ROOT_SCOPE_INDEX,
931 ));
932
933 let expr = decl.expr().expect("private decls should have expressions");
934 let value = evaluator.evaluate_expr(&expr).await?;
935 let value = value
936 .coerce(&ty)
937 .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
938 state.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
939
940 if decl.env().is_some() {
942 state.env.insert(
943 name.text().to_string(),
944 value
945 .as_primitive()
946 .expect("value should be primitive")
947 .raw(None)
948 .to_string(),
949 );
950 }
951
952 Ok(())
953 }
954
955 async fn evaluate_runtime_section(
959 &self,
960 id: &str,
961 state: &State<'_>,
962 section: &RuntimeSection<SyntaxNode>,
963 inputs: &TaskInputs,
964 ) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
965 debug!(
966 task_id = id,
967 task_name = state.task.name(),
968 document = state.document.uri().as_str(),
969 "evaluating runtimes section",
970 );
971
972 let mut requirements = HashMap::new();
973 let mut hints = HashMap::new();
974
975 let version = state
976 .document
977 .version()
978 .expect("document should have version");
979 for item in section.items() {
980 let name = item.name();
981 match inputs.requirement(name.text()) {
982 Some(value) => {
983 requirements.insert(name.text().to_string(), value.clone());
984 continue;
985 }
986 _ => {
987 if let Some(value) = inputs.hint(name.text()) {
988 hints.insert(name.text().to_string(), value.clone());
989 continue;
990 }
991 }
992 }
993
994 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
995 state,
996 &self.downloader,
997 ROOT_SCOPE_INDEX,
998 ));
999
1000 let (types, requirement) = match task_requirement_types(version, name.text()) {
1001 Some(types) => (Some(types), true),
1002 None => match task_hint_types(version, name.text(), false) {
1003 Some(types) => (Some(types), false),
1004 None => (None, false),
1005 },
1006 };
1007
1008 let expr = item.expr();
1010 let mut value = evaluator.evaluate_expr(&expr).await?;
1011 if let Some(types) = types {
1012 value = types
1013 .iter()
1014 .find_map(|ty| value.coerce(ty).ok())
1015 .ok_or_else(|| {
1016 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1017 })?;
1018 }
1019
1020 if requirement {
1021 requirements.insert(name.text().to_string(), value);
1022 } else {
1023 hints.insert(name.text().to_string(), value);
1024 }
1025 }
1026
1027 Ok((requirements, hints))
1028 }
1029
1030 async fn evaluate_requirements_section(
1032 &self,
1033 id: &str,
1034 state: &State<'_>,
1035 section: &RequirementsSection<SyntaxNode>,
1036 inputs: &TaskInputs,
1037 ) -> Result<HashMap<String, Value>, Diagnostic> {
1038 debug!(
1039 task_id = id,
1040 task_name = state.task.name(),
1041 document = state.document.uri().as_str(),
1042 "evaluating requirements",
1043 );
1044
1045 let mut requirements = HashMap::new();
1046
1047 let version = state
1048 .document
1049 .version()
1050 .expect("document should have version");
1051 for item in section.items() {
1052 let name = item.name();
1053 if let Some(value) = inputs.requirement(name.text()) {
1054 requirements.insert(name.text().to_string(), value.clone());
1055 continue;
1056 }
1057
1058 let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(
1059 state,
1060 &self.downloader,
1061 ROOT_SCOPE_INDEX,
1062 ));
1063
1064 let types =
1065 task_requirement_types(version, name.text()).expect("requirement should be known");
1066
1067 let expr = item.expr();
1069 let value = evaluator.evaluate_expr(&expr).await?;
1070 let value = types
1071 .iter()
1072 .find_map(|ty| value.coerce(ty).ok())
1073 .ok_or_else(|| {
1074 multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
1075 })?;
1076
1077 requirements.insert(name.text().to_string(), value);
1078 }
1079
1080 Ok(requirements)
1081 }
1082
1083 async fn evaluate_hints_section(
1085 &self,
1086 id: &str,
1087 state: &State<'_>,
1088 section: &TaskHintsSection<SyntaxNode>,
1089 inputs: &TaskInputs,
1090 ) -> Result<HashMap<String, Value>, Diagnostic> {
1091 debug!(
1092 task_id = id,
1093 task_name = state.task.name(),
1094 document = state.document.uri().as_str(),
1095 "evaluating hints section",
1096 );
1097
1098 let mut hints = HashMap::new();
1099
1100 for item in section.items() {
1101 let name = item.name();
1102 if let Some(value) = inputs.hint(name.text()) {
1103 hints.insert(name.text().to_string(), value.clone());
1104 continue;
1105 }
1106
1107 let mut evaluator = ExprEvaluator::new(
1108 TaskEvaluationContext::new(state, &self.downloader, ROOT_SCOPE_INDEX).with_task(),
1109 );
1110
1111 let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
1112 hints.insert(name.text().to_string(), value);
1113 }
1114
1115 Ok(hints)
1116 }
1117
    /// Evaluates the command of a task.
    ///
    /// Returns the evaluated command text along with the inputs that were
    /// discovered and localized for it.
    ///
    /// The inputs are collected from the path values visible from the task
    /// scope, plus the evaluation's temp directory; they are localized by the
    /// backend before any placeholder is evaluated so that placeholders can
    /// translate host paths into guest paths.
    ///
    /// # Errors
    ///
    /// Returns an error if an input cannot be created from a visited value,
    /// if localization fails, or if a placeholder fails to evaluate.
    async fn evaluate_command(
        &self,
        id: &str,
        state: &State<'_>,
        section: &CommandSection<SyntaxNode>,
    ) -> EvaluationResult<(String, Vec<Input>)> {
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            "evaluating command section",
        );

        let mut inputs = Vec::new();

        // Discover the inputs from every value visible from the task scope
        ScopeRef::new(&state.scopes, TASK_SCOPE_INDEX.0).for_each(|_, v| {
            // NOTE(review): the meaning of the `false` argument is not visible
            // here; in `evaluate_output` the corresponding argument of
            // `visit_paths_mut` is `ty.is_optional()` — confirm
            v.visit_paths(false, &mut |_, value| {
                inputs.push(Input::from_primitive(value)?);
                Ok(())
            })
        })?;

        // The temp directory is always an input as evaluation may have written
        // files to it that the command refers to
        inputs.push(Input::new(
            InputKind::Directory,
            EvaluationPath::Local(state.temp_dir.to_path_buf()),
        ));

        // Let the backend localize the inputs before evaluating the command
        self.backend
            .localize_inputs(&self.downloader, &mut inputs)
            .await
            .map_err(|e| {
                EvaluationError::new(
                    state.document.clone(),
                    task_localization_failed(e, state.task.name(), state.task.name_span()),
                )
            })?;

        // Only walk the inputs for logging when debug logging is enabled
        if enabled!(Level::DEBUG) {
            for input in inputs.iter() {
                if let Some(location) = input.location() {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        "task input `{path}` (downloaded to `{location}`) mapped to `{guest_path}`",
                        path = input.path().display(),
                        location = location.display(),
                        guest_path = input.guest_path().unwrap_or(""),
                    );
                } else {
                    debug!(
                        task_id = id,
                        task_name = state.task.name(),
                        document = state.document.uri().as_str(),
                        "task input `{path}` mapped to `{guest_path}`",
                        path = input.path().display(),
                        guest_path = input.guest_path().unwrap_or(""),
                    );
                }
            }
        }

        let mut command = String::new();
        match section.strip_whitespace() {
            // Whitespace stripping succeeded: use the stripped parts
            Some(parts) => {
                let mut evaluator = ExprEvaluator::new(
                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                        .with_inputs(&inputs),
                );

                for part in parts {
                    match part {
                        StrippedCommandPart::Text(t) => {
                            command.push_str(t.as_str());
                        }
                        StrippedCommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                        }
                    }
                }
            }
            // Whitespace stripping was not possible: warn and fall back to the
            // command's parts as written
            _ => {
                warn!(
                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
                     stripping was skipped",
                    task = state.task.name(),
                    uri = state.document.uri(),
                );

                let mut evaluator = ExprEvaluator::new(
                    TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                        .with_inputs(&inputs),
                );

                let heredoc = section.is_heredoc();
                for part in section.parts() {
                    match part {
                        CommandPart::Text(t) => {
                            // Unstripped text still needs to be unescaped
                            t.unescape_to(heredoc, &mut command);
                        }
                        CommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                        }
                    }
                }
            }
        }

        Ok((command, inputs))
    }
1242
1243 async fn evaluate_sections(
1251 &self,
1252 id: &str,
1253 state: &mut State<'_>,
1254 definition: &TaskDefinition<SyntaxNode>,
1255 inputs: &TaskInputs,
1256 attempt: u64,
1257 ) -> EvaluationResult<EvaluatedSections> {
1258 let (requirements, hints) = match definition.runtime() {
1260 Some(section) => self
1261 .evaluate_runtime_section(id, state, §ion, inputs)
1262 .await
1263 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1264 _ => (
1265 match definition.requirements() {
1266 Some(section) => self
1267 .evaluate_requirements_section(id, state, §ion, inputs)
1268 .await
1269 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1270 None => Default::default(),
1271 },
1272 match definition.hints() {
1273 Some(section) => self
1274 .evaluate_hints_section(id, state, §ion, inputs)
1275 .await
1276 .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
1277 None => Default::default(),
1278 },
1279 ),
1280 };
1281
1282 if state.document.version() >= Some(SupportedVersion::V1(V1::Two)) {
1286 let constraints = self
1288 .backend
1289 .constraints(&requirements, &hints)
1290 .with_context(|| {
1291 format!(
1292 "failed to get constraints for task `{task}`",
1293 task = state.task.name()
1294 )
1295 })?;
1296
1297 let task = TaskValue::new_v1(
1298 state.task.name(),
1299 id,
1300 definition,
1301 constraints,
1302 attempt.try_into().with_context(|| {
1303 format!(
1304 "too many attempts were made to run task `{task}`",
1305 task = state.task.name()
1306 )
1307 })?,
1308 );
1309
1310 let scope = &mut state.scopes[TASK_SCOPE_INDEX.0];
1311 if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
1312 *v = Value::Task(task);
1313 } else {
1314 scope.insert(TASK_VAR_NAME, Value::Task(task));
1315 }
1316 }
1317
1318 let (command, inputs) = self
1319 .evaluate_command(
1320 id,
1321 state,
1322 &definition.command().expect("must have command section"),
1323 )
1324 .await?;
1325
1326 Ok(EvaluatedSections {
1327 command,
1328 requirements: Arc::new(requirements),
1329 hints: Arc::new(hints),
1330 inputs,
1331 })
1332 }
1333
    /// Evaluates a task output declaration.
    ///
    /// The output's expression is evaluated in the task scope with the
    /// executed task's working directory, stdout, and stderr available. The
    /// value is coerced to the declared type, its paths are mapped to host
    /// paths and checked for existence, and the result is inserted into the
    /// output scope.
    ///
    /// # Errors
    ///
    /// Returns a diagnostic if the expression fails to evaluate, the value
    /// does not coerce to the declared type, or a path in the value cannot be
    /// mapped to the host or fails the existence check.
    ///
    /// # Panics
    ///
    /// Panics if the declaration has no expression.
    async fn evaluate_output(
        &self,
        id: &str,
        state: &mut State<'_>,
        decl: &Decl<SyntaxNode>,
        evaluated: &EvaluatedTask,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = state.task.name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(state.document, &decl_ty)?;
        let mut evaluator = ExprEvaluator::new(
            TaskEvaluationContext::new(state, &self.downloader, TASK_SCOPE_INDEX)
                .with_work_dir(&evaluated.result.work_dir)
                .with_stdout(&evaluated.result.stdout)
                .with_stderr(&evaluated.result.stderr),
        );

        let expr = decl.expr().expect("outputs should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;

        // Coerce the value to the declared type
        let mut value = value
            .coerce(&ty)
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;

        let result = if let Some(guest_work_dir) = self.backend.guest_work_dir() {
            // The backend reports a guest working directory, so paths in the
            // value may be guest paths that must be mapped back to the host
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                let path = match value {
                    PrimitiveValue::File(path) => path,
                    PrimitiveValue::Directory(path) => path,
                    _ => unreachable!("only file and directory values should be visited"),
                };

                // Paths already under the temp directory or the attempt
                // directory are left untouched; anything else is translated
                if !Path::new(path.as_str()).starts_with(state.temp_dir)
                    && !Path::new(path.as_str()).starts_with(evaluated.attempt_dir())
                {
                    let guest = if path::is_file_url(path) {
                        path::parse_url(path)
                            .and_then(|u| u.to_file_path().ok())
                            .ok_or_else(|| anyhow!("guest path `{path}` is not a valid file URI"))?
                    } else if path::is_url(path) {
                        // A non-file URL is not translated and its existence is
                        // not checked here.
                        // NOTE(review): the meaning of the `true` result is
                        // defined by `visit_paths_mut` — confirm
                        return Ok(true);
                    } else {
                        // `Path::join` keeps an absolute path as-is and makes a
                        // relative path relative to the guest working directory
                        guest_work_dir.join(path.as_str())
                    };

                    let host = if let Ok(stripped) = guest.strip_prefix(guest_work_dir) {
                        // The path is within the guest working directory; join
                        // the remainder to the host working directory
                        Cow::Owned(
                            evaluated.result.work_dir.join(
                                stripped.to_str().with_context(|| {
                                    format!("output path `{path}` is not UTF-8")
                                })?,
                            )?,
                        )
                    } else {
                        // Otherwise, the path must be within one of the task's
                        // inputs; the input leaving the shortest remainder wins
                        evaluated
                            .inputs()
                            .iter()
                            .filter_map(|i| {
                                Some((i.path(), guest.strip_prefix(i.guest_path()?).ok()?))
                            })
                            .min_by(|(_, a), (_, b)| a.as_os_str().len().cmp(&b.as_os_str().len()))
                            .and_then(|(path, stripped)| {
                                // The path refers to the input itself
                                if stripped.as_os_str().is_empty() {
                                    return Some(Cow::Borrowed(path));
                                }

                                Some(Cow::Owned(path.join(stripped.to_str()?).ok()?))
                            })
                            .ok_or_else(|| {
                                anyhow!("guest path `{path}` is not within a container mount")
                            })?
                    };

                    // Replace the guest path with the host path in place
                    *Arc::make_mut(path) = host.into_owned().try_into()?;
                }

                value.ensure_path_exists(optional)
            })
        } else {
            // No guest working directory: join the paths to the working
            // directory when it is a local one
            value.visit_paths_mut(ty.is_optional(), &mut |optional, value| {
                if let Some(work_dir) = evaluated.result.work_dir.as_local() {
                    value.join_path_to(work_dir);
                }

                value.ensure_path_exists(optional)
            })
        };

        result.map_err(|e| {
            output_evaluation_failed(e, state.task.name(), true, name.text(), name.span())
        })?;

        state.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
        Ok(())
    }
1451}