1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::fmt::Write;
7use std::fs;
8use std::future::Future;
9use std::mem;
10use std::path::Path;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::Context;
15use anyhow::Result;
16use anyhow::anyhow;
17use anyhow::bail;
18use futures::FutureExt;
19use futures::future::BoxFuture;
20use indexmap::IndexMap;
21use petgraph::Direction;
22use petgraph::graph::DiGraph;
23use petgraph::graph::NodeIndex;
24use petgraph::visit::Bfs;
25use petgraph::visit::EdgeRef;
26use tokio::sync::RwLock;
27use tokio::task::JoinSet;
28use tokio_util::sync::CancellationToken;
29use tracing::debug;
30use tracing::trace;
31use wdl_analysis::Document;
32use wdl_analysis::diagnostics::only_one_namespace;
33use wdl_analysis::diagnostics::recursive_workflow_call;
34use wdl_analysis::diagnostics::type_is_not_array;
35use wdl_analysis::diagnostics::unknown_name;
36use wdl_analysis::diagnostics::unknown_namespace;
37use wdl_analysis::diagnostics::unknown_task_or_workflow;
38use wdl_analysis::document::Task;
39use wdl_analysis::eval::v1::WorkflowGraphBuilder;
40use wdl_analysis::eval::v1::WorkflowGraphNode;
41use wdl_analysis::types::ArrayType;
42use wdl_analysis::types::CallType;
43use wdl_analysis::types::Optional;
44use wdl_analysis::types::PrimitiveType;
45use wdl_analysis::types::PromotionKind;
46use wdl_analysis::types::Type;
47use wdl_ast::Ast;
48use wdl_ast::AstNode;
49use wdl_ast::AstToken;
50use wdl_ast::Diagnostic;
51use wdl_ast::Span;
52use wdl_ast::SupportedVersion;
53use wdl_ast::v1::CallKeyword;
54use wdl_ast::v1::CallStatement;
55use wdl_ast::v1::ConditionalStatement;
56use wdl_ast::v1::Decl;
57use wdl_ast::v1::Expr;
58use wdl_ast::v1::ScatterStatement;
59
60use super::ProgressKind;
61use crate::Array;
62use crate::CallLocation;
63use crate::CallValue;
64use crate::Coercible;
65use crate::EvaluationContext;
66use crate::EvaluationError;
67use crate::EvaluationResult;
68use crate::Inputs;
69use crate::Outputs;
70use crate::PrimitiveValue;
71use crate::Scope;
72use crate::ScopeIndex;
73use crate::ScopeRef;
74use crate::TaskExecutionBackend;
75use crate::Value;
76use crate::WorkflowInputs;
77use crate::config::Config;
78use crate::diagnostics::if_conditional_mismatch;
79use crate::diagnostics::output_evaluation_failed;
80use crate::diagnostics::runtime_type_mismatch;
81use crate::http::Downloader;
82use crate::http::HttpDownloader;
83use crate::path;
84use crate::path::EvaluationPath;
85use crate::tree::SyntaxNode;
86use crate::tree::SyntaxToken;
87use crate::v1::ExprEvaluator;
88use crate::v1::INPUTS_FILE;
89use crate::v1::OUTPUTS_FILE;
90use crate::v1::TaskEvaluator;
91use crate::v1::write_json_file;
92
/// Formats a unique identifier from a call's namespace, target name, alias,
/// and scatter index string.
///
/// The identifier joins, with `-`, only the parts that apply: the namespace
/// (when present), the target name (only when the alias differs from it),
/// the alias itself, and the scatter index (only when non-empty).
fn format_id(namespace: Option<&str>, target: &str, alias: &str, scatter_index: &str) -> String {
    let mut parts = Vec::with_capacity(4);
    if let Some(ns) = namespace {
        parts.push(ns);
    }
    // When the call is not aliased, the alias equals the target and the
    // target would be redundant in the identifier.
    if alias != target {
        parts.push(target);
    }
    parts.push(alias);
    if !scatter_index.is_empty() {
        parts.push(scatter_index);
    }
    parts.join("-")
}
127
/// Name of the hidden scope-local variable that records the current scatter
/// iteration index; read back by `Scopes::scatter_index` to reconstruct the
/// full index path for nested scatters.
const SCATTER_INDEX_VAR: &str = "$idx";
135
/// Context used when evaluating expressions during workflow evaluation
/// (inputs, private declarations, outputs, and statement expressions).
struct WorkflowEvaluationContext<'a, 'b> {
    /// The document containing the workflow being evaluated.
    document: &'a Document,
    /// The scope in which names are resolved.
    scope: ScopeRef<'b>,
    /// Directory for temporary files created during evaluation.
    temp_dir: &'a Path,
    /// Downloader used to fetch remote files referenced by expressions.
    downloader: &'a HttpDownloader,
}
147
148impl<'a, 'b> WorkflowEvaluationContext<'a, 'b> {
149 pub fn new(
151 document: &'a Document,
152 scope: ScopeRef<'b>,
153 temp_dir: &'a Path,
154 downloader: &'a HttpDownloader,
155 ) -> Self {
156 Self {
157 document,
158 scope,
159 temp_dir,
160 downloader,
161 }
162 }
163}
164
165impl EvaluationContext for WorkflowEvaluationContext<'_, '_> {
166 fn version(&self) -> SupportedVersion {
167 self.document
168 .version()
169 .expect("document should have a version")
170 }
171
172 fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
173 self.scope
174 .lookup(name)
175 .cloned()
176 .ok_or_else(|| unknown_name(name, span))
177 }
178
179 fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
180 crate::resolve_type_name(self.document, name, span)
181 }
182
183 fn work_dir(&self) -> Option<&EvaluationPath> {
184 None
185 }
186
187 fn temp_dir(&self) -> &Path {
188 self.temp_dir
189 }
190
191 fn stdout(&self) -> Option<&Value> {
192 None
193 }
194
195 fn stderr(&self) -> Option<&Value> {
196 None
197 }
198
199 fn task(&self) -> Option<&Task> {
200 None
201 }
202
203 fn translate_path(&self, _path: &str) -> Option<Cow<'_, Path>> {
204 None
205 }
206
207 fn downloader(&self) -> &dyn Downloader {
208 self.downloader
209 }
210}
211
/// An arena of all scopes created during a workflow evaluation.
#[derive(Debug)]
struct Scopes {
    /// Every allocated scope, addressed by `ScopeIndex`.
    all: Vec<Scope>,
    /// Indexes of scopes that have been freed and may be reused by `alloc`.
    free: Vec<ScopeIndex>,
}
229
impl Scopes {
    /// Index of the well-known scope that collects the workflow's outputs.
    const OUTPUT_INDEX: ScopeIndex = ScopeIndex::new(1);
    /// Index of the well-known root scope holding the workflow's inputs and
    /// top-level declarations.
    const ROOT_INDEX: ScopeIndex = ScopeIndex::new(0);

    /// Allocates a scope with the given parent, reusing a previously freed
    /// scope when one is available.
    fn alloc(&mut self, parent: ScopeIndex) -> ScopeIndex {
        if let Some(index) = self.free.pop() {
            // Freed scopes were cleared in `free`, so only the parent link
            // needs updating before reuse.
            self.all[index.0].set_parent(parent);
            return index;
        }

        let index = self.all.len();
        self.all.push(Scope::new(parent));
        index.into()
    }

