wdl_engine/eval/v1/
workflow.rs

1//! Implementation of evaluation for V1 workflows.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::fmt::Write;
7use std::fs;
8use std::future::Future;
9use std::mem;
10use std::path::Path;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::Context;
15use anyhow::Result;
16use anyhow::anyhow;
17use anyhow::bail;
18use futures::FutureExt;
19use futures::future::BoxFuture;
20use indexmap::IndexMap;
21use petgraph::Direction;
22use petgraph::graph::DiGraph;
23use petgraph::graph::NodeIndex;
24use petgraph::visit::Bfs;
25use petgraph::visit::EdgeRef;
26use tokio::sync::RwLock;
27use tokio::task::JoinSet;
28use tokio_util::sync::CancellationToken;
29use tracing::debug;
30use tracing::trace;
31use wdl_analysis::Document;
32use wdl_analysis::diagnostics::only_one_namespace;
33use wdl_analysis::diagnostics::recursive_workflow_call;
34use wdl_analysis::diagnostics::type_is_not_array;
35use wdl_analysis::diagnostics::unknown_name;
36use wdl_analysis::diagnostics::unknown_namespace;
37use wdl_analysis::diagnostics::unknown_task_or_workflow;
38use wdl_analysis::document::Task;
39use wdl_analysis::eval::v1::WorkflowGraphBuilder;
40use wdl_analysis::eval::v1::WorkflowGraphNode;
41use wdl_analysis::types::ArrayType;
42use wdl_analysis::types::CallType;
43use wdl_analysis::types::Optional;
44use wdl_analysis::types::PrimitiveType;
45use wdl_analysis::types::PromotionKind;
46use wdl_analysis::types::Type;
47use wdl_ast::Ast;
48use wdl_ast::AstNode;
49use wdl_ast::AstToken;
50use wdl_ast::Diagnostic;
51use wdl_ast::Span;
52use wdl_ast::SupportedVersion;
53use wdl_ast::v1::CallKeyword;
54use wdl_ast::v1::CallStatement;
55use wdl_ast::v1::ConditionalStatement;
56use wdl_ast::v1::Decl;
57use wdl_ast::v1::Expr;
58use wdl_ast::v1::ScatterStatement;
59
60use super::ProgressKind;
61use crate::Array;
62use crate::CallLocation;
63use crate::CallValue;
64use crate::Coercible;
65use crate::EvaluationContext;
66use crate::EvaluationError;
67use crate::EvaluationResult;
68use crate::Inputs;
69use crate::Outputs;
70use crate::PrimitiveValue;
71use crate::Scope;
72use crate::ScopeIndex;
73use crate::ScopeRef;
74use crate::TaskExecutionBackend;
75use crate::Value;
76use crate::WorkflowInputs;
77use crate::config::Config;
78use crate::diagnostics::if_conditional_mismatch;
79use crate::diagnostics::output_evaluation_failed;
80use crate::diagnostics::runtime_type_mismatch;
81use crate::http::Downloader;
82use crate::http::HttpDownloader;
83use crate::path;
84use crate::path::EvaluationPath;
85use crate::tree::SyntaxNode;
86use crate::tree::SyntaxToken;
87use crate::v1::ExprEvaluator;
88use crate::v1::INPUTS_FILE;
89use crate::v1::OUTPUTS_FILE;
90use crate::v1::TaskEvaluator;
91use crate::v1::write_json_file;
92
93/// Helper for formatting a workflow or task identifier for a call statement.
94fn format_id(namespace: Option<&str>, target: &str, alias: &str, scatter_index: &str) -> String {
95    if alias != target {
96        match namespace {
97            Some(ns) => {
98                format!(
99                    "{ns}-{target}-{alias}{sep}{scatter_index}",
100                    sep = if scatter_index.is_empty() { "" } else { "-" },
101                )
102            }
103            None => {
104                format!(
105                    "{target}-{alias}{sep}{scatter_index}",
106                    sep = if scatter_index.is_empty() { "" } else { "-" },
107                )
108            }
109        }
110    } else {
111        match namespace {
112            Some(ns) => {
113                format!(
114                    "{ns}-{alias}{sep}{scatter_index}",
115                    sep = if scatter_index.is_empty() { "" } else { "-" },
116                )
117            }
118            None => {
119                format!(
120                    "{alias}{sep}{scatter_index}",
121                    sep = if scatter_index.is_empty() { "" } else { "-" },
122                )
123            }
124        }
125    }
126}
127
128/// A "hidden" scope variable for representing the scope's scatter index.
129///
130/// This is only present in the scope created for a scatter statement.
131///
132/// The name is intentionally not a valid WDL identifier so that it cannot
133/// conflict with any other variables in scope.
134const SCATTER_INDEX_VAR: &str = "$idx";
135
136/// Used to evaluate expressions in workflows.
137struct WorkflowEvaluationContext<'a, 'b> {
138    /// The document being evaluated.
139    document: &'a Document,
140    /// The scope being evaluated.
141    scope: ScopeRef<'b>,
142    /// The workflow's temporary directory.
143    temp_dir: &'a Path,
144    /// The downloader for expression evaluation.
145    downloader: &'a HttpDownloader,
146}
147
148impl<'a, 'b> WorkflowEvaluationContext<'a, 'b> {
149    /// Constructs a new expression evaluation context.
150    pub fn new(
151        document: &'a Document,
152        scope: ScopeRef<'b>,
153        temp_dir: &'a Path,
154        downloader: &'a HttpDownloader,
155    ) -> Self {
156        Self {
157            document,
158            scope,
159            temp_dir,
160            downloader,
161        }
162    }
163}
164
165impl EvaluationContext for WorkflowEvaluationContext<'_, '_> {
166    fn version(&self) -> SupportedVersion {
167        self.document
168            .version()
169            .expect("document should have a version")
170    }
171
172    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
173        self.scope
174            .lookup(name)
175            .cloned()
176            .ok_or_else(|| unknown_name(name, span))
177    }
178
179    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
180        crate::resolve_type_name(self.document, name, span)
181    }
182
183    fn work_dir(&self) -> Option<&EvaluationPath> {
184        None
185    }
186
187    fn temp_dir(&self) -> &Path {
188        self.temp_dir
189    }
190
191    fn stdout(&self) -> Option<&Value> {
192        None
193    }
194
195    fn stderr(&self) -> Option<&Value> {
196        None
197    }
198
199    fn task(&self) -> Option<&Task> {
200        None
201    }
202
203    fn translate_path(&self, _path: &str) -> Option<Cow<'_, Path>> {
204        None
205    }
206
207    fn downloader(&self) -> &dyn Downloader {
208        self.downloader
209    }
210}
211
212/// The scopes collection used for workflow evaluation.
213#[derive(Debug)]
214struct Scopes {
215    /// The scopes available in workflow evaluation.
216    ///
217    /// The first scope is always the root scope and the second scope is always
218    /// the output scope.
219    ///
220    /// An index in this collection might be either "in use" or "free"; if the
221    /// latter, the index will be recorded in the `free` collection.
222    all: Vec<Scope>,
223    /// Indexes into `scopes` that are currently "free".
224    ///
225    /// This helps reduce memory usage by reusing scopes from scatter
226    /// statements.
227    free: Vec<ScopeIndex>,
228}
229
230impl Scopes {
231    /// The index of a workflow's output scope.
232    const OUTPUT_INDEX: ScopeIndex = ScopeIndex::new(1);
233    /// The index of a workflow's root scope.
234    const ROOT_INDEX: ScopeIndex = ScopeIndex::new(0);
235
236    /// Allocates a new scope and returns the scope index.
237    fn alloc(&mut self, parent: ScopeIndex) -> ScopeIndex {
238        if let Some(index) = self.free.pop() {
239            self.all[index.0].set_parent(parent);
240            return index;
241        }
242
243        let index = self.all.len();
244        self.all.push(Scope::new(parent));
245        index.into()
246    }
247
248    /// Gets a reference to the given scope.
249    fn reference(&self, index: ScopeIndex) -> ScopeRef<'_> {
250        ScopeRef::new(&self.all, index)
251    }
252
253    /// Takes a scope from the collection, replacing it with a default.
254    ///
255    /// Note that this does not free the scope.
256    fn take(&mut self, index: ScopeIndex) -> Scope {
257        mem::take(&mut self.all[index.0])
258    }
259
260    /// Gets a mutable reference to the given scope index.
261    fn get_mut(&mut self, index: ScopeIndex) -> &mut Scope {
262        &mut self.all[index.0]
263    }
264
265    /// Gets a mutable reference to the given scope's parent and a reference to
266    /// the given scope.
267    fn parent_mut(&mut self, index: ScopeIndex) -> (&mut Scope, &Scope) {
268        let parent = self.all[index.0].parent.expect("should have parent");
269        if index.0 < parent.0 {
270            let (left, right) = self.all.split_at_mut(index.0 + 1);
271            (&mut right[parent.0 - index.0 - 1], &left[index.0])
272        } else {
273            let (left, right) = self.all.split_at_mut(parent.0 + 1);
274            (&mut left[parent.0], &right[index.0 - parent.0 - 1])
275        }
276    }
277
278    /// Gets the scatter index for the given scope as a string.
279    fn scatter_index(&self, scope: ScopeIndex) -> String {
280        let mut scope = ScopeRef::new(&self.all, scope);
281        let mut s = String::new();
282        loop {
283            if let Some(value) = scope.local(SCATTER_INDEX_VAR) {
284                if !s.is_empty() {
285                    s.push('-');
286                }
287
288                write!(
289                    &mut s,
290                    "{i}",
291                    i = value.as_integer().expect("index should be an integer")
292                )
293                .expect("failed to write to string");
294            }
295
296            match scope.parent() {
297                Some(parent) => scope = parent,
298                None => break,
299            }
300        }
301
302        s
303    }
304
305    /// Frees a scope that is no longer used.
306    ///
307    /// The scope isn't actually deallocated, just cleared and marked as free to
308    /// be reused.
309    fn free(&mut self, index: ScopeIndex) {
310        let scope = &mut self.all[index.0];
311        scope.clear();
312        self.free.push(index);
313    }
314}
315
316impl Default for Scopes {
317    fn default() -> Self {
318        Self {
319            // Create both the root and output scopes
320            all: vec![Scope::default(), Scope::new(Self::ROOT_INDEX)],
321            free: Default::default(),
322        }
323    }
324}
325
326/// Represents an array being gathered for a scatter statement.
327struct GatherArray {
328    /// The element type of the gather array.
329    element_ty: Type,
330    /// The elements of the gather array.
331    elements: Vec<Value>,
332}
333
334impl GatherArray {
335    /// Constructs a new gather array given the first completed element and
336    /// capacity of the array.
337    fn new(index: usize, value: Value, capacity: usize) -> Self {
338        let element_ty = value.ty();
339        let mut elements = vec![Value::None; capacity];
340        elements[index] = value;
341        Self {
342            element_ty,
343            elements,
344        }
345    }
346
347    /// Converts the gather array into a WDL array value.
348    fn into_array(self) -> Array {
349        Array::new_unchecked(ArrayType::new(self.element_ty).into(), self.elements)
350    }
351}
352
353/// Represents the result of gathering the scatter.
354enum Gather {
355    /// The values are being gathered into an array value.
356    Array(GatherArray),
357    /// The values are being gathered into a call value.
358    Call {
359        /// The type of the call being gathered.
360        call_ty: CallType,
361        /// The gathered outputs of the call.
362        outputs: IndexMap<String, GatherArray>,
363    },
364}
365
366impl Gather {
367    /// Constructs a new gather from the first scatter result with the given
368    /// index.
369    fn new(capacity: usize, index: usize, value: Value) -> Self {
370        if let Value::Call(call) = value {
371            return Self::Call {
372                call_ty: call.ty().promote(PromotionKind::Scatter),
373                outputs: call
374                    .outputs()
375                    .iter()
376                    .map(|(n, v)| (n.to_string(), GatherArray::new(index, v.clone(), capacity)))
377                    .collect(),
378            };
379        }
380
381        Self::Array(GatherArray::new(index, value, capacity))
382    }
383
384    /// Sets the value with the given gather array index.
385    fn set(&mut self, index: usize, value: Value) -> EvaluationResult<()> {
386        match self {
387            Self::Array(array) => {
388                assert!(value.as_call().is_none(), "value should not be a call");
389                if let Some(ty) = array.element_ty.common_type(&value.ty()) {
390                    array.element_ty = ty;
391                }
392
393                array.elements[index] = value;
394            }
395            Self::Call { outputs, .. } => {
396                for (k, v) in value.unwrap_call().outputs().iter() {
397                    let output = outputs
398                        .get_mut(k)
399                        .expect("expected call output to be present");
400                    if let Some(ty) = output.element_ty.common_type(&v.ty()) {
401                        output.element_ty = ty;
402                    }
403
404                    output.elements[index] = v.clone();
405                }
406            }
407        }
408
409        Ok(())
410    }
411
412    /// Converts the gather into a WDL value.
413    fn into_value(self) -> Value {
414        match self {
415            Self::Array(array) => array.into_array().into(),
416            Self::Call { call_ty, outputs } => CallValue::new_unchecked(
417                call_ty,
418                Outputs::from_iter(outputs.into_iter().map(|(n, v)| (n, v.into_array().into())))
419                    .into(),
420            )
421            .into(),
422        }
423    }
424}
425
426/// Represents a subgraph of a workflow evaluation graph.
427///
428/// The subgraph stores relevant node indexes mapped to their current indegrees.
429///
430/// Scatter and conditional statements introduce new subgraphs for evaluation.
431///
432/// Subgraphs are entirely disjoint; no two subgraphs will share the same node
433/// index from the original evaluation graph.
434#[derive(Debug, Clone, Default)]
435struct Subgraph(HashMap<NodeIndex, usize>);
436
437impl Subgraph {
438    /// Constructs a new subgraph from the given evaluation graph.
439    ///
440    /// Initially, the subgraph will contain every node in the evaluation graph
441    /// until it is split.
442    fn new(graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>) -> Self {
443        let mut nodes = HashMap::with_capacity(graph.node_count());
444        for index in graph.node_indices() {
445            nodes.insert(
446                index,
447                graph.edges_directed(index, Direction::Incoming).count(),
448            );
449        }
450
451        Self(nodes)
452    }
453
454    /// Splits this subgraph and returns a map of entry nodes to the
455    /// corresponding subgraph.
456    ///
457    /// This subgraph is modified to replace any direct subgraphs with only the
458    /// entry and exit nodes.
459    fn split(
460        &mut self,
461        graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
462    ) -> HashMap<NodeIndex, Subgraph> {
463        /// Splits a parent subgraph for a scatter or conditional statement.
464        ///
465        /// This works by "stealing" the parent's nodes between the entry and
466        /// exit nodes into a new subgraph.
467        ///
468        /// The exit node of the parent graph is reduced to an indegree of 1;
469        /// only the connection between the entry and exit node will
470        /// remain.
471        ///
472        /// Returns the nodes that comprise the new subgraph.
473        fn split(
474            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
475            parent: &mut HashMap<NodeIndex, usize>,
476            entry: NodeIndex,
477            exit: NodeIndex,
478        ) -> HashMap<NodeIndex, usize> {
479            let mut nodes = HashMap::new();
480            let mut bfs = Bfs::new(graph, entry);
481            while let Some(node) = {
482                // Don't visit the exit node
483                if bfs.stack.front() == Some(&exit) {
484                    bfs.stack.pop_front();
485                }
486                bfs.next(graph)
487            } {
488                // Don't include the entry or exit nodes in the subgraph
489                if node == entry || node == exit {
490                    continue;
491                }
492
493                // Steal the node from the parent
494                let prev = nodes.insert(
495                    node,
496                    parent.remove(&node).expect("node should exist in parent"),
497                );
498                assert!(prev.is_none());
499            }
500
501            // Decrement the indegree the nodes connected to the entry as we're not
502            // including it in the subgraph
503            for edge in graph.edges_directed(entry, Direction::Outgoing) {
504                if edge.target() != exit {
505                    *nodes
506                        .get_mut(&edge.target())
507                        .expect("should be in subgraph") -= 1;
508                }
509            }
510
511            // Set the exit node to an indegree of 1 (incoming from the entry node)
512            *parent.get_mut(&exit).expect("should have exit node") = 1;
513            nodes
514        }
515
516        /// Used to recursively split the subgraph.
517        fn split_recurse(
518            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
519            nodes: &mut HashMap<NodeIndex, usize>,
520            subgraphs: &mut HashMap<NodeIndex, Subgraph>,
521        ) {
522            for index in graph.node_indices() {
523                if !nodes.contains_key(&index) {
524                    continue;
525                }
526
527                match &graph[index] {
528                    WorkflowGraphNode::Conditional(_, exit)
529                    | WorkflowGraphNode::Scatter(_, exit) => {
530                        let mut nodes = split(graph, nodes, index, *exit);
531                        split_recurse(graph, &mut nodes, subgraphs);
532                        subgraphs.insert(index, Subgraph(nodes));
533                    }
534                    _ => {}
535                }
536            }
537        }
538
539        let mut subgraphs = HashMap::new();
540        split_recurse(graph, &mut self.0, &mut subgraphs);
541        subgraphs
542    }
543
544    /// Removes the given node from the subgraph.
545    ///
546    /// # Panics
547    ///
548    /// Panics if the node's indegree is not 0.
549    fn remove_node(&mut self, graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>, node: NodeIndex) {
550        let indegree = self.0.remove(&node);
551        assert_eq!(
552            indegree,
553            Some(0),
554            "removed a node with an indegree greater than 0"
555        );
556
557        // Decrement the indegrees of connected nodes
558        for edge in graph.edges_directed(node, Direction::Outgoing) {
559            if let Some(indegree) = self.0.get_mut(&edge.target()) {
560                *indegree -= 1;
561            }
562        }
563    }
564}
565
566/// Represents workflow evaluation state.
567struct State {
568    /// The evaluation configuration to use.
569    config: Arc<Config>,
570    /// The task execution backend to use.
571    backend: Arc<dyn TaskExecutionBackend>,
572    /// The cancellation token for cancelling workflow evaluation.
573    token: CancellationToken,
574    /// The document containing the workflow being evaluated.
575    document: Document,
576    /// The workflow's inputs.
577    inputs: WorkflowInputs,
578    /// The scopes used in workflow evaluation.
579    scopes: RwLock<Scopes>,
580    /// The workflow evaluation graph.
581    graph: DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
582    /// The map from graph node index to subgraph.
583    subgraphs: HashMap<NodeIndex, Subgraph>,
584    /// The workflow evaluation temp directory path.
585    temp_dir: PathBuf,
586    /// The calls directory path.
587    calls_dir: PathBuf,
588    /// The downloader for expression evaluation.
589    downloader: HttpDownloader,
590}
591
592/// Represents a WDL V1 workflow evaluator.
593///
594/// This type is cheaply cloned.
595#[derive(Clone)]
596pub struct WorkflowEvaluator {
597    /// The configuration for evaluation.
598    config: Arc<Config>,
599    /// The associated task execution backend.
600    backend: Arc<dyn TaskExecutionBackend>,
601    /// The cancellation token for cancelling workflow evaluation.
602    token: CancellationToken,
603    /// The downloader for expression evaluation.
604    downloader: HttpDownloader,
605}
606
607impl WorkflowEvaluator {
608    /// Constructs a new workflow evaluator with the given evaluation
609    /// configuration and cancellation token.
610    ///
611    /// This method creates a default task execution backend.
612    ///
613    /// Returns an error if the configuration isn't valid.
614    pub async fn new(config: Config, token: CancellationToken) -> Result<Self> {
615        let backend = config.create_backend().await?;
616        Self::new_with_backend(config, backend, token)
617    }
618
619    /// Constructs a new workflow evaluator with the given evaluation
620    /// configuration, task execution backend, and cancellation token.
621    ///
622    /// Returns an error if the configuration isn't valid.
623    pub fn new_with_backend(
624        config: Config,
625        backend: Arc<dyn TaskExecutionBackend>,
626        token: CancellationToken,
627    ) -> Result<Self> {
628        config.validate()?;
629
630        let config = Arc::new(config);
631        let downloader = HttpDownloader::new(config.clone())?;
632
633        Ok(Self {
634            config,
635            backend,
636            token,
637            downloader,
638        })
639    }
640
641    /// Evaluates the workflow of the given document.
642    ///
643    /// Upon success, returns the outputs of the workflow.
644    pub async fn evaluate<P, R>(
645        &self,
646        document: &Document,
647        inputs: WorkflowInputs,
648        root_dir: impl AsRef<Path>,
649        progress: P,
650    ) -> EvaluationResult<Outputs>
651    where
652        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
653        R: Future<Output = ()> + Send,
654    {
655        let workflow = document
656            .workflow()
657            .context("document does not contain a workflow")?;
658
659        self.evaluate_with_progress(
660            document,
661            inputs,
662            root_dir.as_ref(),
663            workflow.name(),
664            Arc::new(progress),
665        )
666        .await
667    }
668
669    /// Evaluates the workflow of the given document with the given shared
670    /// progress callback.
671    async fn evaluate_with_progress<P, R>(
672        &self,
673        document: &Document,
674        inputs: WorkflowInputs,
675        root_dir: &Path,
676        id: &str,
677        progress: Arc<P>,
678    ) -> EvaluationResult<Outputs>
679    where
680        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
681        R: Future<Output = ()> + Send,
682    {
683        // We cannot evaluate a document with errors
684        if document.has_errors() {
685            return Err(anyhow!("cannot evaluate a document with errors").into());
686        }
687
688        progress(ProgressKind::WorkflowStarted { id }).await;
689
690        let result = self
691            .perform_evaluation(document, inputs, root_dir, id, progress.clone())
692            .await;
693
694        progress(ProgressKind::WorkflowCompleted {
695            id,
696            result: &result,
697        })
698        .await;
699
700        result
701    }
702
703    /// Evaluates the workflow of the given document with the given shared
704    /// progress callback.
705    async fn perform_evaluation<P, R>(
706        &self,
707        document: &Document,
708        inputs: WorkflowInputs,
709        root_dir: &Path,
710        id: &str,
711        progress: Arc<P>,
712    ) -> EvaluationResult<Outputs>
713    where
714        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
715        R: Future<Output = ()> + Send,
716    {
717        // Validate the inputs for the workflow
718        let workflow = document
719            .workflow()
720            .context("document does not contain a workflow")?;
721        inputs.validate(document, workflow, None).with_context(|| {
722            format!(
723                "failed to validate the inputs to workflow `{workflow}`",
724                workflow = workflow.name()
725            )
726        })?;
727
728        let ast = match document.root().morph().ast() {
729            Ast::V1(ast) => ast,
730            _ => {
731                return Err(
732                    anyhow!("workflow evaluation is only supported for WDL 1.x documents").into(),
733                );
734            }
735        };
736
737        debug!(
738            workflow_id = id,
739            workflow_name = workflow.name(),
740            document = document.uri().as_str(),
741            "evaluating workflow",
742        );
743
744        // Find the workflow in the AST
745        let definition = ast
746            .workflows()
747            .next()
748            .expect("workflow should exist in the AST");
749
750        // Build an evaluation graph for the workflow
751        let mut diagnostics = Vec::new();
752
753        // We need to provide inputs to the workflow graph builder to avoid adding
754        // dependency edges from the default expressions if a value was provided
755        let graph = WorkflowGraphBuilder::default()
756            .build(&definition, &mut diagnostics, |name| inputs.contains(name));
757        assert!(
758            diagnostics.is_empty(),
759            "workflow evaluation graph should have no diagnostics"
760        );
761
762        // Split the root subgraph for every conditional and scatter statement
763        let mut subgraph = Subgraph::new(&graph);
764        let subgraphs = subgraph.split(&graph);
765
766        let max_concurrency = self
767            .config
768            .workflow
769            .scatter
770            .concurrency
771            .unwrap_or_else(|| self.backend.max_concurrency());
772
773        // Create the temp directory now as it may be needed for workflow evaluation
774        let temp_dir = root_dir.join("tmp");
775        fs::create_dir_all(&temp_dir).with_context(|| {
776            format!(
777                "failed to create directory `{path}`",
778                path = temp_dir.display()
779            )
780        })?;
781
782        // Write the inputs to the workflow's root directory
783        write_json_file(root_dir.join(INPUTS_FILE), &inputs)?;
784
785        let calls_dir = root_dir.join("calls");
786        fs::create_dir_all(&calls_dir).with_context(|| {
787            format!(
788                "failed to create directory `{path}`",
789                path = temp_dir.display()
790            )
791        })?;
792
793        let effective_output_dir = root_dir.to_path_buf();
794
795        let state = Arc::new(State {
796            config: self.config.clone(),
797            backend: self.backend.clone(),
798            token: self.token.clone(),
799            document: document.clone(),
800            inputs,
801            scopes: Default::default(),
802            graph,
803            subgraphs,
804            temp_dir,
805            calls_dir,
806            downloader: self.downloader.clone(),
807        });
808
809        // Evaluate the root graph to completion
810        Self::evaluate_subgraph(
811            state.clone(),
812            Scopes::ROOT_INDEX,
813            subgraph,
814            max_concurrency,
815            Arc::new(id.to_string()),
816            progress,
817        )
818        .await?;
819
820        if let Some(cleanup_fut) = self
821            .backend
822            .cleanup(&effective_output_dir, state.token.clone())
823        {
824            cleanup_fut.await;
825        }
826
827        let mut outputs: Outputs = state.scopes.write().await.take(Scopes::OUTPUT_INDEX).into();
828        if let Some(section) = definition.output() {
829            let indexes: HashMap<_, _> = section
830                .declarations()
831                .enumerate()
832                .map(|(i, d)| (d.name().hashable(), i))
833                .collect();
834            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
835        }
836
837        // Write the outputs to the workflow's root directory
838        write_json_file(root_dir.join(OUTPUTS_FILE), &outputs)?;
839        Ok(outputs)
840    }
841
842    /// Evaluates a subgraph to completion.
843    ///
844    /// Note that this method is not `async` because it is indirectly recursive.
845    ///
846    /// The boxed future breaks the cycle that would otherwise exist when trying
847    /// to have the Rust compiler create an opaque type for the future returned
848    /// by an `async` method.
849    fn evaluate_subgraph<P, R>(
850        state: Arc<State>,
851        scope: ScopeIndex,
852        subgraph: Subgraph,
853        max_concurrency: u64,
854        id: Arc<String>,
855        progress: Arc<P>,
856    ) -> BoxFuture<'static, EvaluationResult<()>>
857    where
858        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
859        R: Future<Output = ()> + Send,
860    {
861        async move {
862            let token = state.token.clone();
863            let mut futures = JoinSet::new();
864            match Self::perform_subgraph_evaluation(
865                state,
866                scope,
867                subgraph,
868                max_concurrency,
869                id,
870                progress,
871                &mut futures,
872            )
873            .await
874            {
875                Ok(_) => {
876                    // There should be no more pending futures
877                    assert!(futures.is_empty());
878                    Ok(())
879                }
880                Err(e) => {
881                    // Cancel any outstanding futures and join them
882                    token.cancel();
883                    futures.join_all().await;
884                    Err(e)
885                }
886            }
887        }
888        .boxed()
889    }
890
891    /// Performs subgraph evaluation.
892    ///
893    /// This exists as a separate function from `evaluate_subgraph` so that we
894    /// can gracefully cancel outstanding futures on error.
895    async fn perform_subgraph_evaluation<P, R>(
896        state: Arc<State>,
897        scope: ScopeIndex,
898        mut subgraph: Subgraph,
899        max_concurrency: u64,
900        id: Arc<String>,
901        progress: Arc<P>,
902        futures: &mut JoinSet<EvaluationResult<NodeIndex>>,
903    ) -> EvaluationResult<()>
904    where
905        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
906        R: Future<Output = ()> + Send,
907    {
908        // The set of nodes being processed
909        let mut processing: Vec<NodeIndex> = Vec::new();
910        // The set of graph nodes being awaited on
911        let mut awaiting: HashSet<NodeIndex> = HashSet::new();
912
913        while !subgraph.0.is_empty() {
914            // Add nodes with indegree 0 that we aren't already waiting on
915            processing.extend(subgraph.0.iter().filter_map(|(node, indegree)| {
916                if *indegree == 0 && !awaiting.contains(node) {
917                    Some(*node)
918                } else {
919                    None
920                }
921            }));
922
923            // If no graph nodes can be processed, await on any futures
924            if processing.is_empty() {
925                let node: EvaluationResult<NodeIndex> = futures
926                    .join_next()
927                    .await
928                    .expect("should have a future to wait on")
929                    .expect("failed to join future");
930
931                let node = node?;
932                match &state.graph[node] {
933                    WorkflowGraphNode::Call(stmt) => {
934                        let call_name = stmt
935                            .alias()
936                            .map(|a| a.name())
937                            .unwrap_or_else(|| stmt.target().names().last().unwrap());
938                        debug!(
939                            workflow_id = id.as_str(),
940                            workflow_name = state.document.workflow().unwrap().name(),
941                            document = state.document.uri().as_str(),
942                            call_name = call_name.text(),
943                            "evaluation of call statement has completed",
944                        )
945                    }
946                    WorkflowGraphNode::Conditional(stmt, _) => debug!(
947                        workflow_id = id.as_str(),
948                        workflow_name = state.document.workflow().unwrap().name(),
949                        document = state.document.uri().as_str(),
950                        expr = {
951                            let e = stmt.expr();
952                            e.text().to_string()
953                        },
954                        "evaluation of conditional statement has completed",
955                    ),
956                    WorkflowGraphNode::Scatter(stmt, _) => {
957                        let variable = stmt.variable();
958                        debug!(
959                            workflow_id = id.as_str(),
960                            workflow_name = state.document.workflow().unwrap().name(),
961                            document = state.document.uri().as_str(),
962                            variable = variable.text(),
963                            "evaluation of scatter statement has completed",
964                        )
965                    }
966                    _ => unreachable!(),
967                }
968
969                awaiting.remove(&node);
970                subgraph.remove_node(&state.graph, node);
971
972                // Continue to see if we can progress further in the subgraph; if not we'll
973                // await more futures
974                continue;
975            }
976
977            // Process the node now or spawn a future
978            for node in processing.iter().copied() {
979                trace!(
980                    workflow_id = id.as_str(),
981                    workflow_name = state.document.workflow().unwrap().name(),
982                    document = state.document.uri().as_str(),
983                    "evaluating node `{n:?}` ({node:?})",
984                    n = state.graph[node]
985                );
986                match &state.graph[node] {
987                    WorkflowGraphNode::Input(decl) => Self::evaluate_input(&id, &state, decl)
988                        .await
989                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
990                    WorkflowGraphNode::Decl(decl) => Self::evaluate_decl(&id, &state, scope, decl)
991                        .await
992                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
993                    WorkflowGraphNode::Output(decl) => Self::evaluate_output(&id, &state, decl)
994                        .await
995                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?,
996                    WorkflowGraphNode::Conditional(stmt, _) => {
997                        let id = id.clone();
998                        let state = state.clone();
999                        let progress = progress.clone();
1000                        let stmt = stmt.clone();
1001                        futures.spawn(async move {
1002                            Self::evaluate_conditional(
1003                                id,
1004                                state,
1005                                scope,
1006                                node,
1007                                &stmt,
1008                                max_concurrency,
1009                                progress,
1010                            )
1011                            .await?;
1012                            Ok(node)
1013                        });
1014                        awaiting.insert(node);
1015                    }
1016                    WorkflowGraphNode::Scatter(stmt, _) => {
1017                        let id = id.clone();
1018                        let state = state.clone();
1019                        let progress = progress.clone();
1020                        let stmt = stmt.clone();
1021                        futures.spawn(async move {
1022                            let token = state.token.clone();
1023                            let mut futures = JoinSet::new();
1024                            match Self::evaluate_scatter(
1025                                id,
1026                                state,
1027                                scope,
1028                                node,
1029                                &stmt,
1030                                max_concurrency,
1031                                progress,
1032                                &mut futures,
1033                            )
1034                            .await
1035                            {
1036                                Ok(_) => {
1037                                    // All futures should have completed
1038                                    assert!(futures.is_empty());
1039                                    Ok(node)
1040                                }
1041                                Err(e) => {
1042                                    // Cancel any outstanding futures and join them
1043                                    token.cancel();
1044                                    futures.join_all().await;
1045                                    Err(e)
1046                                }
1047                            }
1048                        });
1049                        awaiting.insert(node);
1050                    }
1051                    WorkflowGraphNode::Call(stmt) => {
1052                        let id = id.clone();
1053                        let state = state.clone();
1054                        let progress = progress.clone();
1055                        let stmt = stmt.clone();
1056                        futures.spawn(async move {
1057                            Self::evaluate_call(&id, state, scope, &stmt, progress).await?;
1058                            Ok(node)
1059                        });
1060                        awaiting.insert(node);
1061                    }
1062                    WorkflowGraphNode::ExitConditional(_) | WorkflowGraphNode::ExitScatter(_) => {
1063                        // Handled directly in `evaluate_conditional` and `evaluate_scatter`
1064                        continue;
1065                    }
1066                }
1067            }
1068
1069            // Remove nodes that have completed
1070            for node in processing.drain(..) {
1071                if awaiting.contains(&node) {
1072                    continue;
1073                }
1074
1075                subgraph.remove_node(&state.graph, node);
1076            }
1077        }
1078
1079        Ok(())
1080    }
1081
1082    /// Evaluates a workflow input.
1083    async fn evaluate_input(
1084        id: &str,
1085        state: &State,
1086        decl: &Decl<SyntaxNode>,
1087    ) -> Result<(), Diagnostic> {
1088        let name = decl.name();
1089        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
1090        let expr = decl.expr();
1091
1092        // Either use the specified input or evaluate the input's expression
1093        let (value, span) = match state.inputs.get(name.text()) {
1094            Some(input) => (input.clone(), name.span()),
1095            None => {
1096                if let Some(expr) = expr {
1097                    debug!(
1098                        workflow_id = id,
1099                        workflow_name = state.document.workflow().unwrap().name(),
1100                        document = state.document.uri().as_str(),
1101                        input_name = name.text(),
1102                        "evaluating input",
1103                    );
1104
1105                    (
1106                        Self::evaluate_expr(state, Scopes::ROOT_INDEX, &expr).await?,
1107                        expr.span(),
1108                    )
1109                } else {
1110                    assert!(expected_ty.is_optional(), "type should be optional");
1111                    (Value::None, name.span())
1112                }
1113            }
1114        };
1115
1116        // Coerce the value to the expected type
1117        let value = value
1118            .coerce(&expected_ty)
1119            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;
1120
1121        // Write the value into the root scope
1122        state
1123            .scopes
1124            .write()
1125            .await
1126            .get_mut(Scopes::ROOT_INDEX)
1127            .insert(name.text(), value);
1128        Ok(())
1129    }
1130
1131    /// Evaluates a workflow private declaration.
1132    async fn evaluate_decl(
1133        id: &str,
1134        state: &State,
1135        scope: ScopeIndex,
1136        decl: &Decl<SyntaxNode>,
1137    ) -> Result<(), Diagnostic> {
1138        let name = decl.name();
1139        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
1140        let expr = decl.expr().expect("declaration should have expression");
1141
1142        debug!(
1143            workflow_id = id,
1144            workflow_name = state.document.workflow().unwrap().name(),
1145            document = state.document.uri().as_str(),
1146            decl_name = name.text(),
1147            "evaluating private declaration",
1148        );
1149
1150        // Evaluate the decl's expression
1151        let value = Self::evaluate_expr(state, scope, &expr).await?;
1152
1153        // Coerce the value to the expected type
1154        let value = value.coerce(&expected_ty).map_err(|e| {
1155            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
1156        })?;
1157
1158        state
1159            .scopes
1160            .write()
1161            .await
1162            .get_mut(scope)
1163            .insert(name.text(), value);
1164        Ok(())
1165    }
1166
1167    /// Evaluates a workflow output.
1168    async fn evaluate_output(
1169        id: &str,
1170        state: &State,
1171        decl: &Decl<SyntaxNode>,
1172    ) -> Result<(), Diagnostic> {
1173        let name = decl.name();
1174        let expected_ty = crate::convert_ast_type_v1(&state.document, &decl.ty())?;
1175        let expr = decl.expr().expect("declaration should have expression");
1176
1177        debug!(
1178            workflow_id = id,
1179            workflow_name = state.document.workflow().unwrap().name(),
1180            document = state.document.uri().as_str(),
1181            output_name = name.text(),
1182            "evaluating output",
1183        );
1184
1185        // Evaluate the decl's expression
1186        let value = Self::evaluate_expr(state, Scopes::OUTPUT_INDEX, &expr).await?;
1187
1188        // Coerce the value to the expected type
1189        let mut value = value.coerce(&expected_ty).map_err(|e| {
1190            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
1191        })?;
1192
1193        // Finally ensure output files exist
1194        value
1195            .visit_paths_mut(expected_ty.is_optional(), &mut |optional, value| {
1196                let path = match value {
1197                    PrimitiveValue::File(path) => path,
1198                    PrimitiveValue::Directory(path) => path,
1199                    _ => unreachable!("only file and directory values should be visited"),
1200                };
1201
1202                if !path::is_url(path) && Path::new(path.as_str()).is_relative() {
1203                    bail!("relative path `{path}` cannot be a workflow output");
1204                }
1205
1206                value.ensure_path_exists(optional)
1207            })
1208            .map_err(|e| {
1209                output_evaluation_failed(
1210                    e,
1211                    state
1212                        .document
1213                        .workflow()
1214                        .expect("should have workflow")
1215                        .name(),
1216                    false,
1217                    name.text(),
1218                    name.span(),
1219                )
1220            })?;
1221
1222        // Write the value into the output scope
1223        state
1224            .scopes
1225            .write()
1226            .await
1227            .get_mut(Scopes::OUTPUT_INDEX)
1228            .insert(name.text(), value);
1229        Ok(())
1230    }
1231
1232    /// Evaluates a workflow conditional statement.
1233    async fn evaluate_conditional<P, R>(
1234        id: Arc<String>,
1235        state: Arc<State>,
1236        parent: ScopeIndex,
1237        entry: NodeIndex,
1238        stmt: &ConditionalStatement<SyntaxNode>,
1239        max_concurrency: u64,
1240        progress: Arc<P>,
1241    ) -> EvaluationResult<()>
1242    where
1243        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
1244        R: Future<Output = ()> + Send,
1245    {
1246        let expr = stmt.expr();
1247
1248        debug!(
1249            workflow_id = id.as_str(),
1250            workflow_name = state.document.workflow().unwrap().name(),
1251            document = state.document.uri().as_str(),
1252            expr = expr.text().to_string(),
1253            "evaluating conditional statement",
1254        );
1255
1256        // Evaluate the conditional expression
1257        let value = Self::evaluate_expr(&state, parent, &expr)
1258            .await
1259            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1260
1261        if value
1262            .coerce(&PrimitiveType::Boolean.into())
1263            .map_err(|e| {
1264                EvaluationError::new(
1265                    state.document.clone(),
1266                    if_conditional_mismatch(e, &value.ty(), expr.span()),
1267                )
1268            })?
1269            .unwrap_boolean()
1270        {
1271            debug!(
1272                workflow_id = id.as_str(),
1273                workflow_name = state.document.workflow().unwrap().name(),
1274                document = state.document.uri().as_str(),
1275                "conditional statement branch was taken and subgraph will be evaluated"
1276            );
1277
1278            // Intentionally drop the write lock before evaluating the subgraph
1279            let scope = { state.scopes.write().await.alloc(parent) };
1280
1281            // Evaluate the subgraph
1282            Self::evaluate_subgraph(
1283                state.clone(),
1284                scope,
1285                state.subgraphs[&entry].clone(),
1286                max_concurrency,
1287                id,
1288                progress.clone(),
1289            )
1290            .await?;
1291
1292            // Promote all values in the scope to the parent scope as optional
1293            let mut scopes = state.scopes.write().await;
1294            let (parent, child) = scopes.parent_mut(scope);
1295            for (name, value) in child.local() {
1296                parent.insert(name.to_string(), value.clone_as_optional());
1297            }
1298
1299            scopes.free(scope);
1300        } else {
1301            debug!(
1302                workflow_id = id.as_str(),
1303                workflow_name = state.document.workflow().unwrap().name(),
1304                document = state.document.uri().as_str(),
1305                "conditional statement branch was not taken and subgraph will be skipped"
1306            );
1307
1308            // Conditional evaluated to false; set the expected names to `None` in the
1309            // parent scope
1310            let mut scopes = state.scopes.write().await;
1311            let parent = scopes.get_mut(parent);
1312            let scope = state
1313                .document
1314                .find_scope_by_position(
1315                    stmt.braced_scope_span()
1316                        .expect("should have braced scope span")
1317                        .start(),
1318                )
1319                .expect("should have scope");
1320
1321            for (name, n) in scope.names() {
1322                if let Type::Call(ty) = n.ty() {
1323                    parent.insert(
1324                        name.to_string(),
1325                        CallValue::new_unchecked(
1326                            ty.promote(PromotionKind::Conditional),
1327                            Outputs::from_iter(
1328                                ty.outputs().iter().map(|(n, _)| (n.clone(), Value::None)),
1329                            )
1330                            .into(),
1331                        ),
1332                    );
1333                } else {
1334                    parent.insert(name.to_string(), Value::None);
1335                }
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Evaluates a workflow scatter statement.
1343    #[allow(clippy::too_many_arguments)]
1344    async fn evaluate_scatter<P, R>(
1345        id: Arc<String>,
1346        state: Arc<State>,
1347        parent: ScopeIndex,
1348        entry: NodeIndex,
1349        stmt: &ScatterStatement<SyntaxNode>,
1350        max_concurrency: u64,
1351        progress: Arc<P>,
1352        futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
1353    ) -> EvaluationResult<()>
1354    where
1355        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
1356        R: Future<Output = ()> + Send,
1357    {
1358        /// Awaits the next future in the set of futures.
1359        async fn await_next(
1360            futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
1361            scopes: &RwLock<Scopes>,
1362            gathers: &mut HashMap<String, Gather>,
1363            capacity: usize,
1364        ) -> EvaluationResult<()> {
1365            let (index, scope) = futures
1366                .join_next()
1367                .await
1368                .expect("should have a future to wait on")
1369                .expect("failed to join future")?;
1370
1371            // Append the result to the gather (the first two variables in scope are always
1372            // the scatter index and variable)
1373            let mut scopes = scopes.write().await;
1374            for (name, value) in scopes.get_mut(scope).local().skip(2) {
1375                match gathers.get_mut(name) {
1376                    Some(gather) => gather.set(index, value.clone())?,
1377                    None => {
1378                        let prev = gathers.insert(
1379                            name.to_string(),
1380                            Gather::new(capacity, index, value.clone()),
1381                        );
1382                        assert!(prev.is_none());
1383                    }
1384                }
1385            }
1386
1387            scopes.free(scope);
1388            Ok(())
1389        }
1390
1391        let variable = stmt.variable();
1392        let expr = stmt.expr();
1393
1394        debug!(
1395            workflow_id = id.as_str(),
1396            workflow_name = state.document.workflow().unwrap().name(),
1397            document = state.document.uri().as_str(),
1398            variable = variable.text(),
1399            "evaluating scatter statement",
1400        );
1401
1402        // Evaluate the scatter array expression
1403        let value = Self::evaluate_expr(&state, parent, &expr)
1404            .await
1405            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1406
1407        let array = value
1408            .as_array()
1409            .ok_or_else(|| {
1410                EvaluationError::new(
1411                    state.document.clone(),
1412                    type_is_not_array(&value.ty(), expr.span()),
1413                )
1414            })?
1415            .as_slice();
1416
1417        let mut gathers: HashMap<_, Gather> = HashMap::new();
1418        for (i, value) in array.iter().enumerate() {
1419            if state.token.is_cancelled() {
1420                return Err(anyhow!("workflow evaluation has been cancelled").into());
1421            }
1422
1423            // Allocate a scope
1424            let scope = {
1425                let mut scopes = state.scopes.write().await;
1426                let index = scopes.alloc(parent);
1427                let scope = scopes.get_mut(index);
1428                scope.insert(
1429                    SCATTER_INDEX_VAR,
1430                    i64::try_from(i).map_err(|_| anyhow!("array index out of bounds"))?,
1431                );
1432                scope.insert(variable.text(), value.clone());
1433                index
1434            };
1435
1436            // Evaluate the subgraph
1437            {
1438                let state = state.clone();
1439                let subgraph = state.subgraphs[&entry].clone();
1440                let progress = progress.clone();
1441                let id = id.clone();
1442                futures.spawn(async move {
1443                    Self::evaluate_subgraph(
1444                        state.clone(),
1445                        scope,
1446                        subgraph,
1447                        max_concurrency,
1448                        id,
1449                        progress,
1450                    )
1451                    .await?;
1452
1453                    Ok((i, scope))
1454                });
1455            }
1456
1457            // If we've reached the concurrency limit, await one of the futures to complete
1458            if futures.len() as u64 >= max_concurrency {
1459                await_next(futures, &state.scopes, &mut gathers, array.len()).await?;
1460            }
1461        }
1462
1463        // Complete any outstanding futures
1464        while !futures.is_empty() {
1465            await_next(futures, &state.scopes, &mut gathers, array.len()).await?;
1466        }
1467
1468        let mut scopes = state.scopes.write().await;
1469        let scope = scopes.get_mut(parent);
1470        for (name, gather) in gathers {
1471            scope.insert(name, gather.into_value());
1472        }
1473
1474        Ok(())
1475    }
1476
1477    /// Evaluates a workflow call statement.
1478    async fn evaluate_call<P, R>(
1479        id: &str,
1480        state: Arc<State>,
1481        scope: ScopeIndex,
1482        stmt: &CallStatement<SyntaxNode>,
1483        progress: Arc<P>,
1484    ) -> EvaluationResult<()>
1485    where
1486        P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
1487        R: Future<Output = ()> + Send,
1488    {
1489        /// Abstracts evaluation for both task and workflow calls.
1490        enum Evaluator<'a> {
1491            /// Used to evaluate a task call.
1492            Task(&'a Task, TaskEvaluator),
1493            /// Used to evaluate a workflow call.
1494            Workflow(WorkflowEvaluator),
1495        }
1496
1497        impl Evaluator<'_> {
1498            /// Runs evaluation with the given inputs.
1499            ///
1500            /// Returns the passed in context and the result of the evaluation.
1501            async fn evaluate<P, R>(
1502                self,
1503                caller_id: &str,
1504                document: &Document,
1505                inputs: Inputs,
1506                root_dir: &Path,
1507                callee_id: &str,
1508                progress: &Arc<P>,
1509            ) -> EvaluationResult<Outputs>
1510            where
1511                P: Fn(ProgressKind<'_>) -> R + Send + Sync + 'static,
1512                R: Future<Output = ()> + Send,
1513            {
1514                match self {
1515                    Evaluator::Task(task, evaluator) => {
1516                        debug!(caller_id, callee_id, "evaluating call to task");
1517                        evaluator
1518                            .evaluate_with_progress(
1519                                document,
1520                                task,
1521                                &inputs.unwrap_task_inputs(),
1522                                root_dir,
1523                                callee_id,
1524                                progress.clone(),
1525                            )
1526                            .await?
1527                            .outputs
1528                    }
1529                    Evaluator::Workflow(evaluator) => {
1530                        debug!(caller_id, callee_id, "evaluating call to workflow");
1531                        evaluator
1532                            .evaluate_with_progress(
1533                                document,
1534                                inputs.unwrap_workflow_inputs(),
1535                                root_dir,
1536                                callee_id,
1537                                progress.clone(),
1538                            )
1539                            .await
1540                    }
1541                }
1542            }
1543        }
1544
1545        let alias = stmt.alias();
1546        let target = stmt.target();
1547        let mut names = target.names().peekable();
1548        let mut namespace = None;
1549        let mut target = None;
1550
1551        // Resolve the target and namespace for the call
1552        while let Some(name) = names.next() {
1553            if names.peek().is_none() {
1554                target = Some(name);
1555                break;
1556            }
1557
1558            if namespace.is_some() {
1559                return Err(EvaluationError::new(
1560                    state.document.clone(),
1561                    only_one_namespace(name.span()),
1562                ));
1563            }
1564
1565            let ns = state.document.namespace(name.text()).ok_or_else(|| {
1566                EvaluationError::new(state.document.clone(), unknown_namespace(&name))
1567            })?;
1568
1569            namespace = Some((name, ns));
1570        }
1571
1572        let target = target.expect("expected at least one name");
1573
1574        let alias = alias
1575            .as_ref()
1576            .map(|t| t.name())
1577            .unwrap_or_else(|| target.clone());
1578
1579        debug!(
1580            workflow_id = id,
1581            workflow_name = state.document.workflow().unwrap().name(),
1582            document = state.document.uri().as_str(),
1583            call_name = alias.text(),
1584            "evaluating call statement",
1585        );
1586
1587        // Check for a directly recursive workflow call
1588        if namespace.is_none()
1589            && target.text()
1590                == state
1591                    .document
1592                    .workflow()
1593                    .expect("should have workflow")
1594                    .name()
1595        {
1596            return Err(EvaluationError::new(
1597                state.document.clone(),
1598                recursive_workflow_call(target.text(), target.span()),
1599            ));
1600        }
1601
1602        // Determine the inputs and evaluator to use for the task or workflow call
1603        let inputs = state.inputs.calls().get(alias.text()).cloned();
1604        let document = namespace
1605            .as_ref()
1606            .map(|(_, ns)| ns.document())
1607            .unwrap_or(&state.document);
1608        let (mut inputs, evaluator) = match document.task_by_name(target.text()) {
1609            Some(task) => (
1610                inputs.unwrap_or_else(|| Inputs::Task(Default::default())),
1611                Evaluator::Task(
1612                    task,
1613                    TaskEvaluator::new_unchecked(
1614                        state.config.clone(),
1615                        state.backend.clone(),
1616                        state.token.clone(),
1617                        state.downloader.clone(),
1618                    ),
1619                ),
1620            ),
1621            _ => match document.workflow() {
1622                Some(workflow) if workflow.name() == target.text() => (
1623                    inputs.unwrap_or_else(|| Inputs::Workflow(Default::default())),
1624                    Evaluator::Workflow(WorkflowEvaluator {
1625                        config: state.config.clone(),
1626                        backend: state.backend.clone(),
1627                        token: state.token.clone(),
1628                        downloader: state.downloader.clone(),
1629                    }),
1630                ),
1631                _ => {
1632                    return Err(EvaluationError::new(
1633                        state.document.clone(),
1634                        unknown_task_or_workflow(
1635                            namespace.as_ref().map(|(_, ns)| ns.span()),
1636                            target.text(),
1637                            target.span(),
1638                        ),
1639                    ));
1640                }
1641            },
1642        };
1643
1644        // Evaluate the inputs
1645        let scatter_index = Self::evaluate_call_inputs(&state, stmt, scope, &mut inputs)
1646            .await
1647            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
1648
1649        let dir = format!(
1650            "{alias}{sep}{scatter_index}",
1651            alias = alias.text(),
1652            sep = if scatter_index.is_empty() { "" } else { "-" },
1653        );
1654
1655        let call_id = format_id(
1656            namespace.as_ref().map(|(n, _)| n.text()),
1657            target.text(),
1658            alias.text(),
1659            &scatter_index,
1660        );
1661
1662        // Finally, evaluate the task or workflow and return the outputs
1663        let outputs = evaluator
1664            .evaluate(
1665                id,
1666                document,
1667                inputs,
1668                &state.calls_dir.join(&dir),
1669                &call_id,
1670                &progress,
1671            )
1672            .await
1673            .map_err(|mut e| {
1674                if let EvaluationError::Source(e) = &mut e {
1675                    e.backtrace.push(CallLocation {
1676                        document: state.document.clone(),
1677                        span: stmt
1678                            .token::<CallKeyword<SyntaxToken>>()
1679                            .expect("should have call keyword")
1680                            .span(),
1681                    });
1682                }
1683
1684                e
1685            })?
1686            .with_name(alias.text());
1687
1688        let ty = state
1689            .document
1690            .workflow()
1691            .expect("should have workflow")
1692            .calls()
1693            .get(alias.text())
1694            .expect("should have call");
1695        state.scopes.write().await.get_mut(scope).insert(
1696            alias.text(),
1697            Value::Call(CallValue::new_unchecked(ty.clone(), Arc::new(outputs))),
1698        );
1699
1700        Ok(())
1701    }
1702
1703    /// Evaluates an expression.
1704    ///
1705    /// This takes a read lock on the scopes.
1706    async fn evaluate_expr(
1707        state: &State,
1708        scope: ScopeIndex,
1709        expr: &Expr<SyntaxNode>,
1710    ) -> Result<Value, Diagnostic> {
1711        let scopes = state.scopes.read().await;
1712        ExprEvaluator::new(WorkflowEvaluationContext::new(
1713            &state.document,
1714            scopes.reference(scope),
1715            &state.temp_dir,
1716            &state.downloader,
1717        ))
1718        .evaluate_expr(expr)
1719        .await
1720    }
1721
1722    /// Evaluates the call inputs of a call statement.
1723    ///
1724    /// Returns the scatter index for the provided scope.
1725    ///
1726    /// This takes a read lock on the scopes.
1727    async fn evaluate_call_inputs(
1728        state: &State,
1729        stmt: &CallStatement<SyntaxNode>,
1730        scope: ScopeIndex,
1731        inputs: &mut Inputs,
1732    ) -> Result<String, Diagnostic> {
1733        let scopes = state.scopes.read().await;
1734        for input in stmt.inputs() {
1735            let name = input.name();
1736            let value = match input.expr() {
1737                Some(expr) => {
1738                    let mut evaluator = ExprEvaluator::new(WorkflowEvaluationContext::new(
1739                        &state.document,
1740                        scopes.reference(scope),
1741                        &state.temp_dir,
1742                        &state.downloader,
1743                    ));
1744
1745                    evaluator.evaluate_expr(&expr).await?
1746                }
1747                None => scopes
1748                    .reference(scope)
1749                    .lookup(name.text())
1750                    .cloned()
1751                    .ok_or_else(|| unknown_name(name.text(), name.span()))?,
1752            };
1753
1754            let prev = inputs.set(input.name().text(), value);
1755            assert!(
1756                prev.is_none(),
1757                "attempted to override a specified call input"
1758            );
1759        }
1760
1761        Ok(scopes.scatter_index(scope))
1762    }
1763}
1764
1765#[cfg(test)]
1766mod test {
1767    use std::fs::read_to_string;
1768    use std::sync::atomic::AtomicUsize;
1769    use std::sync::atomic::Ordering;
1770
1771    use pretty_assertions::assert_eq;
1772    use tempfile::TempDir;
1773    use wdl_analysis::Analyzer;
1774    use wdl_analysis::DiagnosticsConfig;
1775
1776    use super::*;
1777    use crate::config::BackendConfig;
1778
1779    #[tokio::test]
1780    async fn it_writes_input_and_output_files() {
1781        let root_dir = TempDir::new().expect("failed to create temporary directory");
1782        fs::write(
1783            root_dir.path().join("source.wdl"),
1784            r#"
1785version 1.2
1786
1787task foo {
1788    input {
1789        String a
1790        Int b
1791        Array[String] c
1792    }
1793
1794    command <<<>>>
1795
1796    output {
1797        String x = a
1798        Int y = b
1799        Array[String] z = c
1800    }
1801}
1802
1803workflow test {
1804    input {
1805        String a
1806        Int b
1807        Array[String] c
1808    }
1809
1810    call foo {
1811        a = "foo",
1812        b = 10,
1813        c = ["foo", "bar", "baz"]
1814    }
1815
1816    call foo as bar {
1817        a = "bar",
1818        b = 1,
1819        c = []
1820    }
1821
1822    output {
1823        String x = a
1824        Int y = b
1825        Array[String] z = c
1826    }
1827}
1828"#,
1829        )
1830        .expect("failed to write WDL source file");
1831
1832        // Analyze the source file
1833        let analyzer = Analyzer::new(DiagnosticsConfig::except_all(), |(), _, _, _| async {});
1834        analyzer
1835            .add_directory(root_dir.path().to_path_buf())
1836            .await
1837            .expect("failed to add directory");
1838        let results = analyzer
1839            .analyze(())
1840            .await
1841            .expect("failed to analyze document");
1842        assert_eq!(results.len(), 1, "expected only one result");
1843
1844        let config = Config {
1845            backend: BackendConfig::Local(Default::default()),
1846            ..Default::default()
1847        };
1848        let evaluator = WorkflowEvaluator::new(config, CancellationToken::new())
1849            .await
1850            .unwrap();
1851
1852        // Evaluate the `test` workflow in `source.wdl` using the default local backend
1853        let mut inputs = WorkflowInputs::default();
1854        inputs.set("a", "qux".to_string());
1855        inputs.set("b", 1234);
1856        inputs.set(
1857            "c",
1858            Array::new(
1859                ArrayType::new(PrimitiveType::String),
1860                ["jam".to_string(), "cakes".to_string()],
1861            )
1862            .unwrap(),
1863        );
1864        let outputs_dir = root_dir.path().join("outputs");
1865        let outputs = evaluator
1866            .evaluate(
1867                results.first().expect("should have result").document(),
1868                inputs,
1869                &outputs_dir,
1870                |_| async {},
1871            )
1872            .await
1873            .expect("failed to evaluate workflow");
1874        assert_eq!(outputs.iter().count(), 3, "expected three outputs");
1875
1876        // Check the workflow inputs.json
1877        assert_eq!(
1878            read_to_string(outputs_dir.join("inputs.json"))
1879                .expect("failed to read workflow `inputs.json`"),
1880            "{\n  \"a\": \"qux\",\n  \"b\": 1234,\n  \"c\": [\n    \"jam\",\n    \"cakes\"\n  ]\n}"
1881        );
1882
1883        // Check the workflow outputs.json
1884        assert_eq!(
1885            read_to_string(outputs_dir.join("outputs.json"))
1886                .expect("failed to read workflow `outputs.json`"),
1887            "{\n  \"x\": \"qux\",\n  \"y\": 1234,\n  \"z\": [\n    \"jam\",\n    \"cakes\"\n  ]\n}"
1888        );
1889
1890        // Check the `foo` call inputs.json
1891        assert_eq!(
1892            read_to_string(outputs_dir.join("calls/foo/inputs.json"))
1893                .expect("failed to read foo `inputs.json`"),
1894            "{\n  \"a\": \"foo\",\n  \"b\": 10,\n  \"c\": [\n    \"foo\",\n    \"bar\",\n    \
1895             \"baz\"\n  ]\n}"
1896        );
1897
1898        // Check the `foo` call outputs.json
1899        assert_eq!(
1900            read_to_string(outputs_dir.join("calls/foo/outputs.json"))
1901                .expect("failed to read foo `outputs.json`"),
1902            "{\n  \"x\": \"foo\",\n  \"y\": 10,\n  \"z\": [\n    \"foo\",\n    \"bar\",\n    \
1903             \"baz\"\n  ]\n}"
1904        );
1905
1906        // Check the `bar` call inputs.json
1907        assert_eq!(
1908            read_to_string(outputs_dir.join("calls/bar/inputs.json"))
1909                .expect("failed to read bar `inputs.json`"),
1910            "{\n  \"a\": \"bar\",\n  \"b\": 1,\n  \"c\": []\n}"
1911        );
1912
1913        // Check the `bar` call outputs.json
1914        assert_eq!(
1915            read_to_string(outputs_dir.join("calls/bar/outputs.json"))
1916                .expect("failed to read bar `outputs.json`"),
1917            "{\n  \"x\": \"bar\",\n  \"y\": 1,\n  \"z\": []\n}"
1918        );
1919    }
1920
1921    #[tokio::test]
1922    async fn it_reports_progress() {
1923        // Create two test WDL files: one with a no-op workflow to be called and another
1924        // with a no-op task to be called
1925        let root_dir = TempDir::new().expect("failed to create temporary directory");
1926        fs::write(
1927            root_dir.path().join("other.wdl"),
1928            r#"
1929version 1.1
1930workflow w {}
1931"#,
1932        )
1933        .expect("failed to write WDL source file");
1934
1935        let source_path = root_dir.path().join("source.wdl");
1936        fs::write(
1937            &source_path,
1938            r#"
1939version 1.1
1940
1941import "other.wdl"
1942
1943task t {
1944  command <<<>>>
1945}
1946
1947workflow w {
1948  scatter (i in range(10)) {
1949    call t
1950  }
1951
1952  scatter (j in range(25)) {
1953    call other.w
1954  }
1955}
1956"#,
1957        )
1958        .expect("failed to write WDL source file");
1959
1960        // Analyze the source files
1961        let analyzer = Analyzer::new(DiagnosticsConfig::except_all(), |(), _, _, _| async {});
1962        analyzer
1963            .add_directory(root_dir.path().to_path_buf())
1964            .await
1965            .expect("failed to add directory");
1966        let results = analyzer
1967            .analyze(())
1968            .await
1969            .expect("failed to analyze document");
1970        assert_eq!(results.len(), 2, "expected only two results");
1971
1972        // Keep track of how many progress events we saw for evaluation
1973        #[derive(Default)]
1974        struct State {
1975            tasks_started: AtomicUsize,
1976            tasks_executions_started: AtomicUsize,
1977            tasks_executions_completed: AtomicUsize,
1978            tasks_completed: AtomicUsize,
1979            workflows_started: AtomicUsize,
1980            workflows_completed: AtomicUsize,
1981        }
1982
1983        // Use a progress callback that simply increments the appropriate counter
1984        let config = Config {
1985            backend: BackendConfig::Local(Default::default()),
1986            ..Default::default()
1987        };
1988        let state = Arc::<State>::default();
1989        let state_cloned = state.clone();
1990        let evaluator = WorkflowEvaluator::new(config, CancellationToken::new())
1991            .await
1992            .unwrap();
1993
1994        // Evaluate the `w` workflow in `source.wdl` using the default local
1995        // backend
1996        let outputs = evaluator
1997            .evaluate(
1998                results
1999                    .iter()
2000                    .find(|r| r.document().uri().as_str().ends_with("source.wdl"))
2001                    .expect("should have result")
2002                    .document(),
2003                WorkflowInputs::default(),
2004                root_dir.path(),
2005                move |kind| {
2006                    match kind {
2007                        ProgressKind::TaskStarted { id, .. } => {
2008                            assert!(id.starts_with("t-"));
2009                            state_cloned.tasks_started.fetch_add(1, Ordering::SeqCst);
2010                        }
2011                        ProgressKind::TaskRetried { .. } => panic!("task should not be retried"),
2012                        ProgressKind::TaskExecutionStarted { id, .. } => {
2013                            assert!(id.starts_with("t-"));
2014                            state_cloned
2015                                .tasks_executions_started
2016                                .fetch_add(1, Ordering::SeqCst);
2017                        }
2018                        ProgressKind::TaskExecutionCompleted { id, .. } => {
2019                            assert!(id.starts_with("t-"));
2020                            state_cloned
2021                                .tasks_executions_completed
2022                                .fetch_add(1, Ordering::SeqCst);
2023                        }
2024                        ProgressKind::TaskCompleted { id, .. } => {
2025                            assert!(id.starts_with("t-"));
2026                            state_cloned.tasks_completed.fetch_add(1, Ordering::SeqCst);
2027                        }
2028                        ProgressKind::WorkflowStarted { id, .. } => {
2029                            assert!(id == "w" || id.starts_with("other-w-"));
2030                            state_cloned
2031                                .workflows_started
2032                                .fetch_add(1, Ordering::SeqCst);
2033                        }
2034                        ProgressKind::WorkflowCompleted { id, .. } => {
2035                            assert!(id == "w" || id.starts_with("other-w-"));
2036                            state_cloned
2037                                .workflows_completed
2038                                .fetch_add(1, Ordering::SeqCst);
2039                        }
2040                    }
2041
2042                    async {}
2043                },
2044            )
2045            .await
2046            .expect("failed to evaluate workflow");
2047        assert_eq!(outputs.iter().count(), 0, "expected no outputs");
2048
2049        // Ensure the counters are what is expected based on the WDL
2050        assert_eq!(state.tasks_started.load(Ordering::SeqCst), 10);
2051        assert_eq!(state.tasks_executions_started.load(Ordering::SeqCst), 10);
2052        assert_eq!(state.tasks_executions_completed.load(Ordering::SeqCst), 10);
2053        assert_eq!(state.tasks_completed.load(Ordering::SeqCst), 10);
2054        assert_eq!(state.workflows_started.load(Ordering::SeqCst), 26);
2055        assert_eq!(state.workflows_completed.load(Ordering::SeqCst), 26);
2056    }
2057}