use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Write;
use std::fs;
use std::mem;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use futures::FutureExt;
use futures::future::BoxFuture;
use indexmap::IndexMap;
use petgraph::Direction;
use petgraph::graph::DiGraph;
use petgraph::graph::NodeIndex;
use petgraph::visit::Bfs;
use petgraph::visit::EdgeRef;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::debug;
use tracing::trace;
use wdl_analysis::Document;
use wdl_analysis::diagnostics::Io;
use wdl_analysis::diagnostics::only_one_namespace;
use wdl_analysis::diagnostics::recursive_workflow_call;
use wdl_analysis::diagnostics::type_is_not_array;
use wdl_analysis::diagnostics::unknown_name;
use wdl_analysis::diagnostics::unknown_namespace;
use wdl_analysis::diagnostics::unknown_task_or_workflow;
use wdl_analysis::document::ScopeUnion;
use wdl_analysis::document::Task;
use wdl_analysis::eval::v1::WorkflowGraphBuilder;
use wdl_analysis::eval::v1::WorkflowGraphNode;
use wdl_analysis::types::ArrayType;
use wdl_analysis::types::CallType;
use wdl_analysis::types::Optional;
use wdl_analysis::types::PrimitiveType;
use wdl_analysis::types::Type;
use wdl_ast::Ast;
use wdl_ast::AstNode;
use wdl_ast::AstToken;
use wdl_ast::Diagnostic;
use wdl_ast::Span;
use wdl_ast::SupportedVersion;
use wdl_ast::v1::CallKeyword;
use wdl_ast::v1::CallStatement;
use wdl_ast::v1::ConditionalStatement;
use wdl_ast::v1::ConditionalStatementClauseKind;
use wdl_ast::v1::Decl;
use wdl_ast::v1::Expr;
use wdl_ast::v1::ScatterStatement;
use wdl_ast::version::V1;
use crate::Array;
use crate::CallLocation;
use crate::CallValue;
use crate::CancellationContextState;
use crate::Coercible;
use crate::EvaluationContext;
use crate::EvaluationError;
use crate::EvaluationPath;
use crate::EvaluationResult;
use crate::Inputs;
use crate::Outputs;
use crate::TypeNameRefValue;
use crate::Value;
use crate::WorkflowInputs;
use crate::diagnostics::decl_evaluation_failed;
use crate::diagnostics::if_conditional_mismatch;
use crate::diagnostics::runtime_type_mismatch;
use crate::diagnostics::unknown_enum;
use crate::eval::Scope;
use crate::eval::ScopeIndex;
use crate::eval::ScopeRef;
use crate::http::Transferer;
use crate::tree::SyntaxNode;
use crate::tree::SyntaxToken;
use crate::v1::Evaluator;
use crate::v1::ExprEvaluator;
use crate::v1::INPUTS_FILE;
use crate::v1::OUTPUTS_FILE;
use crate::v1::resolve_enum_variant_value;
use crate::v1::write_json_file;
/// Formats a unique identifier for a call evaluation.
///
/// The identifier is built from the (optional) namespace, the call target,
/// the call alias, and the scatter index string, joined with `-`; the target
/// component is included only when the call was aliased to a different name,
/// and the scatter index component only when it is non-empty.
fn format_id(namespace: Option<&str>, target: &str, alias: &str, scatter_index: &str) -> String {
    let mut id = String::new();
    if let Some(ns) = namespace {
        id.push_str(ns);
        id.push('-');
    }
    // Only include the target when it differs from the alias; otherwise the
    // alias alone identifies the call.
    if alias != target {
        id.push_str(target);
        id.push('-');
    }
    id.push_str(alias);
    if !scatter_index.is_empty() {
        id.push('-');
        id.push_str(scatter_index);
    }
    id
}
/// Name of the hidden scope-local variable that stores the current scatter
/// index; the `$` prefix keeps it from clashing with user-declared names.
const SCATTER_INDEX_VAR: &str = "$idx";
/// Context used when evaluating expressions during workflow evaluation.
struct WorkflowEvaluationContext<'a, 'b> {
    /// The shared workflow evaluation state.
    state: &'a State,
    /// The scope in which names are resolved.
    scope: ScopeRef<'b>,
}
impl<'a, 'b> WorkflowEvaluationContext<'a, 'b> {
    /// Constructs a new workflow evaluation context.
    pub fn new(state: &'a State, scope: ScopeRef<'b>) -> Self {
        Self { state, scope }
    }
}
impl EvaluationContext for WorkflowEvaluationContext<'_, '_> {
    fn version(&self) -> SupportedVersion {
        self.state
            .document
            .version()
            .expect("document should have a version")
    }
    /// Resolves a name by first consulting the current scope chain and then
    /// falling back to the document's custom types.
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
        if let Some(var) = self.scope.lookup(name).cloned() {
            return Ok(var);
        }
        if let Some(ty) = self.state.document.get_custom_type(name) {
            return Ok(Value::TypeNameRef(TypeNameRefValue::new(ty)));
        }
        Err(unknown_name(name, span))
    }
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
        crate::resolve_type_name(&self.state.document, name, span)
    }
    /// Resolves an enum variant's value, memoizing the result in the
    /// evaluator's variant cache so each variant is resolved at most once.
    fn enum_variant_value(&self, enum_name: &str, variant_name: &str) -> Result<Value, Diagnostic> {
        let cache_key = self
            .state
            .document
            .get_variant_cache_key(enum_name, variant_name)
            .ok_or_else(|| unknown_enum(enum_name))?;
        // Fast path: scope the guard so the lock is not held while resolving
        // the variant below.
        {
            let cache = self.state.evaluator.variant_cache.lock().unwrap();
            if let Some(cached_value) = cache.get(&cache_key) {
                return Ok(cached_value.clone());
            }
        }
        let r#enum = self
            .state
            .document
            .enum_by_name(enum_name)
            // `ok_or_else` so the diagnostic is only constructed on failure
            // (the previous eager `ok_or` built it on every lookup).
            .ok_or_else(|| unknown_enum(enum_name))?;
        let value = resolve_enum_variant_value(r#enum, variant_name)?;
        self.state
            .evaluator
            .variant_cache
            .lock()
            .unwrap()
            .insert(cache_key, value.clone());
        Ok(value)
    }
    fn base_dir(&self) -> &EvaluationPath {
        &self.state.base_dir
    }
    fn temp_dir(&self) -> &Path {
        &self.state.temp_dir
    }
    fn transferer(&self) -> &dyn Transferer {
        self.state.transferer()
    }
}
/// The collection of scopes used during workflow evaluation.
#[derive(Debug)]
struct Scopes {
    /// Every scope allocated so far; indexes are stable for the lifetime of
    /// the evaluation (freed scopes are cleared and recycled, never removed).
    all: Vec<Scope>,
    /// Indexes into `all` of scopes that were freed and may be reused.
    free: Vec<ScopeIndex>,
}
impl Scopes {
    /// Index of the scope that receives the workflow's output values.
    const OUTPUT_INDEX: ScopeIndex = ScopeIndex::new(1);
    /// Index of the root scope (inputs and top-level declarations).
    const ROOT_INDEX: ScopeIndex = ScopeIndex::new(0);
    /// Allocates a scope with the given parent, reusing a freed slot when
    /// one is available.
    fn alloc(&mut self, parent: ScopeIndex) -> ScopeIndex {
        if let Some(index) = self.free.pop() {
            // Freed scopes were cleared in `free`, so only the parent link
            // needs resetting.
            self.all[index.0].set_parent(parent);
            return index;
        }
        let index = self.all.len();
        self.all.push(Scope::new(parent));
        index.into()
    }
    /// Gets a read-only reference to the scope at the given index.
    fn reference(&self, index: ScopeIndex) -> ScopeRef<'_> {
        ScopeRef::new(&self.all, index)
    }
    /// Takes ownership of the scope at the given index, leaving an empty
    /// default scope in its place.
    fn take(&mut self, index: ScopeIndex) -> Scope {
        mem::take(&mut self.all[index.0])
    }
    /// Gets a mutable reference to the scope at the given index.
    fn get_mut(&mut self, index: ScopeIndex) -> &mut Scope {
        &mut self.all[index.0]
    }
    /// Returns a mutable reference to the parent of the scope at `index`
    /// together with a shared reference to the scope itself.
    ///
    /// `split_at_mut` is used so both references can coexist; the two
    /// branches handle whichever of the child/parent index is smaller.
    fn parent_mut(&mut self, index: ScopeIndex) -> (&mut Scope, &Scope) {
        let parent = self.all[index.0].parent.expect("should have parent");
        if index.0 < parent.0 {
            let (left, right) = self.all.split_at_mut(index.0 + 1);
            (&mut right[parent.0 - index.0 - 1], &left[index.0])
        } else {
            let (left, right) = self.all.split_at_mut(parent.0 + 1);
            (&mut left[parent.0], &right[index.0 - parent.0 - 1])
        }
    }
    /// Builds the scatter index string for the given scope by walking up the
    /// scope chain and joining each `$idx` value found with `-` (innermost
    /// index first).
    fn scatter_index(&self, scope: ScopeIndex) -> String {
        let mut scope = ScopeRef::new(&self.all, scope);
        let mut s = String::new();
        loop {
            if let Some(value) = scope.local(SCATTER_INDEX_VAR) {
                if !s.is_empty() {
                    s.push('-');
                }
                write!(
                    &mut s,
                    "{i}",
                    i = value.as_integer().expect("index should be an integer")
                )
                .expect("failed to write to string");
            }
            match scope.parent() {
                Some(parent) => scope = parent,
                None => break,
            }
        }
        s
    }
    /// Clears the scope at the given index and marks its slot for reuse.
    fn free(&mut self, index: ScopeIndex) {
        let scope = &mut self.all[index.0];
        scope.clear();
        self.free.push(index);
    }
}
impl Default for Scopes {
    /// Creates the two well-known scopes: the root scope at index 0 and the
    /// output scope at index 1 (a child of the root). No slots start free.
    fn default() -> Self {
        let root = Scope::default();
        let output = Scope::new(Self::ROOT_INDEX);
        Self {
            all: vec![root, output],
            free: Vec::new(),
        }
    }
}
/// An array being gathered from the values produced by scatter iterations.
struct GatherArray {
    /// The common element type of the gathered values.
    element_ty: Type,
    /// The gathered elements, one slot per scatter iteration.
    elements: Vec<Value>,
}
impl GatherArray {
    /// Creates a gather array of `capacity` `None` placeholders, storing the
    /// first completed `value` at `index`.
    fn new(index: usize, value: Value, capacity: usize) -> Self {
        let element_ty = value.ty();
        let placeholder = Value::new_none(element_ty.optional());
        let mut elements = vec![placeholder; capacity];
        elements[index] = value;
        Self {
            element_ty,
            elements,
        }
    }
    /// Converts the gathered elements into an array value.
    fn into_array(self) -> Array {
        let Self {
            element_ty,
            elements,
        } = self;
        Array::new_unchecked(ArrayType::new(element_ty), elements)
    }
}
/// Represents how values produced across scatter iterations are gathered.
enum Gather {
    /// Gathering plain values into a single array.
    Array(GatherArray),
    /// Gathering a call's outputs; each output gets its own gather array.
    Call {
        /// The call type promoted for the scatter (see `promote_scatter`).
        call_ty: CallType,
        /// A gather array per call output, keyed by output name.
        outputs: IndexMap<String, GatherArray>,
    },
}
impl Gather {
    /// Creates a new gather for the first value produced at `index`,
    /// choosing the `Call` form when the value is a call value.
    fn new(capacity: usize, index: usize, value: Value) -> Self {
        if let Value::Call(call) = value {
            return Self::Call {
                call_ty: call.ty().promote_scatter(),
                outputs: call
                    .outputs()
                    .iter()
                    .map(|(n, v)| (n.to_string(), GatherArray::new(index, v.clone(), capacity)))
                    .collect(),
            };
        }
        Self::Array(GatherArray::new(index, value, capacity))
    }
    /// Records the value produced by iteration `index`, widening the gathered
    /// element type to the common type when necessary.
    fn set(&mut self, index: usize, value: Value) -> EvaluationResult<()> {
        match self {
            Self::Array(array) => {
                // The gather's shape is fixed by the first value; a call
                // value here would indicate a graph construction bug.
                assert!(value.as_call().is_none(), "value should not be a call");
                if let Some(ty) = array.element_ty.common_type(&value.ty()) {
                    array.element_ty = ty;
                }
                array.elements[index] = value;
            }
            Self::Call { outputs, .. } => {
                for (k, v) in value.unwrap_call().outputs().iter() {
                    let output = outputs
                        .get_mut(k)
                        .expect("expected call output to be present");
                    if let Some(ty) = output.element_ty.common_type(&v.ty()) {
                        output.element_ty = ty;
                    }
                    output.elements[index] = v.clone();
                }
            }
        }
        Ok(())
    }
    /// Converts the gather into its final value: an array, or a call value
    /// whose outputs are arrays.
    fn into_value(self) -> Value {
        match self {
            Self::Array(array) => array.into_array().into(),
            Self::Call { call_ty, outputs } => CallValue::new_unchecked(
                call_ty,
                Outputs::from_iter(outputs.into_iter().map(|(n, v)| (n, v.into_array().into())))
                    .into(),
            )
            .into(),
        }
    }
}
/// A subgraph of the workflow evaluation graph, mapping each contained node
/// to its current indegree (count of unsatisfied dependencies).
#[derive(Debug, Clone, Default)]
struct Subgraph(HashMap<NodeIndex, usize>);
impl Subgraph {
    /// Creates a subgraph covering the entire graph, with each node's
    /// indegree initialized to its incoming edge count.
    fn new(graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>) -> Self {
        let mut nodes = HashMap::with_capacity(graph.node_count());
        for index in graph.node_indices() {
            nodes.insert(
                index,
                graph.edges_directed(index, Direction::Incoming).count(),
            );
        }
        Self(nodes)
    }
    /// Recursively splits out the nested subgraphs between each conditional
    /// clause or scatter entry node and its corresponding exit node,
    /// returning them keyed by entry node.
    fn split(
        &mut self,
        graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
    ) -> HashMap<NodeIndex, Subgraph> {
        /// Moves the nodes strictly between `entry` and `exit` out of
        /// `parent` and into a new node map.
        fn split(
            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
            parent: &mut HashMap<NodeIndex, usize>,
            entry: NodeIndex,
            exit: NodeIndex,
        ) -> HashMap<NodeIndex, usize> {
            let mut nodes = HashMap::new();
            let mut bfs = Bfs::new(graph, entry);
            while let Some(node) = {
                // Drop the exit node from the BFS frontier so traversal does
                // not continue past the end of the nested statement.
                if bfs.stack.front() == Some(&exit) {
                    bfs.stack.pop_front();
                }
                bfs.next(graph)
            } {
                if node == entry || node == exit {
                    continue;
                }
                let prev = nodes.insert(
                    node,
                    parent.remove(&node).expect("node should exist in parent"),
                );
                assert!(prev.is_none());
            }
            // The entry node itself is not part of the nested subgraph, so
            // its direct successors no longer wait on it.
            for edge in graph.edges_directed(entry, Direction::Outgoing) {
                if edge.target() != exit {
                    *nodes
                        .get_mut(&edge.target())
                        .expect("should be in subgraph") -= 1;
                }
            }
            // The exit node remains in the parent with a single remaining
            // dependency: the completion of the nested statement itself.
            *parent.get_mut(&exit).expect("should have exit node") = 1;
            nodes
        }
        /// Walks the given node set, splitting out (and recursing into) the
        /// subgraph of every conditional clause and scatter it contains.
        fn split_recurse(
            graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
            nodes: &mut HashMap<NodeIndex, usize>,
            subgraphs: &mut HashMap<NodeIndex, Subgraph>,
        ) {
            for index in graph.node_indices() {
                // Only consider nodes still owned by this level.
                if !nodes.contains_key(&index) {
                    continue;
                }
                match &graph[index] {
                    WorkflowGraphNode::ConditionalClause(_, exit) => {
                        nodes.remove(&index);
                        let mut clause_nodes = split(graph, nodes, index, *exit);
                        split_recurse(graph, &mut clause_nodes, subgraphs);
                        subgraphs.insert(index, Subgraph(clause_nodes));
                    }
                    WorkflowGraphNode::Scatter(_, exit) => {
                        let mut nodes = split(graph, nodes, index, *exit);
                        split_recurse(graph, &mut nodes, subgraphs);
                        subgraphs.insert(index, Subgraph(nodes));
                    }
                    _ => {}
                }
            }
        }
        let mut subgraphs = HashMap::new();
        split_recurse(graph, &mut self.0, &mut subgraphs);
        subgraphs
    }
    /// Removes a completed node (indegree must be 0) from the subgraph and
    /// decrements the indegree of its dependents.
    fn remove_node(&mut self, graph: &DiGraph<WorkflowGraphNode<SyntaxNode>, ()>, node: NodeIndex) {
        let indegree = self.0.remove(&node);
        assert_eq!(
            indegree,
            Some(0),
            "removed a node with an indegree greater than 0"
        );
        for edge in graph.edges_directed(node, Direction::Outgoing) {
            // Targets in nested subgraphs are not in this map; skip them.
            if let Some(indegree) = self.0.get_mut(&edge.target()) {
                *indegree -= 1;
            }
        }
    }
}
/// Shared state for a single workflow evaluation.
struct State {
    /// The evaluator performing the evaluation.
    evaluator: Evaluator,
    /// The document containing the workflow being evaluated.
    document: Document,
    /// The inputs to the workflow.
    inputs: WorkflowInputs,
    /// The evaluation scopes; locked because subgraphs evaluate concurrently.
    scopes: RwLock<Scopes>,
    /// The full workflow evaluation graph.
    graph: DiGraph<WorkflowGraphNode<SyntaxNode>, ()>,
    /// Nested subgraphs for conditional clauses and scatters, keyed by their
    /// entry node.
    subgraphs: HashMap<NodeIndex, Subgraph>,
    /// The parent of the source document, used to resolve relative paths.
    base_dir: EvaluationPath,
    /// The directory for temporary files created during evaluation.
    temp_dir: PathBuf,
    /// The directory under which call evaluations are rooted.
    calls_dir: PathBuf,
}
impl State {
    /// Gets a reference to the evaluator's transferer.
    fn transferer(&self) -> &dyn Transferer {
        self.evaluator.transferer.as_ref()
    }
}
impl Evaluator {
    /// Evaluates the workflow contained in the given document with the given
    /// inputs, rooting all evaluation artifacts under `eval_root_dir`.
    ///
    /// Returns the workflow's outputs on success; returns `Canceled` when the
    /// user requested cancellation while evaluation was in flight.
    pub async fn evaluate_workflow(
        &self,
        document: &Document,
        inputs: WorkflowInputs,
        eval_root_dir: impl AsRef<Path>,
    ) -> EvaluationResult<Outputs> {
        let workflow = document
            .workflow()
            .context("document does not contain a workflow")?;
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }
        let result = self
            .perform_workflow_evaluation(document, inputs, eval_root_dir.as_ref(), workflow.name())
            .await;
        // If the user canceled, surface `Canceled` rather than whatever error
        // the aborted evaluation happened to produce.
        if self.cancellation.user_canceled()
            && self.cancellation.state() == CancellationContextState::Canceling
        {
            return Err(EvaluationError::Canceled);
        }
        result
    }
    /// Performs the actual workflow evaluation: validates inputs, builds the
    /// evaluation graph, prepares the evaluation directories, evaluates the
    /// root subgraph, and writes the inputs/outputs JSON files.
    async fn perform_workflow_evaluation(
        &self,
        document: &Document,
        inputs: WorkflowInputs,
        eval_root_dir: &Path,
        id: &str,
    ) -> EvaluationResult<Outputs> {
        let workflow = document
            .workflow()
            .context("document does not contain a workflow")?;
        inputs.validate(document, workflow, None).with_context(|| {
            format!(
                "failed to validate the inputs to workflow `{workflow}`",
                workflow = workflow.name()
            )
        })?;
        let ast = match document
            .root()
            .morph()
            .ast_with_version_fallback(document.config().fallback_version())
        {
            Ast::V1(ast) => ast,
            _ => {
                return Err(
                    anyhow!("workflow evaluation is only supported for WDL 1.x documents").into(),
                );
            }
        };
        debug!(
            workflow_id = id,
            workflow_name = workflow.name(),
            document = document.uri().as_str(),
            "evaluating workflow",
        );
        let definition = ast
            .workflows()
            .next()
            .expect("workflow should exist in the AST");
        // The graph was already validated during analysis, so building it
        // here must not produce diagnostics.
        let mut diagnostics = Vec::new();
        let graph = WorkflowGraphBuilder::default().build(
            &definition,
            &mut diagnostics,
            |name| inputs.contains(name),
            |name| document.struct_by_name(name).is_some() || document.enum_by_name(name).is_some(),
        );
        assert!(
            diagnostics.is_empty(),
            "workflow evaluation graph should have no diagnostics"
        );
        let mut subgraph = Subgraph::new(&graph);
        let subgraphs = subgraph.split(&graph);
        let temp_dir = eval_root_dir.join("tmp");
        fs::create_dir_all(&temp_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = temp_dir.display()
            )
        })?;
        write_json_file(eval_root_dir.join(INPUTS_FILE), &inputs)?;
        let calls_dir = eval_root_dir.join("calls");
        fs::create_dir_all(&calls_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                // Fix: this message previously reported `temp_dir` instead of
                // the directory that actually failed to be created.
                path = calls_dir.display()
            )
        })?;
        let document_path = document.uri();
        let base_dir = EvaluationPath::parent_of(document_path.as_str()).with_context(|| {
            format!(
                "document `{path}` does not have a parent directory",
                path = document.path()
            )
        })?;
        let state = Arc::new(State {
            evaluator: self.clone(),
            document: document.clone(),
            inputs,
            scopes: Default::default(),
            graph,
            subgraphs,
            base_dir,
            temp_dir,
            calls_dir,
        });
        state
            .clone()
            .evaluate_subgraph(Scopes::ROOT_INDEX, subgraph, Arc::new(id.to_string()))
            .await?;
        let mut outputs: Outputs = state.scopes.write().await.take(Scopes::OUTPUT_INDEX).into();
        // Restore the declaration order of the output section, as evaluation
        // order depends on the graph rather than the source.
        if let Some(section) = definition.output() {
            let indexes: HashMap<_, _> = section
                .declarations()
                .enumerate()
                .map(|(i, d)| (d.name().hashable(), i))
                .collect();
            outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
        }
        write_json_file(eval_root_dir.join(OUTPUTS_FILE), &outputs)?;
        Ok(outputs)
    }
}
impl State {
    /// Evaluates a subgraph within the given scope.
    ///
    /// Returns a boxed future because subgraph evaluation is recursive:
    /// conditionals and scatters evaluate their own nested subgraphs.
    fn evaluate_subgraph(
        self: Arc<State>,
        scope: ScopeIndex,
        subgraph: Subgraph,
        id: Arc<String>,
    ) -> BoxFuture<'static, EvaluationResult<()>> {
        async move {
            let cancellation = self.evaluator.cancellation.clone();
            let mut futures = JoinSet::new();
            match self
                .perform_subgraph_evaluation(scope, subgraph, id, &mut futures)
                .await
            {
                Ok(_) => {
                    // A successful evaluation must have joined every future
                    // it spawned.
                    assert!(futures.is_empty());
                    Ok(())
                }
                Err(e) => {
                    // Record the error (for cancellation of sibling work) and
                    // wait for outstanding futures before propagating it.
                    cancellation.error(&e);
                    futures.join_all().await;
                    Err(e)
                }
            }
        }
        .boxed()
    }
    /// Drives the evaluation of a subgraph's nodes.
    ///
    /// Each pass collects nodes whose indegree has reached zero: inputs,
    /// declarations, and outputs are evaluated inline, while calls,
    /// conditionals, and scatters are spawned onto `futures`. When no node is
    /// ready, one spawned future is awaited so that its dependents become
    /// ready on a later pass.
    async fn perform_subgraph_evaluation(
        self: Arc<State>,
        scope: ScopeIndex,
        mut subgraph: Subgraph,
        id: Arc<String>,
        futures: &mut JoinSet<EvaluationResult<NodeIndex>>,
    ) -> EvaluationResult<()> {
        // Nodes ready to evaluate on the current pass.
        let mut processing: Vec<NodeIndex> = Vec::new();
        // Nodes currently being evaluated by a spawned future; they stay in
        // the subgraph until their future completes.
        let mut awaiting: HashSet<NodeIndex> = HashSet::new();
        while !subgraph.0.is_empty() {
            processing.extend(subgraph.0.iter().filter_map(|(node, indegree)| {
                if *indegree == 0 && !awaiting.contains(node) {
                    Some(*node)
                } else {
                    None
                }
            }));
            if processing.is_empty() {
                // Nothing is ready: wait for a spawned node to finish, log
                // its completion, and release its dependents.
                let node: EvaluationResult<NodeIndex> = futures
                    .join_next()
                    .await
                    .expect("should have a future to wait on")
                    .expect("failed to join future");
                let node = node?;
                match &self.graph[node] {
                    WorkflowGraphNode::Call(stmt) => {
                        let call_name = stmt
                            .alias()
                            .map(|a| a.name())
                            .unwrap_or_else(|| stmt.target().names().last().unwrap());
                        debug!(
                            workflow_id = id.as_str(),
                            workflow_name = self.document.workflow().unwrap().name(),
                            document = self.document.uri().as_str(),
                            call_name = call_name.text(),
                            "evaluation of call statement has completed",
                        )
                    }
                    WorkflowGraphNode::ConditionalClause(clause, _) => debug!(
                        workflow_id = id.as_str(),
                        workflow_name = self.document.workflow().unwrap().name(),
                        document = self.document.uri().as_str(),
                        clause_kind = format!("{}", clause.kind()),
                        expr = clause.expr().as_ref().map(|e| e.text().to_string()),
                        "evaluation of conditional clause has completed"
                    ),
                    WorkflowGraphNode::Conditional(..) => debug!(
                        workflow_id = id.as_str(),
                        workflow_name = self.document.workflow().unwrap().name(),
                        document = self.document.uri().as_str(),
                        "evaluation of conditional statement has completed",
                    ),
                    WorkflowGraphNode::Scatter(stmt, _) => {
                        let variable = stmt.variable();
                        debug!(
                            workflow_id = id.as_str(),
                            workflow_name = self.document.workflow().unwrap().name(),
                            document = self.document.uri().as_str(),
                            variable = variable.text(),
                            "evaluation of scatter statement has completed",
                        )
                    }
                    // Only calls, conditionals, and scatters are spawned.
                    _ => unreachable!(),
                }
                awaiting.remove(&node);
                subgraph.remove_node(&self.graph, node);
                continue;
            }
            for node in processing.iter().copied() {
                trace!(
                    workflow_id = id.as_str(),
                    workflow_name = self.document.workflow().unwrap().name(),
                    document = self.document.uri().as_str(),
                    "evaluating node `{n:?}` ({node:?})",
                    n = self.graph[node]
                );
                match &self.graph[node] {
                    // Inputs, declarations, and outputs are evaluated inline.
                    WorkflowGraphNode::Input(decl) => self
                        .evaluate_input(&id, decl)
                        .await
                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
                    WorkflowGraphNode::Decl(decl) => self
                        .evaluate_decl(&id, scope, decl)
                        .await
                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
                    WorkflowGraphNode::Output(decl) => self
                        .evaluate_output(&id, decl)
                        .await
                        .map_err(|d| EvaluationError::new(self.document.clone(), d))?,
                    WorkflowGraphNode::Conditional(stmt, _) => {
                        let id = id.clone();
                        let state = self.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            state.evaluate_conditional(id, scope, node, &stmt).await?;
                            Ok(node)
                        });
                        awaiting.insert(node);
                    }
                    WorkflowGraphNode::Scatter(stmt, _) => {
                        let id = id.clone();
                        let state = self.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            // The scatter gets its own join set for its
                            // per-iteration subgraph futures.
                            let cancellation = state.evaluator.cancellation.clone();
                            let mut futures = JoinSet::new();
                            match state
                                .evaluate_scatter(id, scope, node, &stmt, &mut futures)
                                .await
                            {
                                Ok(_) => {
                                    assert!(futures.is_empty());
                                    Ok(node)
                                }
                                Err(e) => {
                                    cancellation.error(&e);
                                    futures.join_all().await;
                                    Err(e)
                                }
                            }
                        });
                        awaiting.insert(node);
                    }
                    WorkflowGraphNode::Call(stmt) => {
                        let id = id.clone();
                        let state = self.clone();
                        let stmt = stmt.clone();
                        futures.spawn(async move {
                            state.evaluate_call(&id, scope, &stmt).await?;
                            Ok(node)
                        });
                        awaiting.insert(node);
                    }
                    // Clause/exit nodes carry no work of their own; they are
                    // removed below once their indegree reaches zero.
                    WorkflowGraphNode::ConditionalClause(..)
                    | WorkflowGraphNode::ExitConditional(_)
                    | WorkflowGraphNode::ExitScatter(_) => {
                        continue;
                    }
                }
            }
            // Remove completed inline nodes; spawned nodes remain until
            // their futures are joined above.
            for node in processing.drain(..) {
                if awaiting.contains(&node) {
                    continue;
                }
                subgraph.remove_node(&self.graph, node);
            }
        }
        Ok(())
    }
    /// Evaluates a workflow input declaration into the root scope.
    ///
    /// A supplied input value wins over the default expression, except when
    /// the supplied value is `None` for a non-optional type, in which case
    /// the default expression is evaluated instead.
    async fn evaluate_input(&self, id: &str, decl: &Decl<SyntaxNode>) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&self.document, &decl.ty())?;
        let expr = decl.expr();
        let (value, span) = match self.inputs.get(name.text()) {
            Some(input) => {
                // A `None` input for a non-optional declaration falls back to
                // the default expression when one exists.
                if input.is_none()
                    && !expected_ty.is_optional()
                    && let Some(expr) = decl.expr()
                {
                    debug!(
                        workflow_id = id,
                        workflow_name = self.document.workflow().unwrap().name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression",
                    );
                    (
                        self.evaluate_expr(Scopes::ROOT_INDEX, &expr).await?,
                        expr.span(),
                    )
                } else {
                    (input.clone(), name.span())
                }
            }
            None => {
                if let Some(expr) = expr {
                    debug!(
                        workflow_id = id,
                        workflow_name = self.document.workflow().unwrap().name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression",
                    );
                    (
                        self.evaluate_expr(Scopes::ROOT_INDEX, &expr).await?,
                        expr.span(),
                    )
                } else {
                    // No value and no default: analysis guarantees the input
                    // is optional, so it defaults to `None`.
                    assert!(expected_ty.is_optional(), "type should be optional");
                    (Value::new_none(expected_ty.clone()), name.span())
                }
            }
        };
        // Coerce to the declared type; the read guard is dropped before the
        // write below to avoid deadlocking on the scopes lock.
        let scopes = self.scopes.read().await;
        let context = WorkflowEvaluationContext::new(self, scopes.reference(Scopes::ROOT_INDEX));
        let mut value = value
            .coerce(Some(&context), &expected_ty)
            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;
        drop(scopes);
        // WDL 1.2+ resolves path-typed values at declaration time.
        if self
            .document
            .version()
            .expect("document should have a version")
            >= SupportedVersion::V1(V1::Two)
        {
            value = value
                .resolve_paths(
                    expected_ty.is_optional(),
                    self.base_dir.as_local(),
                    Some(self.transferer()),
                    &|path| Ok(path.clone()),
                )
                .await
                .map_err(|e| {
                    decl_evaluation_failed(
                        e,
                        self.document
                            .workflow()
                            .expect("should have workflow")
                            .name(),
                        false,
                        name.text(),
                        Some(Io::Input),
                        name.span(),
                    )
                })?;
        }
        self.scopes
            .write()
            .await
            .get_mut(Scopes::ROOT_INDEX)
            .insert(name.text(), value);
        Ok(())
    }
    /// Evaluates a private (bound) declaration into the given scope.
    async fn evaluate_decl(
        &self,
        id: &str,
        scope: ScopeIndex,
        decl: &Decl<SyntaxNode>,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&self.document, &decl.ty())?;
        // Private declarations are always bound to an expression.
        let expr = decl.expr().expect("declaration should have expression");
        debug!(
            workflow_id = id,
            workflow_name = self.document.workflow().unwrap().name(),
            document = self.document.uri().as_str(),
            decl_name = name.text(),
            "evaluating private declaration",
        );
        let value = self.evaluate_expr(scope, &expr).await?;
        // Coerce to the declared type; the read guard is dropped before the
        // write below to avoid deadlocking on the scopes lock.
        let scopes = self.scopes.read().await;
        let context = WorkflowEvaluationContext::new(self, scopes.reference(scope));
        let mut value = value.coerce(Some(&context), &expected_ty).map_err(|e| {
            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
        })?;
        drop(scopes);
        // WDL 1.2+ resolves path-typed values at declaration time.
        if self
            .document
            .version()
            .expect("document should have a version")
            >= SupportedVersion::V1(V1::Two)
        {
            value = value
                .resolve_paths(
                    expected_ty.is_optional(),
                    self.base_dir.as_local(),
                    Some(self.transferer()),
                    &|path| Ok(path.clone()),
                )
                .await
                .map_err(|e| {
                    decl_evaluation_failed(
                        e,
                        self.document
                            .workflow()
                            .expect("should have workflow")
                            .name(),
                        false,
                        name.text(),
                        None,
                        name.span(),
                    )
                })?;
        }
        self.scopes
            .write()
            .await
            .get_mut(scope)
            .insert(name.text(), value);
        Ok(())
    }
    /// Evaluates a workflow output declaration into the output scope.
    ///
    /// Unlike inputs and private declarations, output paths are always
    /// resolved (regardless of WDL version) and relative paths are rejected.
    async fn evaluate_output(&self, id: &str, decl: &Decl<SyntaxNode>) -> Result<(), Diagnostic> {
        let name = decl.name();
        let expected_ty = crate::convert_ast_type_v1(&self.document, &decl.ty())?;
        // Output declarations are always bound to an expression.
        let expr = decl.expr().expect("declaration should have expression");
        debug!(
            workflow_id = id,
            workflow_name = self.document.workflow().unwrap().name(),
            document = self.document.uri().as_str(),
            output_name = name.text(),
            "evaluating output",
        );
        let value = self.evaluate_expr(Scopes::OUTPUT_INDEX, &expr).await?;
        // Coerce to the declared type; the read guard is dropped before the
        // write below to avoid deadlocking on the scopes lock.
        let scopes = self.scopes.read().await;
        let context = WorkflowEvaluationContext::new(self, scopes.reference(Scopes::OUTPUT_INDEX));
        let mut value = value.coerce(Some(&context), &expected_ty).map_err(|e| {
            runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), expr.span())
        })?;
        drop(scopes);
        value = value
            .resolve_paths(
                expected_ty.is_optional(),
                self.base_dir.as_local(),
                Some(self.transferer()),
                &|path| {
                    // Workflow outputs must be absolute paths.
                    if path.is_relative() {
                        bail!("relative path `{path}` cannot be used as a workflow output");
                    }
                    Ok(path.clone())
                },
            )
            .await
            .map_err(|e| {
                decl_evaluation_failed(
                    e,
                    self.document
                        .workflow()
                        .expect("should have workflow")
                        .name(),
                    false,
                    name.text(),
                    Some(Io::Output),
                    name.span(),
                )
            })?;
        self.scopes
            .write()
            .await
            .get_mut(Scopes::OUTPUT_INDEX)
            .insert(name.text(), value);
        Ok(())
    }
    /// Evaluates a conditional statement (`if`/`else if`/`else`).
    ///
    /// Clauses are evaluated in order; the first clause whose condition is
    /// true (or the `else` clause) has its subgraph evaluated, after which
    /// every name the conditional can introduce is promoted into the parent
    /// scope — names the taken branch did not produce become `None` (or a
    /// call value of `None` outputs). If no branch is taken, all names are
    /// set to `None` values in the parent scope.
    async fn evaluate_conditional(
        self: Arc<State>,
        id: Arc<String>,
        parent: ScopeIndex,
        entry: NodeIndex,
        stmt: &ConditionalStatement<SyntaxNode>,
    ) -> EvaluationResult<()> {
        // Union the analysis scopes of every clause to learn the full set of
        // names the conditional may introduce.
        let mut scope_union = ScopeUnion::new();
        for clause in stmt.clauses() {
            if let Some(braced_scope_span) = clause.braced_scope_span() {
                let clause_scope = self
                    .document
                    .find_scope_by_position(braced_scope_span.start())
                    .expect("should have scope");
                scope_union.insert(
                    clause_scope,
                    matches!(clause.kind(), ConditionalStatementClauseKind::Else),
                );
            }
        }
        let all_names = scope_union
            .resolve()
            .expect("scope union should resolve without errors");
        for clause in stmt.clauses() {
            if let Some(expr) = clause.expr() {
                debug!(
                    workflow_id = id.as_str(),
                    workflow_name = self.document.workflow().unwrap().name(),
                    document = self.document.uri().as_str(),
                    expr = expr.text().to_string(),
                    "evaluating conditional statement expression",
                );
                let value = self
                    .evaluate_expr(parent, &expr)
                    .await
                    .map_err(|d| EvaluationError::new(self.document.clone(), d))?;
                // The condition must coerce to `Boolean`; a false condition
                // skips this clause and tries the next one.
                if !value
                    .coerce(None, &PrimitiveType::Boolean.into())
                    .map_err(|e| {
                        EvaluationError::new(
                            self.document.clone(),
                            if_conditional_mismatch(e, &value.ty(), expr.span()),
                        )
                    })?
                    .unwrap_boolean()
                {
                    debug!(
                        workflow_id = id.as_str(),
                        workflow_name = self.document.workflow().unwrap().name(),
                        document = self.document.uri().as_str(),
                        "conditional statement branch was not taken and subgraph will be skipped"
                    );
                    continue;
                }
                debug!(
                    workflow_id = id.as_str(),
                    workflow_name = self.document.workflow().unwrap().name(),
                    document = self.document.uri().as_str(),
                    expr = expr.text().to_string(),
                    "conditional statement branch was taken and subgraph will be evaluated"
                );
            } else {
                // No expression means this is the `else` clause.
                debug!(
                    workflow_id = id.as_str(),
                    workflow_name = self.document.workflow().unwrap().name(),
                    document = self.document.uri().as_str(),
                    "else branch was taken and subgraph will be evaluated"
                );
            }
            // Find the graph node for this clause so its subgraph can be
            // looked up and evaluated.
            let clause_node = self
                .graph
                .edges_directed(entry, Direction::Outgoing)
                .find_map(|edge| {
                    let target = edge.target();
                    match &self.graph[target] {
                        WorkflowGraphNode::ConditionalClause(c, _)
                            if c.inner() == clause.inner() =>
                        {
                            Some(target)
                        }
                        _ => None,
                    }
                })
                .expect("clause node should exist in graph");
            // Evaluate the taken branch in a fresh child scope.
            let scope = { self.scopes.write().await.alloc(parent) };
            self.clone()
                .evaluate_subgraph(scope, self.subgraphs[&clause_node].clone(), id)
                .await?;
            // Promote the branch's locals into the parent scope; names the
            // branch did not produce become `None` placeholders.
            let mut scopes = self.scopes.write().await;
            let (parent, child) = scopes.parent_mut(scope);
            for (name, name_info) in all_names {
                let value = child
                    .local()
                    .find(|(n, _)| *n == name)
                    .map(|(_, v)| v.clone());
                if let Some(value) = value {
                    parent.insert(name, value);
                } else if let Type::Call(call_ty) = &name_info.ty() {
                    // Missing call results become calls whose outputs are all
                    // `None`.
                    parent.insert(
                        name,
                        CallValue::new_unchecked(
                            call_ty.clone(),
                            Outputs::from_iter(
                                call_ty
                                    .outputs()
                                    .iter()
                                    .map(|(n, o)| (n.clone(), Value::new_none(o.ty().clone()))),
                            )
                            .into(),
                        ),
                    );
                } else {
                    parent.insert(name, Value::new_none(name_info.ty().clone()));
                }
            }
            scopes.free(scope);
            // Only the first taken branch is evaluated.
            return Ok(());
        }
        debug!(
            workflow_id = id.as_str(),
            workflow_name = self.document.workflow().unwrap().name(),
            document = self.document.uri().as_str(),
            "no conditional statement branch was taken"
        );
        // No branch taken: every introduced name becomes a `None` value (or
        // a call value of all-`None` outputs) in the parent scope.
        let mut scopes = self.scopes.write().await;
        let parent = scopes.get_mut(parent);
        for (name, name_info) in all_names {
            if let Type::Call(call_ty) = name_info.ty() {
                parent.insert(
                    name,
                    CallValue::new_unchecked(
                        call_ty.clone(),
                        Outputs::from_iter(
                            call_ty
                                .outputs()
                                .iter()
                                .map(|(n, o)| (n.clone(), Value::new_none(o.ty().clone()))),
                        )
                        .into(),
                    ),
                );
            } else {
                parent.insert(name, Value::new_none(name_info.ty().clone()));
            }
        }
        Ok(())
    }
    /// Evaluates a scatter statement.
    ///
    /// The scatter expression must evaluate to an array; one subgraph
    /// evaluation is spawned per element (bounded by the configured scatter
    /// concurrency), and the values each iteration produces are gathered
    /// into arrays inserted into the parent scope.
    async fn evaluate_scatter(
        self: Arc<State>,
        id: Arc<String>,
        parent: ScopeIndex,
        entry: NodeIndex,
        stmt: &ScatterStatement<SyntaxNode>,
        futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
    ) -> EvaluationResult<()> {
        /// Awaits the next completed iteration, folds its locals into the
        /// gathers, and frees its scope.
        async fn await_next(
            futures: &mut JoinSet<EvaluationResult<(usize, ScopeIndex)>>,
            scopes: &RwLock<Scopes>,
            gathers: &mut HashMap<String, Gather>,
            capacity: usize,
        ) -> EvaluationResult<()> {
            let (index, scope) = futures
                .join_next()
                .await
                .expect("should have a future to wait on")
                .expect("failed to join future")?;
            let mut scopes = scopes.write().await;
            // Skip the first two locals: the hidden `$idx` variable and the
            // scatter variable itself.
            for (name, value) in scopes.get_mut(scope).local().skip(2) {
                match gathers.get_mut(name) {
                    Some(gather) => gather.set(index, value.clone())?,
                    None => {
                        let prev = gathers.insert(
                            name.to_string(),
                            Gather::new(capacity, index, value.clone()),
                        );
                        assert!(prev.is_none());
                    }
                }
            }
            scopes.free(scope);
            Ok(())
        }
        let variable = stmt.variable();
        let expr = stmt.expr();
        debug!(
            workflow_id = id.as_str(),
            workflow_name = self.document.workflow().unwrap().name(),
            document = self.document.uri().as_str(),
            variable = variable.text(),
            "evaluating scatter statement",
        );
        let value = self
            .evaluate_expr(parent, &expr)
            .await
            .map_err(|d| EvaluationError::new(self.document.clone(), d))?;
        let array = value
            .as_array()
            .ok_or_else(|| {
                EvaluationError::new(
                    self.document.clone(),
                    type_is_not_array(&value.ty(), expr.span()),
                )
            })?
            .as_slice();
        // An empty scatter still introduces names (as empty arrays); handle
        // it separately since no subgraphs need to run.
        if array.is_empty() {
            return self.evaluate_empty_scatter(stmt, parent).await;
        }
        let max_concurrency = self.evaluator.config.workflow.scatter.concurrency;
        let mut gathers: HashMap<_, Gather> = HashMap::new();
        for (i, value) in array.iter().enumerate() {
            // Stop spawning new iterations once cancellation begins.
            if self.evaluator.cancellation.state() != CancellationContextState::NotCanceled {
                break;
            }
            // Each iteration gets a child scope seeded with `$idx` and the
            // scatter variable bound to the current element.
            let scope = {
                let mut scopes = self.scopes.write().await;
                let index = scopes.alloc(parent);
                let scope = scopes.get_mut(index);
                scope.insert(
                    SCATTER_INDEX_VAR,
                    i64::try_from(i).map_err(|_| anyhow!("array index out of bounds"))?,
                );
                scope.insert(variable.text(), value.clone());
                index
            };
            {
                let state = self.clone();
                let subgraph = self.subgraphs[&entry].clone();
                let id = id.clone();
                futures.spawn(async move {
                    state.evaluate_subgraph(scope, subgraph, id).await?;
                    Ok((i, scope))
                });
            }
            // Throttle to the configured maximum concurrent iterations.
            if futures.len() as u64 >= max_concurrency {
                await_next(futures, &self.scopes, &mut gathers, array.len()).await?;
            }
        }
        // Drain the remaining in-flight iterations.
        while !futures.is_empty() {
            await_next(futures, &self.scopes, &mut gathers, array.len()).await?;
        }
        if self.evaluator.cancellation.state() != CancellationContextState::NotCanceled {
            return Err(EvaluationError::Canceled);
        }
        // Publish the gathered arrays into the parent scope.
        let mut scopes = self.scopes.write().await;
        let scope = scopes.get_mut(parent);
        for (name, gather) in gathers {
            scope.insert(name, gather.into_value());
        }
        Ok(())
    }