    /// Gets an immutable reference to the scope at the given index.
    fn reference(&self, index: ScopeIndex) -> ScopeRef<'_> {
        ScopeRef::new(&self.all, index)
    }

    /// Takes ownership of the scope at the given index, leaving a default
    /// (empty) scope in its place.
    fn take(&mut self, index: ScopeIndex) -> Scope {
        mem::take(&mut self.all[index.0])
    }

    /// Gets a mutable reference to the scope at the given index.
    fn get_mut(&mut self, index: ScopeIndex) -> &mut Scope {
        &mut self.all[index.0]
    }

    /// Gets a mutable reference to the parent of the scope at `index`,
    /// along with an immutable reference to the scope itself.
    ///
    /// The backing vector is split with `split_at_mut` so the two disjoint
    /// borrows can coexist; which half holds which scope depends on their
    /// relative positions in the arena.
    fn parent_mut(&mut self, index: ScopeIndex) -> (&mut Scope, &Scope) {
        let parent = self.all[index.0].parent.expect("should have parent");
        if index.0 < parent.0 {
            // Child is stored before parent: child in the left half,
            // parent in the right half.
            let (left, right) = self.all.split_at_mut(index.0 + 1);
            (&mut right[parent.0 - index.0 - 1], &left[index.0])
        } else {
            // Parent is stored before child.
            let (left, right) = self.all.split_at_mut(parent.0 + 1);
            (&mut left[parent.0], &right[index.0 - parent.0 - 1])
        }
    }

    /// Builds the scatter index string for the given scope by joining (with
    /// `-`) the `$idx` value of the scope and each of its ancestors,
    /// innermost first.
    fn scatter_index(&self, scope: ScopeIndex) -> String {
        let mut scope = ScopeRef::new(&self.all, scope);
        let mut s = String::new();
        loop {
            // Only scopes introduced by scatter statements define `$idx`.
            if let Some(value) = scope.local(SCATTER_INDEX_VAR) {
                if !s.is_empty() {
                    s.push('-');
                }

                write!(
                    &mut s,
                    "{i}",
                    i = value.as_integer().expect("index should be an integer")
                )
                .expect("failed to write to string");
            }

            match scope.parent() {
                Some(parent) => scope = parent,
                None => break,
            }
        }

        s
    }

    /// Clears the scope at the given index and marks it for reuse by
    /// `alloc`.
    fn free(&mut self, index: ScopeIndex) {
        let scope = &mut self.all[index.0];
        scope.clear();
        self.free.push(index);
    }
}
315
316impl Default for Scopes {
317 fn default() -> Self {
318 Self {
319 all: vec![Scope::default(), Scope::new(Self::ROOT_INDEX)],
321 free: Default::default(),
322 }
323 }
324}
325
/// An array being assembled from the per-iteration results of a scatter.
struct GatherArray {
    /// The common element type, widened as values are gathered.
    element_ty: Type,
    /// The gathered elements; slots not yet filled hold `Value::None`.
    elements: Vec<Value>,
}
333
impl GatherArray {
    /// Creates a gather array of the given capacity with `value` (produced
    /// by scatter iteration `index`) as its first recorded element.
    fn new(index: usize, value: Value, capacity: usize) -> Self {
        let element_ty = value.ty();
        // Pre-fill with `None` so elements can be written at arbitrary
        // positions as iterations complete out of order.
        let mut elements = vec![Value::None; capacity];
        elements[index] = value;
        Self {
            element_ty,
            elements,
        }
    }

    /// Converts the gathered elements into an array value.
    fn into_array(self) -> Array {
        Array::new_unchecked(ArrayType::new(self.element_ty).into(), self.elements)
    }
}
352
/// The result of gathering a single name across all iterations of a
/// scatter statement.
enum Gather {
    /// Gathering a plain value produces an array of the per-iteration
    /// values.
    Array(GatherArray),
    /// Gathering a call produces a call value whose outputs are arrays.
    Call {
        /// The call's type, promoted for scatter gathering.
        call_ty: CallType,
        /// A gather array for each of the call's outputs, keyed by name.
        outputs: IndexMap<String, GatherArray>,
    },
}
365
impl Gather {
    /// Creates a new gather with room for `capacity` iterations, recording
    /// `value` as produced by iteration `index`.
    fn new(capacity: usize, index: usize, value: Value) -> Self {
        // Call values are gathered per-output so that each output becomes
        // an array; every other value gathers into a single array.
        if let Value::Call(call) = value {
            return Self::Call {
                call_ty: call.ty().promote(PromotionKind::Scatter),
                outputs: call
                    .outputs()
                    .iter()
                    .map(|(n, v)| (n.to_string(), GatherArray::new(index, v.clone(), capacity)))
                    .collect(),
            };
        }

        Self::Array(GatherArray::new(index, value, capacity))
    }

    /// Records the value produced by scatter iteration `index`.
    fn set(&mut self, index: usize, value: Value) -> EvaluationResult<()> {
        match self {
            Self::Array(array) => {
                assert!(value.as_call().is_none(), "value should not be a call");
                // Widen the element type to the common type of all values
                // gathered so far.
                if let Some(ty) = array.element_ty.common_type(&value.ty()) {
                    array.element_ty = ty;
                }

                array.elements[index] = value;
            }
            Self::Call { outputs, .. } => {
                for (k, v) in value.unwrap_call().outputs().iter() {
                    let output = outputs
                        .get_mut(k)
                        .expect("expected call output to be present");
                    if let Some(ty) = output.element_ty.common_type(&v.ty()) {
                        output.element_ty = ty;
                    }

                    output.elements[index] = v.clone();
                }
            }
        }

        Ok(())
    }

    /// Converts the gather into its final value: an array, or a call value
    /// whose outputs are arrays.
    fn into_value(self) -> Value {
        match self {
            Self::Array(array) => array.into_array().into(),
            Self::Call { call_ty, outputs } => CallValue::new_unchecked(
                call_ty,
                Outputs::from_iter(outputs.into_iter().map(|(n, v)| (n, v.into_array().into())))
                    .into(),
            )
            .into(),
        }
    }
}
425
/// A subgraph of the workflow evaluation graph.
///
/// Maps each node in the subgraph to its current indegree; a node becomes
/// ready for evaluation once its indegree reaches zero.
#[derive(Debug, Clone, Default)]
struct Subgraph(HashMap<NodeIndex, usize>);
436
impl Subgraph {
    /// Creates a subgraph covering the entire workflow evaluation graph,
    /// recording each node's initial indegree.
    fn new(graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>) -> Self {
        let mut nodes = HashMap::with_capacity(graph.node_count());
        for index in graph.node_indices() {
            nodes.insert(
                index,
                graph.edges_directed(index, Direction::Incoming).count(),
            );
        }

        Self(nodes)
    }

    /// Splits out the body of every conditional and scatter statement into
    /// its own subgraph, recursively.
    ///
    /// The split-out nodes are removed from `self`; the returned map is
    /// keyed by each statement's entry node.
    fn split(
        &mut self,
        graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
    ) -> HashMap<NodeIndex, Subgraph> {
        // Moves the nodes strictly between `entry` and `exit` out of
        // `parent` and into a new node map for the child subgraph.
        fn split(
            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
            parent: &mut HashMap<NodeIndex, usize>,
            entry: NodeIndex,
            exit: NodeIndex,
        ) -> HashMap<NodeIndex, usize> {
            let mut nodes = HashMap::new();
            let mut bfs = Bfs::new(graph, entry);
            while let Some(node) = {
                // Don't traverse through the exit node: nodes reachable
                // only via the exit belong to the parent graph.
                if bfs.stack.front() == Some(&exit) {
                    bfs.stack.pop_front();
                }
                bfs.next(graph)
            } {
                if node == entry || node == exit {
                    continue;
                }

                let prev = nodes.insert(
                    node,
                    parent.remove(&node).expect("node should exist in parent"),
                );
                assert!(prev.is_none());
            }

            // The entry node stays in the parent, so discount the edges it
            // contributes to its direct successors in the child subgraph.
            for edge in graph.edges_directed(entry, Direction::Outgoing) {
                if edge.target() != exit {
                    *nodes
                        .get_mut(&edge.target())
                        .expect("should be in subgraph") -= 1;
                }
            }

            // In the parent, the exit node now waits only on the entry
            // node's completion.
            *parent.get_mut(&exit).expect("should have exit node") = 1;
            nodes
        }

        // Recursively splits out a subgraph for every conditional and
        // scatter node present in `nodes`.
        fn split_recurse(
            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
            nodes: &mut HashMap<NodeIndex, usize>,
            subgraphs: &mut HashMap<NodeIndex, Subgraph>,
        ) {
            for index in graph.node_indices() {
                if !nodes.contains_key(&index) {
                    continue;
                }

                match &graph[index] {
                    WorkflowGraphNode::Conditional(_, exit)
                    | WorkflowGraphNode::Scatter(_, exit) => {
                        let mut nodes = split(graph, nodes, index, *exit);
                        split_recurse(graph, &mut nodes, subgraphs);
                        subgraphs.insert(index, Subgraph(nodes));
                    }
                    _ => {}
                }
            }
        }

        let mut subgraphs = HashMap::new();
        split_recurse(graph, &mut self.0, &mut subgraphs);
        subgraphs
    }

    /// Removes a completed (indegree zero) node from the subgraph,
    /// decrementing the indegree of each successor still in the subgraph.
    fn remove_node(&mut self, graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>, node: NodeIndex) {
        let indegree = self.0.remove(&node);
        assert_eq!(
            indegree,
            Some(0),
            "removed a node with an indegree greater than 0"
        );

        for edge in graph.edges_directed(node, Direction::Outgoing) {
            // Successors that were split into a nested subgraph are not in
            // this map and are skipped.
            if let Some(indegree) = self.0.get_mut(&edge.target()) {
                *indegree -= 1;
            }
        }
    }
}
565
/// Shared state for a single workflow evaluation.
struct State {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The backend used to execute tasks.
    backend: Arc<dyn TaskExecutionBackend>,
    /// Token used to cancel the evaluation.
    token: CancellationToken,
    /// The document containing the workflow being evaluated.
    document: Document,
    /// The inputs supplied to the workflow.
    inputs: WorkflowInputs,
    /// All evaluation scopes; behind a lock because subgraphs are evaluated
    /// concurrently.
    scopes: RwLock<Scopes>,
    /// The workflow evaluation graph.
    graph: DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
    /// Subgraphs for each conditional and scatter statement, keyed by the
    /// statement's entry node.
    subgraphs: HashMap<NodeIndex, Subgraph>,
    /// Directory for temporary files created during evaluation.
    temp_dir: PathBuf,
    /// Directory under which call evaluation artifacts are placed.
    calls_dir: PathBuf,
    /// Downloader used to fetch remote files.
    downloader: HttpDownloader,
}
591
/// Evaluates WDL workflows.
#[derive(Clone)]
pub struct WorkflowEvaluator {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The backend used to execute tasks.
    backend: Arc<dyn TaskExecutionBackend>,
    /// Token used to cancel evaluation.
    token: CancellationToken,
    /// Downloader used to fetch remote files.
    downloader: HttpDownloader,
}
606
607impl WorkflowEvaluator {
    /// Creates a new workflow evaluator, creating the task execution
    /// backend from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend cannot be created or the
    /// configuration is invalid.
    pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
        let backend = config.create_backend().await?;
        Self::new_with_backend(config, backend, token)
    }
618
619 pub fn new_with_backend(
624 config: Config,
625 backend: Arc<dyn TaskExecutionBackend>,
626 token: CancellationToken,
627 ) -> Result<Self> {
628 config.validate()?;
629
630 let config = Arc::new(config);
631 let downloader = HttpDownloader::new(config.clone())?;
632
633 Ok(Self {
634 config,
635 backend,
636 token,
637 downloader,
638 })
639 }
640
    /// Evaluates the workflow contained in the given document with the
    /// given inputs, rooting all evaluation artifacts under `root_dir`.
    ///
    /// `progress` is invoked to report evaluation progress events.
    ///
    /// Returns the evaluated workflow outputs.
    pub async fn evaluate<P, R>(
        &self,
        document: &Document,
        inputs: WorkflowInputs,
        root_dir: impl AsRef<Path>,
        progress: P,
    ) -> EvaluationResult<Outputs>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        let workflow = document
            .workflow()
            .context("document does not contain a workflow")?;

        // The workflow's name serves as the top-level evaluation identifier.
        self.evaluate_with_progress(
            document,
            inputs,
            root_dir.as_ref(),
            workflow.name(),
            Arc::new(progress),
        )
        .await
    }
668
    /// Evaluates the workflow, reporting "started" and "completed" progress
    /// events around the actual evaluation.
    ///
    /// Fails fast if the document has analysis errors.
    async fn evaluate_with_progress<P, R>(
        &self,
        document: &Document,
        inputs: WorkflowInputs,
        root_dir: &Path,
        id: &str,
        progress: Arc<P>,
    ) -> EvaluationResult<Outputs>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }

        progress(ProgressKind::WorkflowStarted { id }).await;

        let result = self
            .perform_evaluation(document, inputs, root_dir, id, progress.clone())
            .await;

        // The completion event is reported regardless of the outcome.
        progress(ProgressKind::WorkflowCompleted {
            id,
            result: &result,
        })
        .await;

        result
    }