async fn evaluate_empty_scatter(
&self,
stmt: &ScatterStatement<SyntaxNode>,
parent: ScopeIndex,
) -> EvaluationResult<()> {
let stmt_scope = self
.document
.find_scope_by_position(
stmt.braced_scope_span()
.expect("should have braces")
.start(),
)
.expect("should have scope for scatter statement");
let mut scopes = self.scopes.write().await;
let scope = scopes.get_mut(parent);
for (name, local) in stmt_scope.names().skip(1) {
let value: Value = if let Type::Call(call_ty) = local.ty() {
CallValue::new_unchecked(
call_ty.clone(),
Outputs::from_iter(call_ty.outputs().iter().map(|(n, o)| {
(
n.clone(),
Array::new_unchecked(ArrayType::new(o.ty().clone()), Vec::new()).into(),
)
}))
.into(),
)
.into()
} else {
Array::new_unchecked(ArrayType::new(local.ty().clone()), Vec::new()).into()
};
scope.insert(name.to_string(), value);
}
Ok(())
}
/// Evaluates a call statement in the workflow.
///
/// Resolves the call target (a task or a workflow, optionally qualified by a
/// single import namespace), evaluates the call's inputs, performs the call,
/// and inserts the resulting outputs into the given scope under the call's
/// alias (or the target name when no alias is present).
async fn evaluate_call(
    self: Arc<State>,
    id: &str,
    scope: ScopeIndex,
    stmt: &CallStatement<SyntaxNode>,
) -> EvaluationResult<()> {
    /// Represents the resolved target of the call statement.
    #[allow(clippy::missing_docs_in_private_items)]
    enum Target<'a> {
        /// The target is a task in the callee document.
        Task(&'a Task),
        /// The target is the callee document's workflow.
        Workflow,
    }

    impl Target<'_> {
        /// Performs the evaluation of the call target, returning the callee's
        /// outputs.
        async fn evaluate(
            self,
            evaluator: &Evaluator,
            caller_id: &str,
            document: &Document,
            inputs: Inputs,
            root_dir: &Path,
            callee_id: &str,
        ) -> EvaluationResult<Outputs> {
            match self {
                Target::Task(task) => {
                    debug!(caller_id, callee_id, "evaluating call to task");
                    evaluator
                        .perform_task_evaluation(
                            document,
                            task,
                            inputs.unwrap_task_inputs(),
                            root_dir,
                            callee_id,
                        )
                        .await?
                        .into_outputs()
                }
                Target::Workflow => {
                    debug!(caller_id, callee_id, "evaluating call to workflow");
                    evaluator
                        .perform_workflow_evaluation(
                            document,
                            inputs.unwrap_workflow_inputs(),
                            root_dir,
                            callee_id,
                        )
                        .await
                }
            }
        }
    }

    let alias = stmt.alias();
    let target = stmt.target();

    // Walk the dotted target name: every component except the last must
    // resolve to an import namespace; the last is the task/workflow name.
    let mut names = target.names().peekable();
    let mut namespace = None;
    let mut target = None;
    while let Some(name) = names.next() {
        if names.peek().is_none() {
            target = Some(name);
            break;
        }

        // At most one namespace qualifier is permitted (`ns.target`).
        if namespace.is_some() {
            return Err(EvaluationError::new(
                self.document.clone(),
                only_one_namespace(name.span()),
            ));
        }

        let ns = self.document.namespace(name.text()).ok_or_else(|| {
            EvaluationError::new(self.document.clone(), unknown_namespace(&name))
        })?;
        namespace = Some((name, ns));
    }

    let target = target.expect("expected at least one name");

    // The call's outputs are stored in scope under the alias; when no alias
    // was written, the target name itself is used.
    let alias = alias
        .as_ref()
        .map(|t| t.name())
        .unwrap_or_else(|| target.clone());

    debug!(
        workflow_id = id,
        workflow_name = self.document.workflow().unwrap().name(),
        document = self.document.uri().as_str(),
        call_name = alias.text(),
        "evaluating call statement",
    );

    // An unqualified call to this workflow's own name would recurse forever.
    if namespace.is_none()
        && target.text()
            == self
                .document
                .workflow()
                .expect("should have workflow")
                .name()
    {
        return Err(EvaluationError::new(
            self.document.clone(),
            recursive_workflow_call(target.text(), target.span()),
        ));
    }

    // Externally supplied inputs for this call, keyed by the call's alias.
    let inputs = self.inputs.calls().get(alias.text()).cloned();

    // The callee document: the namespace's document for qualified calls,
    // otherwise the current document.
    let document = namespace
        .as_ref()
        .map(|(_, ns)| ns.document())
        .unwrap_or(&self.document);

    // Resolve the target: prefer a task with the given name, then fall back
    // to the document's workflow when its name matches.
    let (mut inputs, call_target) = match document.task_by_name(target.text()) {
        Some(task) => (
            inputs.unwrap_or_else(|| Inputs::Task(Default::default())),
            Target::Task(task),
        ),
        _ => match document.workflow() {
            Some(workflow) if workflow.name() == target.text() => (
                inputs.unwrap_or_else(|| Inputs::Workflow(Default::default())),
                Target::Workflow,
            ),
            _ => {
                return Err(EvaluationError::new(
                    self.document.clone(),
                    unknown_task_or_workflow(
                        namespace.as_ref().map(|(_, ns)| ns.span()),
                        target.text(),
                        target.span(),
                    ),
                ));
            }
        },
    };

    // Evaluate the explicit call inputs; this also yields the scope's scatter
    // index, used below to keep per-iteration call directories distinct.
    let scatter_index = self
        .evaluate_call_inputs(stmt, scope, &mut inputs)
        .await
        .map_err(|d| EvaluationError::new(self.document.clone(), d))?;

    // Directory name for the call, e.g. `alias` or `alias-<scatter index>`.
    let dir = format!(
        "{alias}{sep}{scatter_index}",
        alias = alias.text(),
        sep = if scatter_index.is_empty() { "" } else { "-" },
    );

    let call_id = format_id(
        namespace.as_ref().map(|(n, _)| n.text()),
        target.text(),
        alias.text(),
        &scatter_index,
    );

    let outputs = call_target
        .evaluate(
            &self.evaluator,
            id,
            document,
            inputs,
            &self.calls_dir.join(&dir),
            &call_id,
        )
        .await
        .map_err(|mut e| {
            // Record this call site in the backtrace of source errors so that
            // failures deep in nested calls point back to each `call`.
            if let EvaluationError::Source(e) = &mut e {
                e.backtrace.push(CallLocation {
                    document: self.document.clone(),
                    span: stmt
                        .token::<CallKeyword<SyntaxToken>>()
                        .expect("should have call keyword")
                        .span(),
                });
            }

            e
        })?
        .with_name(alias.text());

    // Look up the call's analyzed type so the outputs can be wrapped in a
    // properly typed call value.
    let ty = self
        .document
        .workflow()
        .expect("should have workflow")
        .calls()
        .get(alias.text())
        .expect("should have call");

    self.scopes.write().await.get_mut(scope).insert(
        alias.text(),
        Value::Call(CallValue::new_unchecked(ty.clone(), Arc::new(outputs))),
    );
    Ok(())
}
/// Evaluates a single expression within the given scope, returning the
/// resulting value or a diagnostic on failure.
async fn evaluate_expr(
    &self,
    scope: ScopeIndex,
    expr: &Expr<SyntaxNode>,
) -> Result<Value, Diagnostic> {
    // Expression evaluation only reads scope state, so a read lock suffices.
    let scopes = self.scopes.read().await;
    let context = WorkflowEvaluationContext::new(self, scopes.reference(scope));
    let mut evaluator = ExprEvaluator::new(context);
    evaluator.evaluate_expr(expr).await
}
/// Evaluates the inputs of a call statement within the given scope.
///
/// Each `input` item is either evaluated from its expression or, for the
/// shorthand form without an expression, looked up by name in the scope; the
/// resulting value is stored into `inputs`.
///
/// Returns the scope's scatter index string, used by the caller to create
/// distinct call directories for scatter iterations.
async fn evaluate_call_inputs(
    &self,
    stmt: &CallStatement<SyntaxNode>,
    scope: ScopeIndex,
    inputs: &mut Inputs,
) -> Result<String, Diagnostic> {
    let scopes = self.scopes.read().await;
    for input in stmt.inputs() {
        let name = input.name();
        let value = match input.expr() {
            // `name = <expr>` form: evaluate the expression in this scope.
            Some(expr) => {
                let mut evaluator = ExprEvaluator::new(WorkflowEvaluationContext::new(
                    self,
                    scopes.reference(scope),
                ));
                evaluator.evaluate_expr(&expr).await?
            }
            // Shorthand `name` form: take the like-named value from scope.
            None => scopes
                .reference(scope)
                .lookup(name.text())
                .cloned()
                .ok_or_else(|| unknown_name(name.text(), name.span()))?,
        };

        // Reuse the `name` binding instead of re-walking the AST with a
        // second `input.name()` call.
        let prev = inputs.set(name.text(), value);
        assert!(
            prev.is_none(),
            "attempted to override a specified call input"
        );
    }

    Ok(scopes.scatter_index(scope))
}
}
#[cfg(test)]
mod test {
use std::fs::read_to_string;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use crankshaft::events::Event;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::sync::broadcast::error::RecvError;
use wdl_analysis::Analyzer;
use wdl_analysis::Config as AnalysisConfig;
use wdl_analysis::DiagnosticsConfig;
use super::*;
use crate::CancellationContext;
use crate::Events;
use crate::config::BackendConfig;
use crate::config::Config;
use crate::config::FailureMode;
/// Tests that workflow-level and per-call `inputs.json` / `outputs.json`
/// files are written under the outputs directory during evaluation.
#[tokio::test]
async fn it_writes_input_and_output_files() {
    let root_dir = TempDir::new().expect("failed to create temporary directory");

    // A workflow that calls the same task twice (once under an alias) so
    // both `calls/foo` and `calls/bar` directories are exercised.
    fs::write(
        root_dir.path().join("source.wdl"),
        r#"
version 1.2
task foo {
input {
String a
Int b
Array[String] c
}
command <<<>>>
output {
String x = a
Int y = b
Array[String] z = c
}
}
workflow test {
input {
String a
Int b
Array[String] c
}
call foo {
a = "foo",
b = 10,
c = ["foo", "bar", "baz"]
}
call foo as bar {
a = "bar",
b = 1,
c = []
}
output {
String x = a
Int y = b
Array[String] z = c
}
}
"#,
    )
    .expect("failed to write WDL source file");

    // Analyze the document with all diagnostics suppressed.
    let analyzer = Analyzer::new(
        AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
        |(), _, _, _| async {},
    );
    analyzer
        .add_directory(root_dir.path())
        .await
        .expect("failed to add directory");
    let results = analyzer
        .analyze(())
        .await
        .expect("failed to analyze document");
    assert_eq!(results.len(), 1, "expected only one result");

    // Evaluate with a single local backend.
    let config = Config {
        backends: [(
            "default".to_string(),
            BackendConfig::Local(Default::default()),
        )]
        .into(),
        ..Default::default()
    };

    let evaluator = Evaluator::new(
        root_dir.path(),
        config.into(),
        Default::default(),
        Events::disabled(),
    )
    .await
    .unwrap();

    // Workflow-level inputs.
    let mut inputs = WorkflowInputs::default();
    inputs.set("a", "qux".to_string());
    inputs.set("b", 1234);
    inputs.set(
        "c",
        Array::new(
            ArrayType::new(PrimitiveType::String),
            ["jam".to_string(), "cakes".to_string()],
        )
        .unwrap(),
    );

    let outputs_dir = root_dir.path().join("outputs");
    let outputs = evaluator
        .evaluate_workflow(
            results.first().expect("should have result").document(),
            inputs,
            &outputs_dir,
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow");
    assert_eq!(outputs.iter().count(), 3, "expected three outputs");

    // Workflow-level input/output files.
    assert_eq!(
        read_to_string(outputs_dir.join("inputs.json"))
            .expect("failed to read workflow `inputs.json`"),
        "{\n \"a\": \"qux\",\n \"b\": 1234,\n \"c\": [\n \"jam\",\n \"cakes\"\n ]\n}"
    );
    assert_eq!(
        read_to_string(outputs_dir.join("outputs.json"))
            .expect("failed to read workflow `outputs.json`"),
        "{\n \"x\": \"qux\",\n \"y\": 1234,\n \"z\": [\n \"jam\",\n \"cakes\"\n ]\n}"
    );

    // Files for the unaliased `call foo`.
    assert_eq!(
        read_to_string(outputs_dir.join("calls/foo/inputs.json"))
            .expect("failed to read foo `inputs.json`"),
        "{\n \"a\": \"foo\",\n \"b\": 10,\n \"c\": [\n \"foo\",\n \"bar\",\n \
\"baz\"\n ]\n}"
    );
    assert_eq!(
        read_to_string(outputs_dir.join("calls/foo/outputs.json"))
            .expect("failed to read foo `outputs.json`"),
        "{\n \"x\": \"foo\",\n \"y\": 10,\n \"z\": [\n \"foo\",\n \"bar\",\n \
\"baz\"\n ]\n}"
    );

    // Files for the aliased `call foo as bar`.
    assert_eq!(
        read_to_string(outputs_dir.join("calls/bar/inputs.json"))
            .expect("failed to read bar `inputs.json`"),
        "{\n \"a\": \"bar\",\n \"b\": 1,\n \"c\": []\n}"
    );
    assert_eq!(
        read_to_string(outputs_dir.join("calls/bar/outputs.json"))
            .expect("failed to read bar `outputs.json`"),
        "{\n \"x\": \"bar\",\n \"y\": 1,\n \"z\": []\n}"
    );
}
/// Tests that each `if`/`else if`/`else` clause may declare its own
/// variables and calls, and that only the taken clause's values appear in
/// the workflow outputs.
#[tokio::test]
async fn it_handles_conditional_with_different_variables() {
    let root_dir = TempDir::new().expect("failed to create temporary directory");

    // Every clause declares `my_variable`, and all but the `useBlue` clause
    // declare `my_greeting`; each calls `sayColor` with a different color.
    fs::write(
        root_dir.path().join("source.wdl"),
        r#"
version 1.3
task sayColor {
input {
String color
}
command <<<
echo "Hello, ~{color} color!"
>>>
output {
String out = read_string(stdout())
}
}
workflow foo {
input {
Boolean useRed = false
Boolean useGreen = false
Boolean useBlue = false
}
if (useRed) {
String my_variable = "foo"
String my_greeting = "Hello"
call sayColor { input: color = "red" }
} else if (useGreen) {
String my_variable = "bar"
String my_greeting = "Hi"
call sayColor { input: color = "green" }
} else if (useBlue) {
String my_variable = "baz"
call sayColor { input: color = "blue" }
} else {
String my_variable = "quux"
String my_greeting = "Salutations"
call sayColor { input: color = "unknown" }
}
output {
String variable = my_variable
String? greeting = my_greeting
String out = sayColor.out
}
}
"#,
    )
    .expect("failed to write WDL source file");

    let analyzer = Analyzer::new(
        AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
        |(), _, _, _| async {},
    );
    analyzer
        .add_directory(root_dir.path())
        .await
        .expect("failed to add directory");
    let results = analyzer
        .analyze(())
        .await
        .expect("failed to analyze document");
    assert_eq!(results.len(), 1, "expected only one result");

    // NOTE(review): experimental features presumably gate the v1.3
    // `else if` syntax used above — confirm against the engine config docs.
    let config = Config {
        backends: [(
            "default".to_string(),
            BackendConfig::Local(Default::default()),
        )]
        .into(),
        experimental_features_enabled: true,
        ..Default::default()
    };

    let evaluator = Evaluator::new(
        root_dir.path(),
        config.into(),
        Default::default(),
        Events::disabled(),
    )
    .await
    .unwrap();

    // Scenario: `useBlue` — third clause taken; it declares no
    // `my_greeting`, so `greeting` must be `None`.
    let mut inputs = WorkflowInputs::default();
    inputs.set("useBlue", true);
    let outputs_dir = root_dir.path().join("outputs_blue");
    let outputs = evaluator
        .evaluate_workflow(
            results.first().expect("should have result").document(),
            inputs,
            &outputs_dir,
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow with useBlue");
    assert_eq!(
        outputs
            .get("variable")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "baz"
    );
    assert!(
        outputs.get("greeting").unwrap().is_none(),
        "greeting should be `None`"
    );
    assert_eq!(
        outputs.get("out").unwrap().clone().unwrap_string().as_str(),
        "Hello, blue color!"
    );

    // Scenario: `useRed` — first clause taken.
    let mut inputs = WorkflowInputs::default();
    inputs.set("useRed", true);
    let outputs_dir = root_dir.path().join("outputs_red");
    let outputs = evaluator
        .evaluate_workflow(
            results.first().expect("should have result").document(),
            inputs,
            &outputs_dir,
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow with useRed");
    assert_eq!(
        outputs
            .get("variable")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "foo"
    );
    assert_eq!(
        outputs
            .get("greeting")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "Hello"
    );
    assert_eq!(
        outputs.get("out").unwrap().clone().unwrap_string().as_str(),
        "Hello, red color!"
    );

    // Scenario: `useGreen` — second clause taken.
    let mut inputs = WorkflowInputs::default();
    inputs.set("useGreen", true);
    let outputs_dir = root_dir.path().join("outputs_green");
    let outputs = evaluator
        .evaluate_workflow(
            results.first().expect("should have result").document(),
            inputs,
            &outputs_dir,
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow with useGreen");
    assert_eq!(
        outputs
            .get("variable")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "bar"
    );
    assert_eq!(
        outputs
            .get("greeting")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "Hi"
    );
    assert_eq!(
        outputs.get("out").unwrap().clone().unwrap_string().as_str(),
        "Hello, green color!"
    );

    // Scenario: no flags set — the final `else` clause is taken.
    let inputs = WorkflowInputs::default();
    let outputs_dir = root_dir.path().join("outputs_else");
    let outputs = evaluator
        .evaluate_workflow(
            results.first().expect("should have result").document(),
            inputs,
            &outputs_dir,
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow with else");
    assert_eq!(
        outputs
            .get("variable")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "quux"
    );
    assert_eq!(
        outputs
            .get("greeting")
            .unwrap()
            .clone()
            .unwrap_string()
            .as_str(),
        "Salutations"
    );
    assert_eq!(
        outputs.get("out").unwrap().clone().unwrap_string().as_str(),
        "Hello, unknown color!"
    );
}
/// Tests that crankshaft task lifecycle events are broadcast during
/// evaluation and that event counts match the number of executed tasks.
#[tokio::test]
async fn it_reports_progress() {
    let root_dir = TempDir::new().expect("failed to create temporary directory");

    // An imported document whose workflow contains no tasks.
    fs::write(
        root_dir.path().join("other.wdl"),
        r#"
version 1.1
workflow w {}
"#,
    )
    .expect("failed to write WDL source file");

    // The root workflow: 10 scattered calls to task `t` plus 25 scattered
    // calls to the task-free imported workflow.
    let source_path = root_dir.path().join("source.wdl");
    fs::write(
        &source_path,
        r#"
version 1.1
import "other.wdl"
task t {
command <<<>>>
}
workflow w {
scatter (i in range(10)) {
call t
}
scatter (j in range(25)) {
call other.w
}
}
"#,
    )
    .expect("failed to write WDL source file");

    let analyzer = Analyzer::new(
        AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
        |(), _, _, _| async {},
    );
    analyzer
        .add_directory(root_dir.path())
        .await
        .expect("failed to add directory");
    let results = analyzer
        .analyze(())
        .await
        .expect("failed to analyze document");
    assert_eq!(results.len(), 2, "expected only two results");

    /// Event counters shared between the test and the listener task.
    #[derive(Default)]
    struct State {
        /// Number of `TaskCreated` events observed.
        tasks_created: AtomicUsize,
        /// Number of `TaskStarted` events observed.
        tasks_started: AtomicUsize,
        /// Number of `TaskCompleted` events observed.
        tasks_completed: AtomicUsize,
    }

    let config = Config {
        backends: [(
            "default".to_string(),
            BackendConfig::Local(Default::default()),
        )]
        .into(),
        ..Default::default()
    };

    let state = Arc::<State>::default();
    let events_state = state.clone();
    let events = Events::new(100);
    let mut crankshaft_rx = events.subscribe_crankshaft().unwrap();

    // Count task events until the broadcast channel closes (which happens
    // when the evaluator owning the sender is dropped below).
    let task = tokio::spawn(async move {
        loop {
            match crankshaft_rx.recv().await {
                Ok(event) => match event {
                    Event::TaskCreated { name, tes_id, .. } => {
                        // Only task `t` produces events; the local backend
                        // reports no TES identifier.
                        assert!(name.starts_with("t-"));
                        assert!(tes_id.is_none());
                        events_state.tasks_created.fetch_add(1, Ordering::SeqCst);
                    }
                    Event::TaskStarted { .. } => {
                        events_state.tasks_started.fetch_add(1, Ordering::SeqCst);
                    }
                    Event::TaskCompleted { exit_statuses, .. } => {
                        // The empty command should exit successfully.
                        assert_eq!(exit_statuses.len(), 1);
                        assert_eq!(exit_statuses[0].code().expect("should have code"), 0);
                        events_state.tasks_completed.fetch_add(1, Ordering::SeqCst);
                    }
                    _ => panic!("unexpected task event"),
                },
                Err(RecvError::Closed) => break,
                Err(e) => panic!("failed to receive event: {e}"),
            }
        }
    });

    let evaluator = Evaluator::new(root_dir.path(), config.into(), Default::default(), events)
        .await
        .unwrap();
    let outputs = evaluator
        .evaluate_workflow(
            results
                .iter()
                .find(|r| r.document().uri().as_str().ends_with("source.wdl"))
                .expect("should have result")
                .document(),
            WorkflowInputs::default(),
            root_dir.path(),
        )
        .await
        .map_err(|e| e.to_string())
        .expect("failed to evaluate workflow");

    // Drop the evaluator to close the event channel and end the listener.
    drop(evaluator);
    task.await.expect("failed to await events task");
    assert_eq!(outputs.iter().count(), 0, "expected no outputs");

    // Exactly the 10 scattered calls to `t` should produce task events.
    assert_eq!(state.tasks_created.load(Ordering::SeqCst), 10);
    assert_eq!(state.tasks_started.load(Ordering::SeqCst), 10);
    assert_eq!(state.tasks_completed.load(Ordering::SeqCst), 10);
}
/// Tests that an in-flight workflow evaluation can be canceled and that the
/// cancellation context transitions through its expected states.
#[tokio::test]
async fn it_cancels_evaluation() {
    let root_dir = TempDir::new().expect("failed to create temporary directory");

    // Ten scattered tasks that each sleep long enough for cancellation to
    // land before failing with a non-zero exit.
    let source_path = root_dir.path().join("source.wdl");
    fs::write(
        &source_path,
        r#"
version 1.1
task t {
command <<<sleep 30; exit 1>>>
}
workflow w {
scatter (i in range(10)) {
call t
}
}
"#,
    )
    .expect("failed to write WDL source file");

    let analyzer = Analyzer::new(
        AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
        |(), _, _, _| async {},
    );
    analyzer
        .add_directory(root_dir.path())
        .await
        .expect("failed to add directory");
    let results = analyzer
        .analyze(())
        .await
        .expect("failed to analyze document");
    assert_eq!(results.len(), 1, "expected only one result");

    let config = Config {
        backends: [(
            "default".to_string(),
            BackendConfig::Local(Default::default()),
        )]
        .into(),
        ..Default::default()
    };

    let cancellation = CancellationContext::new(FailureMode::Slow);
    let evaluator = Evaluator::new(
        root_dir.path(),
        config.into(),
        cancellation.clone(),
        Events::disabled(),
    )
    .await
    .unwrap();

    // Box the evaluation future so it can be polled repeatedly in `select!`.
    let mut evaluation = evaluator
        .evaluate_workflow(
            results
                .iter()
                .find(|r| r.document().uri().as_str().ends_with("source.wdl"))
                .expect("should have result")
                .document(),
            WorkflowInputs::default(),
            root_dir.path(),
        )
        .boxed();

    // Request cancellation once per second, tracking the observed state
    // transitions: NotCanceled -> Waiting -> Canceling (then stable), until
    // the evaluation resolves with a `Canceled` error.
    let mut state = CancellationContextState::NotCanceled;
    loop {
        tokio::select! {
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                match (state, cancellation.cancel()) {
                    (CancellationContextState::NotCanceled, CancellationContextState::Waiting) => {
                        state = CancellationContextState::Waiting;
                    }
                    (CancellationContextState::Waiting, CancellationContextState::Canceling) => {
                        state = CancellationContextState::Canceling;
                    }
                    // Further cancel requests leave the state unchanged.
                    (CancellationContextState::Canceling, CancellationContextState::Canceling) => {}
                    (_, _) => panic!("unexpected state transition"),
                }
            },
            res = &mut evaluation => {
                match res {
                    Ok(_) => panic!("evaluation should not complete"),
                    Err(EvaluationError::Canceled) => break,
                    Err(e) => panic!("expected evaluation to be canceled: {e}", e = e.to_string()),
                }
            }
        }
    }
}
}