702
703 async fn perform_evaluation<P, R>(
706 &self,
707 document: &Document,
708 inputs: WorkflowInputs,
709 root_dir: &Path,
710 id: &str,
711 progress: Arc<P>,
712 ) -> EvaluationResult<Outputs>
713 where
714 P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
715 R: Future<Output = ()> + Send,
716 {
717 let workflow = document
719 .workflow()
720 .context("document does not contain a workflow")?;
721 inputs.validate(document, workflow, None).with_context(|| {
722 format!(
723 "failed to validate the inputs to workflow `{workflow}`",
724 workflow = workflow.name()
725 )
726 })?;
727
728 let ast = match document.root().morph().ast() {
729 Ast::V1(ast) => ast,
730 _ => {
731 return Err(
732 anyhow!("workflow evaluation is only supported for WDL 1.x documents").into(),
733 );
734 }
735 };
736
737 debug!(
738 workflow_id = id,
739 workflow_name = workflow.name(),
740 document = document.uri().as_str(),
741 "evaluating workflow",
742 );
743
744 let definition = ast
746 .workflows()
747 .next()
748 .expect("workflow should exist in the AST");
749
750 let mut diagnostics = Vec::new();
752
753 let graph = WorkflowGraphBuilder::default()
756 .build(&definition, &mut diagnostics, |name| inputs.contains(name));
757 assert!(
758 diagnostics.is_empty(),
759 "workflow evaluation graph should have no diagnostics"
760 );
761
762 let mut subgraph = Subgraph::new(&graph);
764 let subgraphs = subgraph.split(&graph);
765
766 let max_concurrency = self
767 .config
768 .workflow
769 .scatter
770 .concurrency
771 .unwrap_or_else(|| self.backend.max_concurrency());
772
773 let temp_dir = root_dir.join("tmp");
775 fs::create_dir_all(&temp_dir).with_context(|| {
776 format!(
777 "failed to create directory `{path}`",
778 path = temp_dir.display()
779 )
780 })?;
781
782 write_json_file(root_dir.join(INPUTS_FILE), &inputs)?;
784
785 let calls_dir = root_dir.join("calls");
786 fs::create_dir_all(&calls_dir).with_context(|| {
787 format!(
788 "failed to create directory `{path}`",
789 path = temp_dir.display()
790 )
791 })?;
792
793 let effective_output_dir = root_dir.to_path_buf();
794
795 let state = Arc::new(State {
796 config: self.config.clone(),
797 backend: self.backend.clone(),
798 token: self.token.clone(),
799 document: document.clone(),
800 inputs,
801 scopes: Default::default(),
802 graph,
803 subgraphs,
804 temp_dir,
805 calls_dir,
806 downloader: self.downloader.clone(),
807 });
808
809 Self::evaluate_subgraph(
811 state.clone(),
812 Scopes::ROOT_INDEX,
813 subgraph,
814 max_concurrency,
815 Arc::new(id.to_string()),
816 progress,
817 )
818 .await?;
819
820 if let Some(cleanup_fut) = self
821 .backend
822 .cleanup(&effective_output_dir, state.token.clone())
823 {
824 cleanup_fut.await;
825 }
826
827 let mut outputs: Outputs = state.scopes.write().await.take(Scopes::OUTPUT_INDEX).into();
828 if let Some(section) = definition.output() {
829 let indexes: HashMap<_, _> = section
830 .declarations()
831 .enumerate()
832 .map(|(i, d)| (d.name().hashable(), i))
833 .collect();
834 outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
835 }
836
837 write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
839 Ok(outputs)
840 }
841
    /// Evaluates a subgraph of the workflow evaluation graph.
    ///
    /// Returns a boxed future because this is indirectly recursive:
    /// conditional and scatter statements evaluate their bodies as nested
    /// subgraphs.
    ///
    /// On error, the cancellation token is triggered and any still-running
    /// node futures are awaited before the error is propagated.
    fn evaluate_subgraph<P, R>(
        state: Arc<State>,
        scope: ScopeIndex,
        subgraph: Subgraph,
        max_concurrency: u64,
        id: Arc<String>,
        progress: Arc<P>,
    ) -> BoxFuture<'static, EvaluationResult<()>>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        async move {
            let token = state.token.clone();
            let mut futures = JoinSet::new();
            match Self::perform_subgraph_evaluation(
                state,
                scope,
                subgraph,
                max_concurrency,
                id,
                progress,
                &mut futures,
            )
            .await
            {
                Ok(_) => {
                    // A successful evaluation joins every spawned future.
                    assert!(futures.is_empty());
                    Ok(())
                }
                Err(e) => {
                    // Cancel and drain outstanding futures before
                    // propagating the error.
                    token.cancel();
                    futures.join_all().await;
                    Err(e)
                }
            }
        }
        .boxed()
    }
890
    /// Drives evaluation of a subgraph to completion.
    ///
    /// Nodes with an indegree of zero are ready: inputs, declarations, and
    /// outputs are evaluated inline, while calls, conditionals, and
    /// scatters are spawned into `futures` and removed from the subgraph
    /// only when their future completes.
    async fn perform_subgraph_evaluation<P, R>(
        state: Arc<State>,
        scope: ScopeIndex,
        mut subgraph: Subgraph,
        max_concurrency: u64,
        id: Arc<String>,
        progress: Arc<P>,
        futures: &mut JoinSet<EvaluationResult<NodeIndex>>,
    ) -> EvaluationResult<()>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        // Nodes ready to evaluate on the current pass.
        let mut processing: Vec<NodeIndex> = Vec::new();
        // Nodes whose evaluation has been spawned but not yet completed.
        let mut awaiting: HashSet<NodeIndex> = HashSet::new();

        while !subgraph.0.is_empty() {
            // A node is ready when all of its dependencies have completed
            // (indegree zero) and it hasn't already been spawned.
            processing.extend(subgraph.0.iter().filter_map(|(node, indegree)| {
                if *indegree == 0 && !awaiting.contains(node) {
                    Some(*node)
                } else {
                    None
                }
            }));

            if processing.is_empty() {
                // Nothing is ready; wait for a spawned future to complete.
                let node: EvaluationResult<NodeIndex> = futures
                    .join_next()
                    .await
                    .expect("should have a future to wait on")
                    .expect("failed to join future");

                let node = node?;
                match &state.graph[node] {
                    WorkflowGraphNode::Call(stmt) => {
                        let call_name = stmt
                            .alias()
                            .map(|a| a.name())
                            .unwrap_or_else(|| stmt.target().names().last().unwrap());
                        debug!(
                            workflow_id = id.as_str(),
                            workflow_name = state.document.workflow().unwrap().name(),
                            document = state.document.uri().as_str(),
                            call_name = call_name.text(),
                            "evaluation of call statement has completed",
                        )
                    }
                    WorkflowGraphNode::Conditional(stmt, _) => debug!(
                        workflow_id = id.as_str(),
                        workflow_name = state.document.workflow().unwrap().name(),
                        document = state.document.uri().as_str(),
                        expr = {
                            let e = stmt.expr();
                            e.text().to_string()
                        },
                        "evaluation of conditional statement has completed",
                    ),
                    WorkflowGraphNode::Scatter(stmt, _) => {
                        let variable = stmt.variable();
                        debug!(
                            workflow_id = id.as_str(),
                            workflow_name = state.document.workflow().unwrap().name(),
                            document = state.document.uri().as_str(),
                            variable = variable.text(),
                            "evaluation of scatter statement has completed",
                        )
                    }
                    // Only calls, conditionals, and scatters are spawned.
                    _ => unreachable!(),
                }

                awaiting.remove(&node);
                subgraph.remove_node(&state.graph, node);

                continue;
            }

            for node in processing.iter().copied() {
                trace!(
                    workflow_id = id.as_str(),
                    workflow_name = state.document.workflow().unwrap().name(),
                    document = state.document.uri().as_str(),
                    "evaluating node `{n:?}` ({node:?})",
                    n = state.graph[node]
                );
                match &state.graph[node] {
                    // Inputs, declarations, and outputs evaluate inline.
                    WorkflowGraphNode::Input(decl) => Self::evaluate_input(&id, &state, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
                    WorkflowGraphNode::Decl(decl) => Self::evaluate_decl(&id, &state, scope, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
                    WorkflowGraphNode::Output(decl) => Self::evaluate_output(&id, &state, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
                    WorkflowGraphNode::Conditional(stmt, _) => {
                        let id = id.clone();
                        let state = state.clone();
                        let progress = progress.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            Self::evaluate_conditional(
                                id,
                                state,
                                scope,
                                node,
                                &stmt,
                                max_concurrency,
                                progress,
                            )
                            .await?;
                            Ok(node)
                        });
                        awaiting.insert(node);
                    }
                    WorkflowGraphNode::Scatter(stmt, _) => {
                        let id = id.clone();
                        let state = state.clone();
                        let progress = progress.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            // The scatter owns a nested join set for its
                            // per-iteration subgraph evaluations.
                            let token = state.token.clone();
                            let mut futures = JoinSet::new();
                            match Self::evaluate_scatter(
                                id,
                                state,
                                scope,
                                node,
                                &stmt,
                                max_concurrency,
                                progress,
                                &mut futures,
                            )
                            .await
                            {
                                Ok(_) => {
                                    assert!(futures.is_empty());
                                    Ok(node)
                                }
                                Err(e) => {
                                    // Cancel and drain before propagating.
                                    token.cancel();
                                    futures.join_all().await;
                                    Err(e)
                                }
                            }
                        });
                        awaiting.insert(node);
                    }
                    WorkflowGraphNode::Call(stmt) => {
                        let id = id.clone();
                        let state = state.clone();
                        let progress = progress.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            Self::evaluate_call(&id, state, scope, &stmt, progress).await?;
                            Ok(node)
                        });
                        awaiting.insert(node);
                    }
                    WorkflowGraphNode::ExitConditional(_) | WorkflowGraphNode::ExitScatter(_) => {
                        continue;
                    }
                }
            }

            // Remove the nodes evaluated inline this pass; spawned nodes
            // are removed when their futures complete above.
            for node in processing.drain(..) {
                if awaiting.contains(&node) {
                    continue;
                }

                subgraph.remove_node(&state.graph, node);
            }
        }

        Ok(())
    }
1081
    /// Evaluates a workflow input declaration into the root scope.
    ///
    /// Uses the caller-supplied input value when present; otherwise
    /// evaluates the declaration's default expression, or falls back to
    /// `None` for optional inputs without a default.
    async fn evaluate_input(
        id: &str,
        state: &State,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
        let expr = decl.expr();

        let (value, span) = match state.inputs.get(name.text()) {
            Some(input) => (input.clone(), name.span()),
            None => {
                if let Some(expr) = expr {
                    debug!(
                        workflow_id = id,
                        workflow_name = state.document.workflow().unwrap().name(),
                        document = state.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input",
                    );

                    (
                        Self::evaluate_expr(state, Scopes::ROOT_INDEX, &expr).await?,
                        expr.span(),
                    )
                } else {
                    // Analysis guarantees an input without a default is
                    // optional.
                    assert!(expected_ty.is_optional(), "type should be optional");
                    (Value::None, name.span())
                }
            }
        };

        // Coerce the value to the declared input type.
        let value = value
            .coerce(&expected_ty)
            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;

        state
            .scopes
            .write()
            .await
            .get_mut(Scopes::ROOT_INDEX)
            .insert(name.text(), value);
        Ok(())
    }
1130
    /// Evaluates a private declaration into the given scope.
    async fn evaluate_decl(
        id: &str,
        state: &State,
        scope: ScopeIndex,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
        // Private declarations always have an initializer expression.
        let expr = decl.expr().expect("declaration should have expression");

        debug!(
            workflow_id = id,
            workflow_name = state.document.workflow().unwrap().name(),
            document = state.document.uri().as_str(),
            decl_name = name.text(),
            "evaluating private declaration",
        );

        let value = Self::evaluate_expr(state, scope, &expr).await?;

        // Coerce the value to the declared type.
        let value = value.coerce(&expected_ty).map_err(|e| {
            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
        })?;

        state
            .scopes
            .write()
            .await
            .get_mut(scope)
            .insert(name.text(), value);
        Ok(())
    }
1166
    /// Evaluates a workflow output declaration into the output scope.
    ///
    /// After evaluation, every `File`/`Directory` value in the output is
    /// checked: relative paths are rejected and paths are verified to
    /// exist (subject to optionality).
    async fn evaluate_output(
        id: &str,
        state: &State,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
        // Output declarations always have an initializer expression.
        let expr = decl.expr().expect("declaration should have expression");

        debug!(
            workflow_id = id,
            workflow_name = state.document.workflow().unwrap().name(),
            document = state.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );

        let value = Self::evaluate_expr(state, Scopes::OUTPUT_INDEX, &expr).await?;

        // Coerce the value to the declared output type.
        let mut value = value.coerce(&expected_ty).map_err(|e| {
            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
        })?;

        value
            .visit_paths_mut(expected_ty.is_optional(), &mut |optional, value| {
                let path = match value {
                    PrimitiveValue::File(path) => path,
                    PrimitiveValue::Directory(path) => path,
                    _ => unreachable!("only file and directory values should be visited"),
                };

                // Workflow outputs must be URLs or absolute paths.
                if !path::is_url(path) && Path::new(path.as_str()).is_relative() {
                    bail!("relative path `{path}` cannot be a workflow output");
                }

                value.ensure_path_exists(optional)
            })
            .map_err(|e| {
                output_evaluation_failed(
                    e,
                    state
                        .document
                        .workflow()
                        .expect("should have workflow")
                        .name(),
                    false,
                    name.text(),
                    name.span(),
                )
            })?;

        state
            .scopes
            .write()
            .await
            .get_mut(Scopes::OUTPUT_INDEX)
            .insert(name.text(), value);
        Ok(())
    }
1231
    /// Evaluates a conditional statement.
    ///
    /// When the condition is true, the statement's body subgraph is
    /// evaluated in a child scope and the resulting names are copied into
    /// the parent scope as optional values. When false, `None` placeholders
    /// (or calls with all-`None` outputs) are inserted for every name the
    /// body would have introduced.
    async fn evaluate_conditional<P, R>(
        id: Arc<String>,
        state: Arc<State>,
        parent: ScopeIndex,
        entry: NodeIndex,
        stmt: &ConditionalStatement<SyntaxNode>,
        max_concurrency: u64,
        progress: Arc<P>,
    ) -> EvaluationResult<()>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        let expr = stmt.expr();

        debug!(
            workflow_id = id.as_str(),
            workflow_name = state.document.workflow().unwrap().name(),
            document = state.document.uri().as_str(),
            expr = expr.text().to_string(),
            "evaluating conditional statement",
        );

        let value = Self::evaluate_expr(&state, parent, &expr)
            .await
            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;

        // The condition must coerce to a boolean.
        if value
            .coerce(&PrimitiveType::Boolean.into())
            .map_err(|e| {
                EvaluationError::new(
                    state.document.clone(),
                    if_conditional_mismatch(e, &value.ty(), expr.span()),
                )
            })?
            .unwrap_boolean()
        {
            debug!(
                workflow_id = id.as_str(),
                workflow_name = state.document.workflow().unwrap().name(),
                document = state.document.uri().as_str(),
                "conditional statement branch was taken and subgraph will be evaluated"
            );

            let scope = { state.scopes.write().await.alloc(parent) };

            Self::evaluate_subgraph(
                state.clone(),
                scope,
                state.subgraphs[&entry].clone(),
                max_concurrency,
                id,
                progress.clone(),
            )
            .await?;

            // Promote the child scope's names into the parent as optional
            // values, then recycle the child scope.
            let mut scopes = state.scopes.write().await;
            let (parent, child) = scopes.parent_mut(scope);
            for (name, value) in child.local() {
                parent.insert(name.to_string(), value.clone_as_optional());
            }

            scopes.free(scope);
        } else {
            debug!(
                workflow_id = id.as_str(),
                workflow_name = state.document.workflow().unwrap().name(),
                document = state.document.uri().as_str(),
                "conditional statement branch was not taken and subgraph will be skipped"
            );

            // Use the analysis scope of the statement's body to discover
            // the names it would have introduced, inserting `None`
            // placeholders for each.
            let mut scopes = state.scopes.write().await;
            let parent = scopes.get_mut(parent);
            let scope = state
                .document
                .find_scope_by_position(
                    stmt.braced_scope_span()
                        .expect("should have braced scope span")
                        .start(),
                )
                .expect("should have scope");

            for (name, n) in scope.names() {
                if let Type::Call(ty) = n.ty() {
                    // Skipped calls become call values whose outputs are
                    // all `None`.
                    parent.insert(
                        name.to_string(),
                        CallValue::new_unchecked(
                            ty.promote(PromotionKind::Conditional),
                            Outputs::from_iter(
                                ty.outputs().iter().map(|(n, _)| (n.clone(), Value::None)),
                            )
                            .into(),
                        ),
                    );
                } else {
                    parent.insert(name.to_string(), Value::None);
                }
            }
        }

        Ok(())
    }
1341
    /// Evaluates a scatter statement.
    ///
    /// Each element of the scattered array gets its own child scope
    /// (containing the scatter variable and the hidden `$idx` index) whose
    /// subgraph is evaluated concurrently, bounded by `max_concurrency`.
    /// Completed iterations are gathered so each name introduced by the
    /// body becomes an array (or a call whose outputs are arrays) in the
    /// parent scope.
    #[allow(clippy::too_many_arguments)]
    async fn evaluate_scatter<P, R>(
        id: Arc<String>,
        state: Arc<State>,
        parent: ScopeIndex,
        entry: NodeIndex,
        stmt: &ScatterStatement<SyntaxNode>,
        max_concurrency: u64,
        progress: Arc<P>,
        futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
    ) -> EvaluationResult<()>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        // Joins the next completed iteration, folds its scope's values into
        // the gathers, and recycles the scope.
        async fn await_next(
            futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
            scopes: &RwLock<Scopes>,
            gathers: &mut HashMap<String, Gather>,
            capacity: usize,
        ) -> EvaluationResult<()> {
            let (index, scope) = futures
                .join_next()
                .await
                .expect("should have a future to wait on")
                .expect("failed to join future")?;

            let mut scopes = scopes.write().await;
            // Skip the first two locals: the `$idx` variable and the
            // scatter variable itself are not gathered.
            for (name, value) in scopes.get_mut(scope).local().skip(2) {
                match gathers.get_mut(name) {
                    Some(gather) => gather.set(index, value.clone())?,
                    None => {
                        let prev = gathers.insert(
                            name.to_string(),
                            Gather::new(capacity, index, value.clone()),
                        );
                        assert!(prev.is_none());
                    }
                }
            }

            scopes.free(scope);
            Ok(())
        }

        let variable = stmt.variable();
        let expr = stmt.expr();

        debug!(
            workflow_id = id.as_str(),
            workflow_name = state.document.workflow().unwrap().name(),
            document = state.document.uri().as_str(),
            variable = variable.text(),
            "evaluating scatter statement",
        );

        let value = Self::evaluate_expr(&state, parent, &expr)
            .await
            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;

        // The scattered expression must evaluate to an array.
        let array = value
            .as_array()
            .ok_or_else(|| {
                EvaluationError::new(
                    state.document.clone(),
                    type_is_not_array(&value.ty(), expr.span()),
                )
            })?
            .as_slice();

        let mut gathers: HashMap<_, Gather> = HashMap::new();
        for (i, value) in array.iter().enumerate() {
            if state.token.is_cancelled() {
                return Err(anyhow!("workflow evaluation has been cancelled").into());
            }

            // Allocate the iteration scope with the index variable and the
            // scatter variable bound to the current element.
            let scope = {
                let mut scopes = state.scopes.write().await;
                let index = scopes.alloc(parent);
                let scope = scopes.get_mut(index);
                scope.insert(
                    SCATTER_INDEX_VAR,
                    i64::try_from(i).map_err(|_| anyhow!("array index out of bounds"))?,
                );
                scope.insert(variable.text(), value.clone());
                index
            };

            {
                let state = state.clone();
                let subgraph = state.subgraphs[&entry].clone();
                let progress = progress.clone();
                let id = id.clone();
                futures.spawn(async move {
                    Self::evaluate_subgraph(
                        state.clone(),
                        scope,
                        subgraph,
                        max_concurrency,
                        id,
                        progress,
                    )
                    .await?;

                    Ok((i, scope))
                });
            }

            // Bound in-flight iterations by the configured concurrency.
            if futures.len() as u64 >= max_concurrency {
                await_next(futures, &state.scopes, &mut gathers, array.len()).await?;
            }
        }

        // Drain the remaining iterations.
        while !futures.is_empty() {
            await_next(futures, &state.scopes, &mut gathers, array.len()).await?;
        }

        // Publish the gathered arrays into the parent scope.
        let mut scopes = state.scopes.write().await;
        let scope = scopes.get_mut(parent);
        for (name, gather) in gathers {
            scope.insert(name, gather.into_value());
        }

        Ok(())
    }
1476
    /// Evaluates a workflow call statement.
    ///
    /// The call target is resolved either in the current document or through
    /// a single imported namespace. The call's inputs are evaluated in the
    /// given scope, the callee (a task or a workflow) is evaluated, and the
    /// resulting outputs are inserted into the scope under the call's alias.
    async fn evaluate_call<P, R>(
        id: &str,
        state: Arc<State>,
        scope: ScopeIndex,
        stmt: &CallStatement<SyntaxNode>,
        progress: Arc<P>,
    ) -> EvaluationResult<()>
    where
        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
        R: Future<Output = ()> + Send,
    {
        /// Abstracts over the two kinds of call targets.
        enum Evaluator<'a> {
            /// The call target is a task.
            Task(&'a Task, TaskEvaluator),
            /// The call target is a workflow.
            Workflow(WorkflowEvaluator),
        }

        impl Evaluator<'_> {
            /// Evaluates the call's target and returns the callee's outputs.
            async fn evaluate<P, R>(
                self,
                caller_id: &str,
                document: &Document,
                inputs: Inputs,
                root_dir: &Path,
                callee_id: &str,
                progress: &Arc<P>,
            ) -> EvaluationResult<Outputs>
            where
                P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
                R: Future<Output = ()> + Send,
            {
                match self {
                    Evaluator::Task(task, evaluator) => {
                        debug!(caller_id, callee_id, "evaluating call to task");
                        evaluator
                            .evaluate_with_progress(
                                document,
                                task,
                                &inputs.unwrap_task_inputs(),
                                root_dir,
                                callee_id,
                                progress.clone(),
                            )
                            .await?
                            .outputs
                    }
                    Evaluator::Workflow(evaluator) => {
                        debug!(caller_id, callee_id, "evaluating call to workflow");
                        evaluator
                            .evaluate_with_progress(
                                document,
                                inputs.unwrap_workflow_inputs(),
                                root_dir,
                                callee_id,
                                progress.clone(),
                            )
                            .await
                    }
                }
            }
        }

        let alias = stmt.alias();
        let target = stmt.target();
        let mut names = target.names().peekable();
        let mut namespace = None;
        let mut target = None;

        // Walk the dotted target name: the final segment is the task or
        // workflow name; any preceding segment must name an imported
        // namespace, and at most one namespace segment is allowed.
        while let Some(name) = names.next() {
            if names.peek().is_none() {
                // Last segment: this is the callee's name
                target = Some(name);
                break;
            }

            if namespace.is_some() {
                // A namespace was already resolved; more than one is an error
                return Err(EvaluationError::new(
                    state.document.clone(),
                    only_one_namespace(name.span()),
                ));
            }

            let ns = state.document.namespace(name.text()).ok_or_else(|| {
                EvaluationError::new(state.document.clone(), unknown_namespace(&name))
            })?;

            namespace = Some((name, ns));
        }

        let target = target.expect("expected at least one name");

        // A call without an explicit alias is addressed by its target name
        let alias = alias
            .as_ref()
            .map(|t| t.name())
            .unwrap_or_else(|| target.clone());

        debug!(
            workflow_id = id,
            workflow_name = state.document.workflow().unwrap().name(),
            document = state.document.uri().as_str(),
            call_name = alias.text(),
            "evaluating call statement",
        );

        // An unqualified call to the current workflow's own name would recurse
        if namespace.is_none()
            && target.text()
                == state
                    .document
                    .workflow()
                    .expect("should have workflow")
                    .name()
        {
            return Err(EvaluationError::new(
                state.document.clone(),
                recursive_workflow_call(target.text(), target.span()),
            ));
        }

        // Caller-supplied inputs for this call are keyed by the call's alias
        let inputs = state.inputs.calls().get(alias.text()).cloned();
        // Resolve the callee in the namespace's document when qualified,
        // otherwise in the current document
        let document = namespace
            .as_ref()
            .map(|(_, ns)| ns.document())
            .unwrap_or(&state.document);
        // Prefer a task with the target name; otherwise fall back to the
        // document's workflow if its name matches
        let (mut inputs, evaluator) = match document.task_by_name(target.text()) {
            Some(task) => (
                inputs.unwrap_or_else(|| Inputs::Task(Default::default())),
                Evaluator::Task(
                    task,
                    TaskEvaluator::new_unchecked(
                        state.config.clone(),
                        state.backend.clone(),
                        state.token.clone(),
                        state.downloader.clone(),
                    ),
                ),
            ),
            _ => match document.workflow() {
                Some(workflow) if workflow.name() == target.text() => (
                    inputs.unwrap_or_else(|| Inputs::Workflow(Default::default())),
                    Evaluator::Workflow(WorkflowEvaluator {
                        config: state.config.clone(),
                        backend: state.backend.clone(),
                        token: state.token.clone(),
                        downloader: state.downloader.clone(),
                    }),
                ),
                _ => {
                    return Err(EvaluationError::new(
                        state.document.clone(),
                        unknown_task_or_workflow(
                            namespace.as_ref().map(|(_, ns)| ns.span()),
                            target.text(),
                            target.span(),
                        ),
                    ));
                }
            },
        };

        // Evaluate the call's explicit inputs into `inputs`; the returned
        // scatter index string identifies which scatter iteration (if any)
        // this call belongs to
        let scatter_index = Self::evaluate_call_inputs(&state, stmt, scope, &mut inputs)
            .await
            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;

        // The call's directory is suffixed with the scatter index so that
        // each scatter iteration writes to a distinct location
        let dir = format!(
            "{alias}{sep}{scatter_index}",
            alias = alias.text(),
            sep = if scatter_index.is_empty() { "" } else { "-" },
        );

        // Format a unique identifier for this particular call invocation
        let call_id = format_id(
            namespace.as_ref().map(|(n, _)| n.text()),
            target.text(),
            alias.text(),
            &scatter_index,
        );

        // Evaluate the callee; on a source evaluation error, record this
        // call site (the `call` keyword's span) in the error's backtrace
        let outputs = evaluator
            .evaluate(
                id,
                document,
                inputs,
                &state.calls_dir.join(&dir),
                &call_id,
                &progress,
            )
            .await
            .map_err(|mut e| {
                if let EvaluationError::Source(e) = &mut e {
                    e.backtrace.push(CallLocation {
                        document: state.document.clone(),
                        span: stmt
                            .token::<CallKeyword<SyntaxToken>>()
                            .expect("should have call keyword")
                            .span(),
                    });
                }

                e
            })?
            .with_name(alias.text());

        // Insert the call's outputs into the scope as a call value, using the
        // call type recorded for this alias during analysis
        let ty = state
            .document
            .workflow()
            .expect("should have workflow")
            .calls()
            .get(alias.text())
            .expect("should have call");
        state.scopes.write().await.get_mut(scope).insert(
            alias.text(),
            Value::Call(CallValue::new_unchecked(ty.clone(), Arc::new(outputs))),
        );

        Ok(())
    }
1702
1703 async fn evaluate_expr(
1707 state: &State,
1708 scope: ScopeIndex,
1709 expr: &Expr<SyntaxNode>,
1710 ) -> Result<Value, Diagnostic> {
1711 let scopes = state.scopes.read().await;
1712 ExprEvaluator::new(WorkflowEvaluationContext::new(
1713 &state.document,
1714 scopes.reference(scope),
1715 &state.temp_dir,
1716 &state.downloader,
1717 ))
1718 .evaluate_expr(expr)
1719 .await
1720 }
1721
1722 async fn evaluate_call_inputs(
1728 state: &State,
1729 stmt: &CallStatement<SyntaxNode>,
1730 scope: ScopeIndex,
1731 inputs: &mut Inputs,
1732 ) -> Result<String, Diagnostic> {
1733 let scopes = state.scopes.read().await;
1734 for input in stmt.inputs() {
1735 let name = input.name();
1736 let value = match input.expr() {
1737 Some(expr) => {
1738 let mut evaluator = ExprEvaluator::new(WorkflowEvaluationContext::new(
1739 &state.document,
1740 scopes.reference(scope),
1741 &state.temp_dir,
1742 &state.downloader,
1743 ));
1744
1745 evaluator.evaluate_expr(&expr).await?
1746 }
1747 None => scopes
1748 .reference(scope)
1749 .lookup(name.text())
1750 .cloned()
1751 .ok_or_else(|| unknown_name(name.text(), name.span()))?,
1752 };
1753
1754 let prev = inputs.set(input.name().text(), value);
1755 assert!(
1756 prev.is_none(),
1757 "attempted to override a specified call input"
1758 );
1759 }
1760
1761 Ok(scopes.scatter_index(scope))
1762 }
1763}
1764
1765#[cfg(test)]
1766mod test {
1767 use std::fs::read_to_string;
1768 use std::sync::atomic::AtomicUsize;
1769 use std::sync::atomic::Ordering;
1770
1771 use pretty_assertions::assert_eq;
1772 use tempfile::TempDir;
1773 use wdl_analysis::Analyzer;
1774 use wdl_analysis::DiagnosticsConfig;
1775
1776 use super::*;
1777 use crate::config::BackendConfig;
1778
    /// Tests that evaluating a workflow writes `inputs.json` and
    /// `outputs.json` files for the workflow itself and for each of its
    /// calls (under `calls/<alias>/`).
    #[tokio::test]
    async fn it_writes_input_and_output_files() {
        // Write a WDL source with one task called twice (once aliased)
        let root_dir = TempDir::new().expect("failed to create temporary directory");
        fs::write(
            root_dir.path().join("source.wdl"),
            r#"
version 1.2

task foo {
    input {
        String a
        Int b
        Array[String] c
    }

    command <<<>>>

    output {
        String x = a
        Int y = b
        Array[String] z = c
    }
}

workflow test {
    input {
        String a
        Int b
        Array[String] c
    }

    call foo {
        a = "foo",
        b = 10,
        c = ["foo", "bar", "baz"]
    }

    call foo as bar {
        a = "bar",
        b = 1,
        c = []
    }

    output {
        String x = a
        Int y = b
        Array[String] z = c
    }
}
"#,
        )
        .expect("failed to write WDL source file");

        // Analyze the directory; the single source should yield one result
        let analyzer = Analyzer::new(DiagnosticsConfig::except_all(), |(), _, _, _| async {});
        analyzer
            .add_directory(root_dir.path().to_path_buf())
            .await
            .expect("failed to add directory");
        let results = analyzer
            .analyze(())
            .await
            .expect("failed to analyze document");
        assert_eq!(results.len(), 1, "expected only one result");

        // Evaluate the workflow with the local backend
        let config = Config {
            backend: BackendConfig::Local(Default::default()),
            ..Default::default()
        };
        let evaluator = WorkflowEvaluator::new(config, CancellationToken::new())
            .await
            .unwrap();

        // Provide the workflow-level inputs
        let mut inputs = WorkflowInputs::default();
        inputs.set("a", "qux".to_string());
        inputs.set("b", 1234);
        inputs.set(
            "c",
            Array::new(
                ArrayType::new(PrimitiveType::String),
                ["jam".to_string(), "cakes".to_string()],
            )
            .unwrap(),
        );
        let outputs_dir = root_dir.path().join("outputs");
        let outputs = evaluator
            .evaluate(
                results.first().expect("should have result").document(),
                inputs,
                &outputs_dir,
                |_| async {},
            )
            .await
            .expect("failed to evaluate workflow");
        assert_eq!(outputs.iter().count(), 3, "expected three outputs");

        // The workflow's own inputs/outputs files
        assert_eq!(
            read_to_string(outputs_dir.join("inputs.json"))
                .expect("failed to read workflow `inputs.json`"),
            "{\n  \"a\": \"qux\",\n  \"b\": 1234,\n  \"c\": [\n    \"jam\",\n    \"cakes\"\n  ]\n}"
        );

        assert_eq!(
            read_to_string(outputs_dir.join("outputs.json"))
                .expect("failed to read workflow `outputs.json`"),
            "{\n  \"x\": \"qux\",\n  \"y\": 1234,\n  \"z\": [\n    \"jam\",\n    \"cakes\"\n  ]\n}"
        );

        // The unaliased call's files (directory named by the call's alias)
        assert_eq!(
            read_to_string(outputs_dir.join("calls/foo/inputs.json"))
                .expect("failed to read foo `inputs.json`"),
            "{\n  \"a\": \"foo\",\n  \"b\": 10,\n  \"c\": [\n    \"foo\",\n    \"bar\",\n    \
             \"baz\"\n  ]\n}"
        );

        assert_eq!(
            read_to_string(outputs_dir.join("calls/foo/outputs.json"))
                .expect("failed to read foo `outputs.json`"),
            "{\n  \"x\": \"foo\",\n  \"y\": 10,\n  \"z\": [\n    \"foo\",\n    \"bar\",\n    \
             \"baz\"\n  ]\n}"
        );

        // The aliased call's files
        assert_eq!(
            read_to_string(outputs_dir.join("calls/bar/inputs.json"))
                .expect("failed to read bar `inputs.json`"),
            "{\n  \"a\": \"bar\",\n  \"b\": 1,\n  \"c\": []\n}"
        );

        assert_eq!(
            read_to_string(outputs_dir.join("calls/bar/outputs.json"))
                .expect("failed to read bar `outputs.json`"),
            "{\n  \"x\": \"bar\",\n  \"y\": 1,\n  \"z\": []\n}"
        );
    }
1920
    /// Tests that the progress callback receives the expected number of
    /// events for scattered task calls and scattered sub-workflow calls.
    #[tokio::test]
    async fn it_reports_progress() {
        let root_dir = TempDir::new().expect("failed to create temporary directory");
        // A trivial workflow to be imported and called as a sub-workflow
        fs::write(
            root_dir.path().join("other.wdl"),
            r#"
version 1.1
workflow w {}
"#,
        )
        .expect("failed to write WDL source file");

        // The root workflow: 10 scattered task calls + 25 scattered
        // sub-workflow calls
        let source_path = root_dir.path().join("source.wdl");
        fs::write(
            &source_path,
            r#"
version 1.1

import "other.wdl"

task t {
    command <<<>>>
}

workflow w {
    scatter (i in range(10)) {
        call t
    }

    scatter (j in range(25)) {
        call other.w
    }
}
"#,
        )
        .expect("failed to write WDL source file");

        // Analyze both documents
        let analyzer = Analyzer::new(DiagnosticsConfig::except_all(), |(), _, _, _| async {});
        analyzer
            .add_directory(root_dir.path().to_path_buf())
            .await
            .expect("failed to add directory");
        let results = analyzer
            .analyze(())
            .await
            .expect("failed to analyze document");
        assert_eq!(results.len(), 2, "expected only two results");

        /// Counters for each kind of progress event observed.
        #[derive(Default)]
        struct State {
            tasks_started: AtomicUsize,
            tasks_executions_started: AtomicUsize,
            tasks_executions_completed: AtomicUsize,
            tasks_completed: AtomicUsize,
            workflows_started: AtomicUsize,
            workflows_completed: AtomicUsize,
        }

        let config = Config {
            backend: BackendConfig::Local(Default::default()),
            ..Default::default()
        };
        let state = Arc::<State>::default();
        let state_cloned = state.clone();
        let evaluator = WorkflowEvaluator::new(config, CancellationToken::new())
            .await
            .unwrap();

        // Evaluate the root workflow; the progress callback counts each
        // event kind and sanity-checks the reported identifiers
        let outputs = evaluator
            .evaluate(
                results
                    .iter()
                    .find(|r| r.document().uri().as_str().ends_with("source.wdl"))
                    .expect("should have result")
                    .document(),
                WorkflowInputs::default(),
                root_dir.path(),
                move |kind| {
                    match kind {
                        ProgressKind::TaskStarted { id, .. } => {
                            assert!(id.starts_with("t-"));
                            state_cloned.tasks_started.fetch_add(1, Ordering::SeqCst);
                        }
                        ProgressKind::TaskRetried { .. } => panic!("task should not be retried"),
                        ProgressKind::TaskExecutionStarted { id, .. } => {
                            assert!(id.starts_with("t-"));
                            state_cloned
                                .tasks_executions_started
                                .fetch_add(1, Ordering::SeqCst);
                        }
                        ProgressKind::TaskExecutionCompleted { id, .. } => {
                            assert!(id.starts_with("t-"));
                            state_cloned
                                .tasks_executions_completed
                                .fetch_add(1, Ordering::SeqCst);
                        }
                        ProgressKind::TaskCompleted { id, .. } => {
                            assert!(id.starts_with("t-"));
                            state_cloned.tasks_completed.fetch_add(1, Ordering::SeqCst);
                        }
                        ProgressKind::WorkflowStarted { id, .. } => {
                            // Either the root workflow or a scattered sub-workflow
                            assert!(id == "w" || id.starts_with("other-w-"));
                            state_cloned
                                .workflows_started
                                .fetch_add(1, Ordering::SeqCst);
                        }
                        ProgressKind::WorkflowCompleted { id, .. } => {
                            assert!(id == "w" || id.starts_with("other-w-"));
                            state_cloned
                                .workflows_completed
                                .fetch_add(1, Ordering::SeqCst);
                        }
                    }

                    async {}
                },
            )
            .await
            .expect("failed to evaluate workflow");
        assert_eq!(outputs.iter().count(), 0, "expected no outputs");

        // 10 scattered task calls; 25 sub-workflow calls plus the root
        // workflow itself = 26 workflow events
        assert_eq!(state.tasks_started.load(Ordering::SeqCst), 10);
        assert_eq!(state.tasks_executions_started.load(Ordering::SeqCst), 10);
        assert_eq!(state.tasks_executions_completed.load(Ordering::SeqCst), 10);
        assert_eq!(state.tasks_completed.load(Ordering::SeqCst), 10);
        assert_eq!(state.workflows_started.load(Ordering::SeqCst), 26);
        assert_eq!(state.workflows_completed.load(Ordering::SeqCst), 26);
    }
2057}