use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs;
use std::fs::read_link;
use std::fs::remove_file;
use std::io::BufRead;
use std::mem;
use std::path::Path;
use std::path::absolute;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use bimap::BiHashMap;
use indexmap::IndexMap;
use itertools::Itertools;
use path_clean::clean;
use petgraph::algo::toposort;
use rev_buf_reader::RevBufReader;
use tokio::task::JoinSet;
use tracing::Level;
use tracing::debug;
use tracing::enabled;
use tracing::info;
use tracing::warn;
use walkdir::WalkDir;
use wdl_analysis::Document;
use wdl_analysis::diagnostics::Io;
use wdl_analysis::diagnostics::multiple_type_mismatch;
use wdl_analysis::diagnostics::unknown_name;
use wdl_analysis::document::TASK_VAR_NAME;
use wdl_analysis::document::Task;
use wdl_analysis::eval::v1::TaskGraphBuilder;
use wdl_analysis::eval::v1::TaskGraphNode;
use wdl_analysis::types::Optional;
use wdl_analysis::types::Type;
use wdl_analysis::types::v1::task_hint_types;
use wdl_analysis::types::v1::task_requirement_types;
use wdl_ast::Ast;
use wdl_ast::AstNode;
use wdl_ast::AstToken;
use wdl_ast::Diagnostic;
use wdl_ast::Span;
use wdl_ast::SupportedVersion;
use wdl_ast::v1::CommandPart;
use wdl_ast::v1::CommandSection;
use wdl_ast::v1::Decl;
use wdl_ast::v1::RequirementsSection;
use wdl_ast::v1::RuntimeSection;
use wdl_ast::v1::StrippedCommandPart;
use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES;
use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES_ALIAS;
use wdl_ast::v1::TaskDefinition;
use wdl_ast::v1::TaskHintsSection;
use wdl_ast::version::V1;
use super::Evaluator;
use crate::CancellationContextState;
use crate::Coercible;
use crate::CompoundValue;
use crate::ContentKind;
use crate::EngineEvent;
use crate::EvaluationContext;
use crate::EvaluationError;
use crate::EvaluationPath;
use crate::EvaluationResult;
use crate::GuestPath;
use crate::HiddenValue;
use crate::HostPath;
use crate::ONE_GIBIBYTE;
use crate::Object;
use crate::Outputs;
use crate::PrimitiveValue;
use crate::TaskInputs;
use crate::TaskPostEvaluationData;
use crate::TaskPostEvaluationValue;
use crate::TaskPreEvaluationValue;
use crate::TypeNameRefValue;
use crate::Value;
use crate::backend::ExecuteTaskRequest;
use crate::backend::TaskExecutionConstraints;
use crate::backend::TaskExecutionResult;
use crate::cache::KeyRequest;
use crate::config::CallCachingMode;
use crate::config::MAX_RETRIES;
use crate::diagnostics::decl_evaluation_failed;
use crate::diagnostics::runtime_type_mismatch;
use crate::diagnostics::task_execution_failed;
use crate::diagnostics::task_localization_failed;
use crate::diagnostics::unknown_enum;
use crate::eval::EvaluatedTask;
use crate::eval::Scope;
use crate::eval::ScopeIndex;
use crate::eval::ScopeRef;
use crate::eval::trie::InputTrie;
use crate::http::Transferer;
use crate::path::is_file_url;
use crate::path::is_supported_url;
use crate::stdlib::download_file;
use crate::tree::SyntaxNode;
use crate::units::convert_unit_string;
use crate::v1::INPUTS_FILE;
use crate::v1::OUTPUTS_FILE;
use crate::v1::expr::ExprEvaluator;
use crate::v1::resolve_enum_variant_value;
use crate::v1::write_json_file;
pub(crate) mod hints;
pub(crate) mod requirements;
/// The maximum number of stderr lines included in a task failure error.
const MAX_STDERR_LINES: usize = 10;
/// The default value for the task `cpu` requirement.
const DEFAULT_TASK_REQUIREMENT_CPU: f64 = 1.0;
/// The default value (in bytes) for the task `memory` requirement.
const DEFAULT_TASK_REQUIREMENT_MEMORY: i64 = 2 * (ONE_GIBIBYTE as i64);
/// The default value for the task `max_retries` requirement.
const DEFAULT_TASK_REQUIREMENT_MAX_RETRIES: u64 = 0;
/// The default value for the task `disks` requirement.
pub(crate) const DEFAULT_TASK_REQUIREMENT_DISKS: f64 = 1.0;
/// The default mount point for a task's disk requirement.
pub(crate) const DEFAULT_DISK_MOUNT_POINT: &str = "/";
/// The default GPU count (used by requirement evaluation — see the
/// `requirements` submodule).
const DEFAULT_GPU_COUNT: u64 = 1;
/// The index of the root scope (inputs and private declarations).
const ROOT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(0);
/// The index of the output section scope.
const OUTPUT_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(1);
/// The index of the scope holding the `task` variable (WDL v1.2+).
const TASK_SCOPE_INDEX: ScopeIndex = ScopeIndex::new(2);
/// Finds the first key in `keys` for which `lookup` returns a value.
///
/// Returns the matching key together with the looked-up value, or `None` if
/// no key resolves to a value.
///
/// Generic over the looked-up value type so the helper works with any
/// map-like lookup; existing callers infer `V = Value`.
fn find_key_value<'a, 'b, F, V>(keys: &[&'a str], lookup: F) -> Option<(&'a str, &'b V)>
where
    F: Fn(&str) -> Option<&'b V>,
{
    keys.iter()
        .find_map(|key| lookup(key).map(|value| (*key, value)))
}
/// Parses a storage-size value that is either an integer or a unit string
/// (converted via `convert_unit_string`).
///
/// Returns the size as an `i64`; the `error_message` callback supplies the
/// context message when a unit string fails to convert or does not fit.
///
/// The callback is invoked at most once, so `FnOnce` is sufficient (a
/// backward-compatible relaxation of the previous `Fn` bound).
///
/// # Panics
///
/// Panics if `value` is neither an integer nor a string; callers are
/// expected to have already type-checked the value.
fn parse_storage_value(value: &Value, error_message: impl FnOnce(&str) -> String) -> Result<i64> {
    if let Some(v) = value.as_integer() {
        return Ok(v);
    }
    if let Some(s) = value.as_string() {
        return convert_unit_string(s)
            .and_then(|v| v.try_into().ok())
            .with_context(|| error_message(s));
    }
    unreachable!("value should be an integer or string");
}
/// Context used to evaluate expressions during task evaluation.
struct TaskEvaluationContext<'a, 'b> {
    /// The mutable task evaluation state.
    state: &'a mut State<'b>,
    /// The scope in which names are resolved.
    scope: ScopeIndex,
    /// The directory used as the base for relative paths; when `None`, the
    /// state's base directory is used instead (see `base_dir`).
    work_dir: Option<&'a EvaluationPath>,
    /// The task's stdout value, if available.
    stdout: Option<&'a Value>,
    /// The task's stderr value, if available.
    stderr: Option<&'a Value>,
    /// Whether `task()` should resolve to the current task (enabled for
    /// hints evaluation via `with_task`).
    task: bool,
}
impl<'a, 'b> TaskEvaluationContext<'a, 'b> {
    /// Creates a new context for the given state and scope.
    ///
    /// The work directory, stdout, and stderr start unset and the `task`
    /// variable is not visible.
    pub fn new(state: &'a mut State<'b>, scope: ScopeIndex) -> Self {
        Self {
            state,
            scope,
            work_dir: None,
            stdout: None,
            stderr: None,
            task: false,
        }
    }

    /// Sets the work directory used as the base for relative paths.
    pub fn with_work_dir(mut self, work_dir: &'a EvaluationPath) -> Self {
        self.work_dir = Some(work_dir);
        self
    }

    /// Sets the task's stdout value.
    pub fn with_stdout(mut self, stdout: &'a Value) -> Self {
        self.stdout = Some(stdout);
        self
    }

    /// Sets the task's stderr value.
    pub fn with_stderr(mut self, stderr: &'a Value) -> Self {
        self.stderr = Some(stderr);
        self
    }

    /// Makes the `task` variable visible to evaluated expressions.
    pub fn with_task(mut self) -> Self {
        self.task = true;
        self
    }
}
impl EvaluationContext for TaskEvaluationContext<'_, '_> {
fn version(&self) -> SupportedVersion {
self.state
.document
.version()
.expect("document should have a version")
}
fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic> {
if let Some(var) = ScopeRef::new(&self.state.scopes, self.scope)
.lookup(name)
.cloned()
{
return Ok(var);
}
if let Some(ty) = self.state.document.get_custom_type(name) {
return Ok(Value::TypeNameRef(TypeNameRefValue::new(ty)));
}
Err(unknown_name(name, span))
}
fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic> {
crate::resolve_type_name(self.state.document, name, span)
}
fn enum_variant_value(&self, enum_name: &str, variant_name: &str) -> Result<Value, Diagnostic> {
let cache_key = self
.state
.document
.get_variant_cache_key(enum_name, variant_name)
.ok_or_else(|| unknown_enum(enum_name))?;
let cache = self.state.evaluator.variant_cache.lock().unwrap();
if let Some(cached_value) = cache.get(&cache_key) {
return Ok(cached_value.clone());
}
drop(cache);
let r#enum = self
.state
.document
.enum_by_name(enum_name)
.ok_or(unknown_enum(enum_name))?;
let value = resolve_enum_variant_value(r#enum, variant_name)?;
let mut cache = self.state.evaluator.variant_cache.lock().unwrap();
cache.insert(cache_key, value.clone());
drop(cache);
Ok(value)
}
fn base_dir(&self) -> &EvaluationPath {
self.work_dir.unwrap_or(&self.state.base_dir)
}
fn temp_dir(&self) -> &Path {
self.state.temp_dir
}
fn stdout(&self) -> Option<&Value> {
self.stdout
}
fn stderr(&self) -> Option<&Value> {
self.stderr
}
fn task(&self) -> Option<&Task> {
if self.task {
Some(self.state.task)
} else {
None
}
}
fn transferer(&self) -> &dyn Transferer {
self.state.transferer().as_ref()
}
fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
self.state.path_map.get_by_right(path).cloned()
}
fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
self.state.path_map.get_by_left(path).cloned()
}
fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
self.state.insert_backend_input(ContentKind::File, path)?;
Ok(())
}
}
/// Mutable state shared across the phases of a single task evaluation.
struct State<'a> {
    /// The evaluator performing the task evaluation.
    evaluator: &'a Evaluator,
    /// The temporary directory for the evaluation.
    temp_dir: &'a Path,
    /// The base directory (the document's parent) for relative paths.
    base_dir: EvaluationPath,
    /// The document containing the task being evaluated.
    document: &'a Document,
    /// The task being evaluated.
    task: &'a Task,
    /// The evaluation scopes, indexed by `ROOT_SCOPE_INDEX`,
    /// `OUTPUT_SCOPE_INDEX`, and `TASK_SCOPE_INDEX`.
    scopes: [Scope; 3],
    /// Environment variables from `env` input/private declarations.
    env: IndexMap<String, String>,
    /// The evaluated (coerced) task input values, by input name.
    inputs: BTreeMap<String, Value>,
    /// The trie of paths to be made available to the backend.
    backend_inputs: InputTrie,
    /// Bidirectional host path ↔ guest path mapping.
    path_map: BiHashMap<HostPath, GuestPath>,
}
impl<'a> State<'a> {
    /// Gets the transferer used to localize files.
    fn transferer(&self) -> &Arc<dyn Transferer> {
        &self.evaluator.transferer
    }

    /// Creates a new evaluation state for the given task.
    ///
    /// Fails if the document's URI has no parent directory (needed as the
    /// base directory for resolving relative paths).
    fn new(
        evaluator: &'a Evaluator,
        document: &'a Document,
        task: &'a Task,
        temp_dir: &'a Path,
    ) -> Result<Self> {
        // Scope layout: [0] root, [1] output, [2] task; `Scope::new` takes
        // the parent scope's index (presumably — confirm against `Scope`).
        let scopes = [
            Scope::default(),
            Scope::new(ROOT_SCOPE_INDEX),
            Scope::new(OUTPUT_SCOPE_INDEX),
        ];
        // When the backend provides a guest inputs directory, inserted inputs
        // get guest paths rooted there.
        let backend_inputs = if let Some(guest_inputs_dir) = evaluator.backend.guest_inputs_dir() {
            InputTrie::new_with_guest_dir(guest_inputs_dir)
        } else {
            InputTrie::new()
        };
        let document_path = document.uri();
        let base_dir = EvaluationPath::parent_of(document_path.as_str()).with_context(|| {
            format!(
                "document `{path}` does not have a parent directory",
                path = document.path()
            )
        })?;
        Ok(Self {
            evaluator,
            temp_dir,
            base_dir,
            document,
            task,
            scopes,
            env: Default::default(),
            inputs: Default::default(),
            backend_inputs,
            path_map: Default::default(),
        })
    }

    /// Registers all file/directory paths in `value` as backend inputs,
    /// downloading remote URLs when the backend needs local inputs.
    ///
    /// For WDL v1.2+ documents, paths in the value are first resolved
    /// relative to the base directory. Downloads run concurrently; each
    /// completed download records the URL → download-location mapping and
    /// stores the local location on the corresponding trie entry.
    async fn add_backend_inputs(
        &mut self,
        is_optional: bool,
        value: &mut Value,
        transferer: Arc<dyn Transferer>,
        needs_local_inputs: bool,
    ) -> Result<()> {
        if self
            .document
            .version()
            .expect("document should have a version")
            >= SupportedVersion::V1(V1::Two)
        {
            *value = value
                .resolve_paths(
                    is_optional,
                    self.base_dir.as_local(),
                    Some(transferer.as_ref()),
                    &|path| Ok(path.clone()),
                )
                .await?;
        }
        // Collect remote URLs that need downloading: inputs that have no
        // guest path yet and are supported, non-`file://` URLs.
        let mut urls = Vec::new();
        value.visit_paths(&mut |is_file, path| {
            if let Some(index) = self.insert_backend_input(
                if is_file {
                    ContentKind::File
                } else {
                    ContentKind::Directory
                },
                path,
            )? {
                if needs_local_inputs
                    && self.backend_inputs.as_slice()[index].guest_path().is_none()
                    && is_supported_url(path.as_str())
                    && !is_file_url(path.as_str())
                {
                    urls.push((path.clone(), index));
                }
            }
            Ok(())
        })?;
        if urls.is_empty() {
            return Ok(());
        }
        // Download all URLs concurrently.
        let mut downloads = JoinSet::new();
        for (url, index) in urls {
            let transferer = transferer.clone();
            downloads.spawn(async move {
                transferer
                    .download(
                        &url.as_str()
                            .parse()
                            .with_context(|| format!("invalid URL `{url}`"))?,
                    )
                    .await
                    .with_context(|| anyhow!("failed to localize `{url}`"))
                    .map(|l| (url, l, index))
            });
        }
        while let Some(result) = downloads.join_next().await {
            // A join error (panic/cancellation) is surfaced as a download
            // failure rather than propagated as a panic.
            let (url, location, index) =
                result.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")))?;
            let guest_path = GuestPath::new(location.to_str().with_context(|| {
                format!(
                    "download location `{location}` is not UTF-8",
                    location = location.display()
                )
            })?);
            self.path_map.insert(url, guest_path);
            self.backend_inputs.as_slice_mut()[index].set_location(location);
        }
        Ok(())
    }

    /// Inserts a single path into the backend inputs trie.
    ///
    /// Returns the index of the inserted entry, or `None` if the trie did
    /// not accept the path. When the entry has a guest path, the host ↔
    /// guest mapping is also recorded.
    fn insert_backend_input(
        &mut self,
        kind: ContentKind,
        path: &HostPath,
    ) -> Result<Option<usize>> {
        if let Some(index) = self
            .backend_inputs
            .insert(kind, path.as_str(), &self.base_dir)?
        {
            let input = &self.backend_inputs.as_slice()[index];
            if let Some(guest_path) = input.guest_path() {
                self.path_map.insert(path.clone(), guest_path.clone());
            }
            return Ok(Some(index));
        }
        Ok(None)
    }
}
/// The results of evaluating a task's sections prior to execution.
struct EvaluatedSections {
    /// The evaluated command text.
    command: String,
    /// The evaluated requirements, by name.
    requirements: HashMap<String, Value>,
    /// The evaluated hints, by name.
    hints: HashMap<String, Value>,
    /// The execution constraints (computed during section evaluation —
    /// see `evaluate_sections`).
    constraints: TaskExecutionConstraints,
}
impl Evaluator {
    /// Evaluates the given task with the given inputs.
    ///
    /// Fails immediately if the document has analysis errors. If the user
    /// canceled evaluation and cancellation is still in progress, the result
    /// is replaced with [`EvaluationError::Canceled`].
    pub async fn evaluate_task(
        &self,
        document: &Document,
        task: &Task,
        inputs: TaskInputs,
        eval_root_dir: impl AsRef<Path>,
    ) -> EvaluationResult<EvaluatedTask> {
        if document.has_errors() {
            return Err(anyhow!("cannot evaluate a document with errors").into());
        }
        let result = self
            .perform_task_evaluation(document, task, inputs, eval_root_dir.as_ref(), task.name())
            .await;
        // A user-initiated cancellation takes precedence over the result.
        if self.cancellation.user_canceled()
            && self.cancellation.state() == CancellationContextState::Canceling
        {
            return Err(EvaluationError::Canceled);
        }
        result
    }

    /// Performs the evaluation of a task.
    ///
    /// Phases:
    /// 1. validate inputs and build the task evaluation graph;
    /// 2. evaluate inputs and private declarations (everything before the
    ///    command) in topological order;
    /// 3. in a retry loop: evaluate the command/requirements/hints sections,
    ///    consult the call cache, execute via the backend, and retry on
    ///    failure up to `max_retries`;
    /// 4. on success, evaluate post-command declarations and outputs.
    pub(crate) async fn perform_task_evaluation(
        &self,
        document: &Document,
        task: &Task,
        inputs: TaskInputs,
        eval_root_dir: &Path,
        id: &str,
    ) -> EvaluationResult<EvaluatedTask> {
        inputs.validate(document, task, None).with_context(|| {
            format!(
                "failed to validate the inputs to task `{task}`",
                task = task.name()
            )
        })?;
        let ast = match document
            .root()
            .morph()
            .ast_with_version_fallback(document.config().fallback_version())
        {
            Ast::V1(ast) => ast,
            _ => {
                return Err(
                    anyhow!("task evaluation is only supported for WDL 1.x documents").into(),
                );
            }
        };
        // Locate this task's definition in the AST.
        let definition = ast
            .tasks()
            .find(|t| t.name().text() == task.name())
            .expect("task should exist in the AST");
        let version = document.version().expect("document should have version");
        let mut diagnostics = Vec::new();
        let graph =
            TaskGraphBuilder::default().build(version, &definition, &mut diagnostics, |name| {
                document.struct_by_name(name).is_some() || document.enum_by_name(name).is_some()
            });
        // The document already analyzed cleanly, so graph construction must
        // not produce diagnostics.
        assert!(
            diagnostics.is_empty(),
            "task evaluation graph should have no diagnostics"
        );
        debug!(
            task_id = id,
            task_name = task.name(),
            document = document.uri().as_str(),
            "evaluating task"
        );
        // Set up the on-disk evaluation layout: `<root>/tmp` plus the
        // serialized inputs file.
        let task_eval_root = absolute(eval_root_dir).with_context(|| {
            format!(
                "failed to determine absolute path of `{path}`",
                path = eval_root_dir.display()
            )
        })?;
        let temp_dir = task_eval_root.join("tmp");
        fs::create_dir_all(&temp_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = temp_dir.display()
            )
        })?;
        write_json_file(task_eval_root.join(INPUTS_FILE), &inputs)?;
        let mut state = State::new(self, document, task, &temp_dir)?;
        // Evaluate graph nodes in topological order up to (but not
        // including) the first output node; command/runtime/requirements/
        // hints nodes are handled later inside the retry loop.
        let nodes = toposort(&graph, None).expect("graph should be acyclic");
        let mut current = 0;
        while current < nodes.len() {
            match &graph[nodes[current]] {
                TaskGraphNode::Input(decl) => {
                    state
                        .evaluate_input(id, decl, &inputs)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Decl(decl) => {
                    state
                        .evaluate_decl(id, decl)
                        .await
                        .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                }
                TaskGraphNode::Output(_) => {
                    break;
                }
                TaskGraphNode::Command(_)
                | TaskGraphNode::Runtime(_)
                | TaskGraphNode::Requirements(_)
                | TaskGraphNode::Hints(_) => {
                }
            }
            current += 1;
        }
        // Execution retry loop: each iteration evaluates the sections,
        // optionally consults/updates the call cache, and executes the task.
        let mut cached;
        let mut attempt = 0;
        let mut previous_task_data: Option<Arc<TaskPostEvaluationData>> = None;
        let mut evaluated = loop {
            if self.cancellation.state() != CancellationContextState::NotCanceled {
                return Err(EvaluationError::Canceled);
            }
            let EvaluatedSections {
                command,
                requirements,
                hints,
                constraints,
            } = state
                .evaluate_sections(
                    id,
                    &definition,
                    &inputs,
                    attempt,
                    previous_task_data.clone(),
                )
                .await?;
            let max_retries = requirements::max_retries(&inputs, &requirements, &self.config)?;
            if max_retries > MAX_RETRIES {
                return Err(anyhow!(
                    "task `max_retries` requirement of {max_retries} cannot exceed {MAX_RETRIES}"
                )
                .into());
            }
            state.localize_inputs(id).await?;
            // Compute a call cache key only on the first attempt and only
            // when caching is enabled and the task is cacheable.
            let mut key = if attempt == 0
                && let Some(cache) = &self.cache
            {
                if hints::cacheable(&inputs, &hints, &self.config) {
                    // The configured default container participates in the
                    // key only when the task doesn't specify its own.
                    let default_container =
                        if requirements::has_container_requirement(&inputs, &requirements) {
                            None
                        } else {
                            Some(self.config.task.container.as_str())
                        };
                    let request = KeyRequest {
                        document_uri: state.document.uri().as_ref(),
                        task_name: task.name(),
                        inputs: &state.inputs,
                        command: &command,
                        requirements: &requirements,
                        hints: &hints,
                        default_container,
                        shell: &self.config.task.shell,
                        backend_inputs: state.backend_inputs.as_slice(),
                    };
                    match cache.key(request).await {
                        Ok(key) => {
                            debug!(
                                task_id = id,
                                task_name = state.task.name(),
                                document = state.document.uri().as_str(),
                                "task cache key is `{key}`"
                            );
                            Some(key)
                        }
                        Err(e) => {
                            // Key calculation failure disables caching for
                            // this run rather than failing evaluation.
                            warn!(
                                task_id = id,
                                task_name = state.task.name(),
                                document = state.document.uri().as_str(),
                                "call caching disabled due to cache key calculation failure: {e:#}"
                            );
                            None
                        }
                    }
                } else {
                    match self.config.task.cache {
                        CallCachingMode::Off => {
                            unreachable!("cache was used despite not being enabled")
                        }
                        CallCachingMode::On => debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task is not cacheable due to `cacheable` hint being set to `false`"
                        ),
                        CallCachingMode::Explicit => debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task is not cacheable due to `cacheable` hint not being explicitly \
                             set to `true`"
                        ),
                    }
                    None
                }
            } else {
                None
            };
            cached = false;
            // Attempt to reuse a previous execution result from the cache.
            let result = if let Some(cache_key) = &key {
                match self
                    .cache
                    .as_ref()
                    .expect("should have cache")
                    .get(cache_key)
                    .await
                {
                    Ok(Some(results)) => {
                        info!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "task execution was skipped due to previous result being present in \
                             the call cache"
                        );
                        cached = true;
                        if let Some(sender) = &self.events {
                            let _ = sender.send(EngineEvent::ReusedCachedExecutionResult {
                                id: id.to_string(),
                            });
                        }
                        // Clear the key so the entry isn't re-put below.
                        key = None;
                        Some(results)
                    }
                    Ok(None) => {
                        debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "call cache miss for key `{cache_key}`"
                        );
                        None
                    }
                    Err(e) => {
                        // Treat an unreadable cache entry as a miss.
                        info!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "ignoring call cache entry: {e:#}"
                        );
                        None
                    }
                }
            } else {
                None
            };
            // Cache miss (or caching disabled): execute via the backend.
            let result = match result {
                Some(result) => result,
                None => {
                    let mut attempt_dir = task_eval_root.clone();
                    attempt_dir.push("attempts");
                    attempt_dir.push(attempt.to_string());
                    match self
                        .backend
                        .execute(
                            &self.transferer,
                            ExecuteTaskRequest {
                                id,
                                command: &command,
                                inputs: &inputs,
                                backend_inputs: state.backend_inputs.as_slice(),
                                requirements: &requirements,
                                hints: &hints,
                                env: &state.env,
                                constraints: &constraints,
                                attempt_dir: &attempt_dir,
                                temp_dir: &temp_dir,
                            },
                        )
                        .await
                    {
                        // `None` from the backend means execution was
                        // canceled.
                        Ok(None) => return Err(EvaluationError::Canceled),
                        Ok(Some(result)) => result,
                        Err(e) => {
                            return Err(EvaluationError::new(
                                state.document.clone(),
                                task_execution_failed(&e, task.name(), id, task.name_span()),
                            ));
                        }
                    }
                }
            };
            // For WDL v1.2+, update the `task` variable with post-execution
            // information (attempt number, container, return code).
            if version >= SupportedVersion::V1(V1::Two) {
                let task = state.scopes[TASK_SCOPE_INDEX.0]
                    .get_mut(TASK_VAR_NAME)
                    .expect("task variable should exist in scope for WDL v1.2+")
                    .as_task_post_evaluation_mut()
                    .expect("task should be a post evaluation task at this point");
                task.set_attempt(attempt.try_into().with_context(|| {
                    format!(
                        "too many attempts were made to run task `{task}`",
                        task = state.task.name()
                    )
                })?);
                if let Some(container) = &result.container {
                    task.set_container(container.to_string());
                }
                task.set_return_code(result.exit_code);
            }
            if Self::did_task_fail(&requirements, result.exit_code) {
                // Out of retries: report the failure with captured stderr.
                if attempt >= max_retries {
                    let error =
                        Self::task_failure_error(&state, id, &result, state.transferer().as_ref())
                            .await;
                    break EvaluatedTask::new(cached, result, Some(error));
                }
                // Retry: carry the previous `task` data into the next
                // attempt so the next evaluation can reference it.
                attempt += 1;
                if let Some(task) = state.scopes[TASK_SCOPE_INDEX.0].names.get(TASK_VAR_NAME) {
                    let task = task.as_task_post_evaluation().unwrap();
                    previous_task_data = Some(task.data().clone());
                }
                info!(
                    "retrying execution of task `{name}` (retry {attempt})",
                    name = state.task.name()
                );
                continue;
            }
            // Rewrite symlinks in the work directory that point at guest
            // paths (skipped for cached results, which were remapped when
            // first produced).
            if !cached && let Err(e) = self.remap_links(&state, &result.work_dir) {
                return Err(EvaluationError::new(
                    state.document.clone(),
                    task_execution_failed(&e, state.task.name(), id, state.task.name_span()),
                ));
            }
            // Record the successful result in the call cache, if keyed.
            if let Some(key) = key {
                match self
                    .cache
                    .as_ref()
                    .expect("should have cache")
                    .put(key, &result)
                    .await
                {
                    Ok(key) => {
                        debug!(
                            task_id = id,
                            task_name = state.task.name(),
                            document = state.document.uri().as_str(),
                            "updated call cache entry for key `{key}`"
                        );
                    }
                    Err(e) => {
                        warn!(
                            "failed to update call cache entry for task `{name}` (task id \
                             `{id}`): cache entry has been discarded: {e:#}",
                            name = task.name()
                        );
                    }
                }
            }
            break EvaluatedTask::new(cached, result, None);
        };
        // On success, evaluate the remaining graph nodes (post-command
        // declarations and outputs) and write the outputs file.
        if !evaluated.failed() {
            for index in &nodes[current..] {
                match &graph[*index] {
                    TaskGraphNode::Decl(decl) => {
                        state
                            .evaluate_decl(id, decl)
                            .await
                            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                    }
                    TaskGraphNode::Output(decl) => {
                        state
                            .evaluate_output(id, decl, &evaluated)
                            .await
                            .map_err(|d| EvaluationError::new(state.document.clone(), d))?;
                    }
                    _ => {
                        unreachable!(
                            "only declarations and outputs should be evaluated after the command"
                        )
                    }
                }
            }
            let mut outputs: Outputs = mem::take(&mut state.scopes[OUTPUT_SCOPE_INDEX.0]).into();
            // Order the outputs to match their declaration order.
            if let Some(section) = definition.output() {
                let indexes: HashMap<_, _> = section
                    .declarations()
                    .enumerate()
                    .map(|(i, d)| (d.name().hashable(), i))
                    .collect();
                outputs.sort_by(move |a, b| indexes[a].cmp(&indexes[b]))
            }
            write_json_file(task_eval_root.join(OUTPUTS_FILE), &outputs)?;
            evaluated.outputs = outputs;
        }
        Ok(evaluated)
    }

    /// Determines whether a task failed based on its exit code and the
    /// `return_codes` requirement.
    ///
    /// Without a `return_codes` requirement, only exit code 0 is success.
    /// A `"*"` string accepts any exit code; an integer or array of
    /// integers enumerates the acceptable codes.
    fn did_task_fail(requirements: &HashMap<String, Value>, exit_code: i32) -> bool {
        if let Some(return_codes) = requirements
            .get(TASK_REQUIREMENT_RETURN_CODES)
            .or_else(|| requirements.get(TASK_REQUIREMENT_RETURN_CODES_ALIAS))
        {
            match return_codes {
                Value::Primitive(PrimitiveValue::String(s)) => s.as_ref() != "*",
                Value::Primitive(PrimitiveValue::Integer(ok)) => {
                    exit_code != i32::try_from(*ok).unwrap_or_default()
                }
                Value::Compound(CompoundValue::Array(codes)) => !codes.as_slice().iter().any(|v| {
                    v.as_integer()
                        .map(|i| i32::try_from(i).unwrap_or_default() == exit_code)
                        .unwrap_or(false)
                }),
                _ => unreachable!("unexpected return codes value"),
            }
        } else {
            exit_code != 0
        }
    }

    /// Rewrites symlinks in the task's work directory that point at guest
    /// (container) paths so they point at the corresponding host paths.
    ///
    /// This is a no-op when the backend has no guest inputs directory or
    /// the work directory is not local. Symlinks that resolve within the
    /// work directory itself are left alone; symlinks that do not resolve
    /// to any known guest path are an error.
    fn remap_links(&self, state: &State<'_>, work_dir: &EvaluationPath) -> Result<()> {
        if self.backend.guest_inputs_dir().is_none() {
            return Ok(());
        }
        let Some(work_dir) = work_dir.as_local() else {
            return Ok(());
        };
        for entry in WalkDir::new(work_dir).follow_links(false) {
            let entry = entry.with_context(|| {
                format!("failed to read directory `{dir}`", dir = work_dir.display())
            })?;
            if !entry.path_is_symlink() {
                continue;
            }
            let path = entry.path();
            let link_path = read_link(path)
                .with_context(|| format!("failed to read link `{path}`", path = path.display()))?;
            // Resolve the link target relative to the work directory;
            // links staying inside the work directory need no remapping.
            let symlink_guest_path = clean(work_dir.join(&link_path));
            if symlink_guest_path.starts_with(work_dir) {
                continue;
            }
            // Find the recorded guest path the link target falls under.
            let Some(guest) = state
                .path_map
                .right_values()
                .find(|p| symlink_guest_path.starts_with(p.0.as_str()))
            else {
                bail!(
                    "`{path}` links to guest path `{link_path}` but it is not to a task input or \
                     inside of the task's work directory",
                    path = path.display(),
                    link_path = link_path.display()
                );
            };
            let host = state.path_map.get_by_right(guest).unwrap();
            // For remote inputs that were localized, remap to the local
            // download location rather than the URL.
            let base_host_path =
                if self.backend.needs_local_inputs() && is_supported_url(host.as_str()) {
                    state
                        .backend_inputs
                        .as_slice()
                        .iter()
                        .find_map(|i| {
                            let url = i.path().as_remote()?.as_str();
                            let host = host.as_str();
                            // Compare ignoring a single trailing slash.
                            if url.strip_suffix('/').unwrap_or(url)
                                == host.strip_suffix('/').unwrap_or(host)
                            {
                                Some(i.local_path()?)
                            } else {
                                None
                            }
                        })
                        .with_context(|| {
                            format!(
                                "cannot remap symbolic link for guest path `{guest}` because a \
                                 localized path for URL `{host}` was not found"
                            )
                        })?
                } else {
                    Path::new(host.0.as_str())
                };
            // Append whatever portion of the target extends beyond the
            // guest prefix.
            let symlink_host_path: Cow<'_, Path> = if let Ok(stripped) =
                symlink_guest_path.strip_prefix(guest.0.as_str())
                && !stripped.as_os_str().is_empty()
            {
                Cow::Owned(base_host_path.join(stripped))
            } else {
                Cow::Borrowed(base_host_path)
            };
            // Replace the symlink in place with one targeting the host path.
            remove_file(path).with_context(|| {
                format!(
                    "failed to remove symbolic link `{path}`",
                    path = path.display()
                )
            })?;
            #[cfg(unix)]
            {
                std::os::unix::fs::symlink(&symlink_host_path, path).with_context(|| {
                    format!(
                        "failed to create symlink `{path}` to `{symlink_path}`",
                        path = path.display(),
                        symlink_path = symlink_host_path.display()
                    )
                })?;
            }
            #[cfg(windows)]
            {
                // Windows distinguishes directory and file symlinks.
                if symlink_host_path.is_dir() {
                    std::os::windows::fs::symlink_dir(&symlink_host_path, path).with_context(
                        || {
                            format!(
                                "failed to create directory symlink `{path}` to `{symlink_path}`",
                                path = path.display(),
                                symlink_path = symlink_host_path.display()
                            )
                        },
                    )?;
                } else {
                    std::os::windows::fs::symlink_file(&symlink_host_path, path).with_context(
                        || {
                            format!(
                                "failed to create file symlink `{path}` to `{symlink_path}`",
                                path = path.display(),
                                symlink_path = symlink_host_path.display()
                            )
                        },
                    )?;
                }
            }
        }
        Ok(())
    }

    /// Builds the evaluation error reported when a task's process fails.
    ///
    /// Best-effort reads the last `MAX_STDERR_LINES` lines of the task's
    /// stderr file (downloading it first if remote) and includes them in
    /// the error message; any failure to fetch stderr simply omits it.
    async fn task_failure_error(
        state: &State<'_>,
        id: &str,
        result: &TaskExecutionResult,
        transferer: &dyn Transferer,
    ) -> EvaluationError {
        let stderr = download_file(
            transferer,
            &result.work_dir,
            result.stderr.as_file().unwrap(),
        )
        .await
        .ok()
        .and_then(|l| {
            fs::File::open(l).ok().map(|f| {
                // Read the file backwards to take only the trailing lines,
                // then restore their original order.
                let reader = RevBufReader::new(f);
                let lines: Vec<_> = reader
                    .lines()
                    .take(MAX_STDERR_LINES)
                    .map_while(|l| l.ok())
                    .collect();
                lines
                    .iter()
                    .rev()
                    .format_with("\n", |l, f| f(&format_args!(" {l}")))
                    .to_string()
            })
        })
        .unwrap_or_default();
        let error = anyhow!(
            "process terminated with exit code {code}: see `{stdout_path}` and `{stderr_path}` \
             for task output{header}{stderr}{trailer}",
            code = result.exit_code,
            stdout_path = result.stdout.as_file().expect("must be file"),
            stderr_path = result.stderr.as_file().expect("must be file"),
            header = if stderr.is_empty() {
                Cow::Borrowed("")
            } else {
                format!("\n\ntask stderr output (last {MAX_STDERR_LINES} lines):\n\n").into()
            },
            trailer = if stderr.is_empty() { "" } else { "\n" }
        );
        EvaluationError::new(
            state.document.clone(),
            task_execution_failed(&error, state.task.name(), id, state.task.name_span()),
        )
    }
}
impl<'a> State<'a> {
    /// Evaluates a task input declaration.
    ///
    /// The value comes from the provided inputs when present; otherwise the
    /// declaration's default expression is evaluated, falling back to `None`
    /// for optional inputs without a default. A provided `None` for a
    /// non-optional input with a default also triggers default evaluation.
    /// The value is coerced to the declared type, its paths are registered
    /// as backend inputs, and `env` declarations are exported as
    /// environment variables.
    async fn evaluate_input(
        &mut self,
        id: &str,
        decl: &Decl<SyntaxNode>,
        inputs: &TaskInputs,
    ) -> Result<(), Diagnostic> {
        let name = decl.name();
        let decl_ty = decl.ty();
        let expected_ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
        let (value, span) = match inputs.get(name.text()) {
            Some(input) => {
                // A `None` input for a non-optional declaration falls back
                // to the default expression instead of the provided value.
                if input.is_none()
                    && !expected_ty.is_optional()
                    && let Some(expr) = decl.expr()
                {
                    debug!(
                        task_id = id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );
                    let mut evaluator =
                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                } else {
                    (input.clone(), name.span())
                }
            }
            None => match decl.expr() {
                Some(expr) => {
                    debug!(
                        task_id = id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        input_name = name.text(),
                        "evaluating input default expression"
                    );
                    let mut evaluator =
                        ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
                    (evaluator.evaluate_expr(&expr).await?, expr.span())
                }
                _ => {
                    // No input and no default: only valid for optionals.
                    assert!(expected_ty.is_optional(), "type should be optional");
                    (Value::new_none(expected_ty.clone()), name.span())
                }
            },
        };
        let mut value = value
            .coerce(
                Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
                &expected_ty,
            )
            .map_err(|e| runtime_type_mismatch(e, &expected_ty, name.span(), &value.ty(), span))?;
        // Register any file/directory paths in the value as backend inputs
        // (localizing them if the backend requires local inputs).
        self.add_backend_inputs(
            decl_ty.is_optional(),
            &mut value,
            self.transferer().clone(),
            self.evaluator.backend.needs_local_inputs(),
        )
        .await
        .map_err(|e| {
            decl_evaluation_failed(
                e,
                self.task.name(),
                true,
                name.text(),
                Some(Io::Input),
                name.span(),
            )
        })?;
        self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
        self.inputs.insert(name.text().to_string(), value.clone());
        // `env` inputs are exported to the command's environment.
        if decl.env().is_some() {
            let value = value
                .as_primitive()
                .expect("value should be primitive")
                .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
                .to_string();
            self.env.insert(name.text().to_string(), value);
        }
        Ok(())
    }
    /// Evaluates a task private declaration.
    ///
    /// Evaluates the declaration's expression (required for private
    /// declarations), coerces the result to the declared type, registers
    /// any paths as backend inputs, and exports `env` declarations as
    /// environment variables.
    async fn evaluate_decl(&mut self, id: &str, decl: &Decl<SyntaxNode>) -> Result<(), Diagnostic> {
        let name = decl.name();
        debug!(
            task_id = id,
            task_name = self.task.name(),
            document = self.document.uri().as_str(),
            decl_name = name.text(),
            "evaluating private declaration",
        );
        let decl_ty = decl.ty();
        let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
        let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX));
        let expr = decl.expr().expect("private decls should have expressions");
        let value = evaluator.evaluate_expr(&expr).await?;
        let mut value = value
            .coerce(
                Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)),
                &ty,
            )
            .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
        // Register any file/directory paths in the value as backend inputs.
        self.add_backend_inputs(
            decl_ty.is_optional(),
            &mut value,
            self.transferer().clone(),
            self.evaluator.backend.needs_local_inputs(),
        )
        .await
        .map_err(|e| {
            decl_evaluation_failed(e, self.task.name(), true, name.text(), None, name.span())
        })?;
        self.scopes[ROOT_SCOPE_INDEX.0].insert(name.text(), value.clone());
        // `env` declarations are exported to the command's environment.
        if decl.env().is_some() {
            let value = value
                .as_primitive()
                .expect("value should be primitive")
                .raw(Some(&TaskEvaluationContext::new(self, ROOT_SCOPE_INDEX)))
                .to_string();
            self.env.insert(name.text().to_string(), value);
        }
        Ok(())
    }
async fn evaluate_runtime_section(
&mut self,
id: &str,
section: &RuntimeSection<SyntaxNode>,
inputs: &TaskInputs,
) -> Result<(HashMap<String, Value>, HashMap<String, Value>), Diagnostic> {
debug!(
task_id = id,
task_name = self.task.name(),
document = self.document.uri().as_str(),
"evaluating runtimes section",
);
let mut requirements = HashMap::new();
let mut hints = HashMap::new();
let version = self
.document
.version()
.expect("document should have version");
let scope_index = if version >= SupportedVersion::V1(V1::Three) {
TASK_SCOPE_INDEX
} else {
ROOT_SCOPE_INDEX
};
for item in section.items() {
let name = item.name();
match inputs.requirement(name.text()) {
Some(value) => {
requirements.insert(name.text().to_string(), value.clone());
continue;
}
_ => {
if let Some(value) = inputs.hint(name.text()) {
hints.insert(name.text().to_string(), value.clone());
continue;
}
}
}
let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
let (types, requirement) = match task_requirement_types(version, name.text()) {
Some(types) => (Some(types), true),
None => match task_hint_types(version, name.text(), false) {
Some(types) => (Some(types), false),
None => (None, false),
},
};
let expr = item.expr();
let mut value = evaluator.evaluate_expr(&expr).await?;
if let Some(types) = types {
value = types
.iter()
.find_map(|ty| {
value
.coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
.ok()
})
.ok_or_else(|| {
multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
})?;
}
if requirement {
requirements.insert(name.text().to_string(), value);
} else {
hints.insert(name.text().to_string(), value);
}
}
Ok((requirements, hints))
}
async fn evaluate_requirements_section(
&mut self,
id: &str,
section: &RequirementsSection<SyntaxNode>,
inputs: &TaskInputs,
) -> Result<HashMap<String, Value>, Diagnostic> {
debug!(
task_id = id,
task_name = self.task.name(),
document = self.document.uri().as_str(),
"evaluating requirements",
);
let mut requirements = HashMap::new();
let version = self
.document
.version()
.expect("document should have version");
let scope_index = if version >= SupportedVersion::V1(V1::Three) {
TASK_SCOPE_INDEX
} else {
ROOT_SCOPE_INDEX
};
for item in section.items() {
let name = item.name();
if let Some(value) = inputs.requirement(name.text()) {
requirements.insert(name.text().to_string(), value.clone());
continue;
}
let mut evaluator = ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index));
let types =
task_requirement_types(version, name.text()).expect("requirement should be known");
let expr = item.expr();
let value = evaluator.evaluate_expr(&expr).await?;
let value = types
.iter()
.find_map(|ty| {
value
.coerce(Some(&TaskEvaluationContext::new(self, scope_index)), ty)
.ok()
})
.ok_or_else(|| {
multiple_type_mismatch(types, name.span(), &value.ty(), expr.span())
})?;
requirements.insert(name.text().to_string(), value);
}
Ok(requirements)
}
async fn evaluate_hints_section(
&mut self,
id: &str,
section: &TaskHintsSection<SyntaxNode>,
inputs: &TaskInputs,
) -> Result<HashMap<String, Value>, Diagnostic> {
debug!(
task_id = id,
task_name = self.task.name(),
document = self.document.uri().as_str(),
"evaluating hints section",
);
let mut hints = HashMap::new();
let version = self
.document
.version()
.expect("document should have version");
let scope_index = if version >= SupportedVersion::V1(V1::Three) {
TASK_SCOPE_INDEX
} else {
ROOT_SCOPE_INDEX
};
for item in section.items() {
let name = item.name();
if let Some(value) = inputs.hint(name.text()) {
hints.insert(name.text().to_string(), value.clone());
continue;
}
let mut evaluator =
ExprEvaluator::new(TaskEvaluationContext::new(self, scope_index).with_task());
let value = evaluator.evaluate_hints_item(&name, &item.expr()).await?;
hints.insert(name.text().to_string(), value);
}
Ok(hints)
}
    /// Evaluates the command section of a task into the final command text.
    ///
    /// Prefers the whitespace-stripped form of the command; when the
    /// section has mixed indentation and cannot be stripped, the raw parts
    /// are used (with escape handling) and a warning is logged. Placeholders
    /// are evaluated in the task scope and appended in order.
    async fn evaluate_command(
        &mut self,
        id: &str,
        section: &CommandSection<SyntaxNode>,
    ) -> EvaluationResult<String> {
        debug!(
            task_id = id,
            task_name = self.task.name(),
            document = self.document.uri().as_str(),
            "evaluating command section",
        );
        // Clone the document handle up front since `self` is mutably
        // borrowed by the evaluator below.
        let document = self.document.clone();
        let mut command = String::new();
        match section.strip_whitespace() {
            Some(parts) => {
                let mut evaluator =
                    ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
                for part in parts {
                    match part {
                        StrippedCommandPart::Text(t) => {
                            command.push_str(t.as_str());
                        }
                        StrippedCommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
                        }
                    }
                }
            }
            _ => {
                // Mixed indentation: fall back to the unstripped parts.
                warn!(
                    "command for task `{task}` in `{uri}` has mixed indentation; whitespace \
                     stripping was skipped",
                    task = self.task.name(),
                    uri = self.document.uri(),
                );
                let mut evaluator =
                    ExprEvaluator::new(TaskEvaluationContext::new(self, TASK_SCOPE_INDEX));
                let heredoc = section.is_heredoc();
                for part in section.parts() {
                    match part {
                        CommandPart::Text(t) => {
                            // Escape sequences differ between heredoc and
                            // brace commands.
                            t.unescape_to(heredoc, &mut command);
                        }
                        CommandPart::Placeholder(placeholder) => {
                            evaluator
                                .evaluate_placeholder(&placeholder, &mut command)
                                .await
                                .map_err(|d| EvaluationError::new(document.clone(), d))?;
                        }
                    }
                }
            }
        }
        Ok(command)
    }
async fn evaluate_sections(
&mut self,
id: &str,
definition: &TaskDefinition<SyntaxNode>,
inputs: &TaskInputs,
attempt: u64,
previous_task_data: Option<Arc<TaskPostEvaluationData>>,
) -> EvaluationResult<EvaluatedSections> {
let version = self.document.version();
let task_meta = definition
.metadata()
.map(|s| Object::from_v1_metadata(s.items()))
.unwrap_or_else(Object::empty);
let task_parameter_meta = definition
.parameter_metadata()
.map(|s| Object::from_v1_metadata(s.items()))
.unwrap_or_else(Object::empty);
let task_ext = Object::empty();
if version >= Some(SupportedVersion::V1(V1::Three)) {
let mut task = TaskPreEvaluationValue::new(
self.task.name(),
id,
attempt.try_into().expect("attempt should fit in i64"),
task_meta.clone(),
task_parameter_meta.clone(),
task_ext.clone(),
);
if let Some(prev_data) = &previous_task_data {
task.set_previous(prev_data.clone());
}
let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
*v = HiddenValue::TaskPreEvaluation(task).into();
} else {
scope.insert(TASK_VAR_NAME, HiddenValue::TaskPreEvaluation(task));
}
}
let (requirements, hints) = match definition.runtime() {
Some(section) => self
.evaluate_runtime_section(id, §ion, inputs)
.await
.map_err(|d| EvaluationError::new(self.document.clone(), d))?,
_ => (
match definition.requirements() {
Some(section) => self
.evaluate_requirements_section(id, §ion, inputs)
.await
.map_err(|d| EvaluationError::new(self.document.clone(), d))?,
None => Default::default(),
},
match definition.hints() {
Some(section) => self
.evaluate_hints_section(id, §ion, inputs)
.await
.map_err(|d| EvaluationError::new(self.document.clone(), d))?,
None => Default::default(),
},
),
};
let constraints = self
.evaluator
.backend
.constraints(inputs, &requirements, &hints)
.with_context(|| {
format!(
"failed to get constraints for task `{task}`",
task = self.task.name()
)
})?;
if version >= Some(SupportedVersion::V1(V1::Two)) {
let max_retries =
requirements::max_retries(inputs, &requirements, &self.evaluator.config)?;
let mut task = TaskPostEvaluationValue::new(
self.task.name(),
id,
&constraints,
max_retries.try_into().with_context(|| {
format!(
"the number of max retries is too large to run task `{task}`",
task = self.task.name()
)
})?,
attempt.try_into().with_context(|| {
format!(
"too many attempts were made to run task `{task}`",
task = self.task.name()
)
})?,
task_meta,
task_parameter_meta,
task_ext,
);
if let Some(version) = version
&& version >= SupportedVersion::V1(V1::Three)
&& let Some(prev_data) = &previous_task_data
{
task.set_previous(prev_data.clone());
}
let scope = &mut self.scopes[TASK_SCOPE_INDEX.0];
if let Some(v) = scope.get_mut(TASK_VAR_NAME) {
*v = HiddenValue::TaskPostEvaluation(task).into();
} else {
scope.insert(TASK_VAR_NAME, HiddenValue::TaskPostEvaluation(task));
}
}
let command = self
.evaluate_command(
id,
&definition.command().expect("must have command section"),
)
.await?;
Ok(EvaluatedSections {
command,
requirements,
hints,
constraints,
})
}
/// Evaluates a single task output declaration.
///
/// The declaration's expression is evaluated in the task scope (with the
/// evaluated task's working directory, stdout, and stderr made available),
/// coerced to the declared type, and has any contained paths resolved before
/// the value is inserted into the output scope.
async fn evaluate_output(
    &mut self,
    id: &str,
    decl: &Decl<SyntaxNode>,
    evaluated: &EvaluatedTask,
) -> Result<(), Diagnostic> {
    let name = decl.name();
    debug!(
        task_id = id,
        task_name = self.task.name(),
        document = self.document.uri().as_str(),
        output_name = name.text(),
        "evaluating output",
    );
    let decl_ty = decl.ty();
    // Convert the declared AST type into an evaluation type.
    let ty = crate::convert_ast_type_v1(self.document, &decl_ty)?;
    // Output expressions may reference the work dir, stdout, and stderr of
    // the completed execution.
    let mut evaluator = ExprEvaluator::new(
        TaskEvaluationContext::new(self, TASK_SCOPE_INDEX)
            .with_work_dir(&evaluated.result.work_dir)
            .with_stdout(&evaluated.result.stdout)
            .with_stderr(&evaluated.result.stderr),
    );
    let expr = decl.expr().expect("outputs should have expressions");
    let value = evaluator.evaluate_expr(&expr).await?;
    // Coerce the evaluated value to the declared output type.
    let mut value = value
        .coerce(Some(evaluator.context()), &ty)
        .map_err(|e| runtime_type_mismatch(e, &ty, name.span(), &value.ty(), expr.span()))?;
    // Resolve any paths contained in the value, mapping guest paths back to
    // host paths where necessary.
    value = value
        .resolve_paths(
            ty.is_optional(),
            self.base_dir.as_local(),
            Some(self.transferer().as_ref()),
            &|path| {
                // A path already known as a host path passes through
                // unchanged.
                if self.path_map.contains_left(path) {
                    return Ok(path.clone());
                }
                // Otherwise, treat the path as relative to the task's
                // working directory.
                let output_path = evaluated.result.work_dir.join(path.as_str())?;
                // Without a guest inputs directory, no guest->host
                // translation is needed.
                if self.evaluator.backend.guest_inputs_dir().is_none() {
                    return Ok(HostPath::new(String::try_from(output_path)?));
                }
                let output_path = if let (Some(joined), Some(base)) =
                    (output_path.as_local(), evaluated.result.work_dir.as_local())
                {
                    if joined.starts_with(base)
                        || joined == evaluated.stdout().as_file().unwrap().as_str()
                        || joined == evaluated.stderr().as_file().unwrap().as_str()
                    {
                        // Paths within the work dir (or the stdout/stderr
                        // files themselves) are valid host paths as-is.
                        HostPath::new(String::try_from(output_path)?)
                    } else {
                        // Anything else must correspond to a mapped guest
                        // input; look up its host counterpart.
                        self.path_map
                            .get_by_right(&GuestPath(path.0.clone()))
                            .ok_or_else(|| {
                                anyhow!(
                                    "guest path `{path}` is not an input or within the task's \
                                     working directory"
                                )
                            })?
                            .0
                            .clone()
                            .into()
                    }
                } else if let (Some(_), Some(_)) = (
                    output_path.as_local(),
                    evaluated.result.work_dir.as_remote(),
                ) {
                    // A local guest path cannot be resolved against a remote
                    // working directory.
                    bail!("cannot access guest path `{path}` from a remotely executing task")
                } else {
                    HostPath::new(String::try_from(output_path)?)
                };
                Ok(output_path)
            },
        )
        .await
        .map_err(|e| {
            decl_evaluation_failed(
                e,
                self.task.name(),
                true,
                name.text(),
                Some(Io::Output),
                name.span(),
            )
        })?;
    // Make the resolved value visible in the output scope.
    self.scopes[OUTPUT_SCOPE_INDEX.0].insert(name.text(), value);
    Ok(())
}
/// Ensures remote task inputs are available locally when the backend
/// requires local inputs.
///
/// Remote inputs that lack a local path are downloaded concurrently via the
/// transferer, and the resulting locations are recorded back onto the
/// corresponding inputs. When DEBUG logging is enabled, the final path
/// mappings for all inputs are logged.
async fn localize_inputs(&mut self, task_id: &str) -> EvaluationResult<()> {
    if self.evaluator.backend.needs_local_inputs() {
        // Spawn one concurrent download per remote input without a local
        // path; each spawned future yields the input's index plus the
        // downloaded location so results can be matched back up.
        let mut pending = JoinSet::new();
        for (index, input) in self.backend_inputs.as_slice_mut().iter_mut().enumerate() {
            if input.local_path().is_some() {
                continue;
            }
            let Some(url) = input.path().as_remote() else {
                continue;
            };
            let transferer = self.evaluator.transferer.clone();
            let url = url.clone();
            pending.spawn(async move {
                transferer
                    .download(&url)
                    .await
                    .map(|location| (index, location))
                    .with_context(|| anyhow!("failed to localize `{url}`"))
            });
        }

        while let Some(joined) = pending.join_next().await {
            // A panicked/cancelled download task is surfaced as an error.
            let outcome = joined.unwrap_or_else(|e| Err(anyhow!("download task failed: {e}")));
            match outcome {
                Ok((index, location)) => {
                    self.backend_inputs.as_slice_mut()[index].set_location(location);
                }
                Err(e) => {
                    return Err(EvaluationError::new(
                        self.document.clone(),
                        task_localization_failed(e, self.task.name(), self.task.name_span()),
                    ));
                }
            }
        }
    }

    if enabled!(Level::DEBUG) {
        for input in self.backend_inputs.as_slice() {
            let is_local = input.path().as_local().is_some();
            match (is_local, input.local_path(), input.guest_path()) {
                // Nothing to report: no guest mapping and no download.
                (true, _, None) | (false, None, None) => {}
                // Mapped into the guest (either a local input, or a remote
                // input that was not downloaded).
                (true, _, Some(guest_path)) | (false, None, Some(guest_path)) => {
                    debug!(
                        task_id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        "task input `{path}` mapped to `{guest_path}`",
                        path = input.path(),
                    );
                }
                // Downloaded but not mapped into the guest.
                (false, Some(local_path), None) => {
                    debug!(
                        task_id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        "task input `{path}` downloaded to `{local_path}`",
                        path = input.path(),
                        local_path = local_path.display()
                    );
                }
                // Downloaded and mapped into the guest.
                (false, Some(local_path), Some(guest_path)) => {
                    debug!(
                        task_id,
                        task_name = self.task.name(),
                        document = self.document.uri().as_str(),
                        "task input `{path}` downloaded to `{local_path}` and mapped to \
                         `{guest_path}`",
                        path = input.path(),
                        local_path = local_path.display(),
                    );
                }
            }
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod test {
    //! Integration tests for task evaluation, focused on the behavior of the
    //! call cache in its `Off`, `On`, and `Explicit` modes.
    //!
    //! Each test writes a small WDL document to a temp directory, analyzes
    //! it, evaluates the task named `test`, and asserts on the evaluated
    //! outputs plus substrings of the captured trace logs (via
    //! `tracing_test`) to observe cache hits/misses.

    use std::fs;
    use std::path::Path;
    use pretty_assertions::assert_eq;
    use tempfile::tempdir;
    use tracing_test::traced_test;
    use wdl_analysis::Analyzer;
    use wdl_analysis::Config as AnalysisConfig;
    use wdl_analysis::DiagnosticsConfig;
    use crate::CancellationContext;
    use crate::Events;
    use crate::TaskInputs;
    use crate::config::BackendConfig;
    use crate::config::CallCachingMode;
    use crate::config::Config;
    use crate::eval::EvaluatedTask;
    use crate::v1::Evaluator;

    /// Writes `source` to `source.wdl` in `root_dir`, analyzes it, and
    /// evaluates the task named `test` with the given call-caching `mode`.
    ///
    /// The cache directory and runs directory are placed under `root_dir`,
    /// so repeated calls with the same `root_dir` share the cache. Panics if
    /// analysis or evaluation fails.
    async fn evaluate_task(mode: CallCachingMode, root_dir: &Path, source: &str) -> EvaluatedTask {
        fs::write(root_dir.join("source.wdl"), source).expect("failed to write WDL source file");
        let analyzer = Analyzer::new(
            // All diagnostics rules are disabled; these tests only care about
            // evaluation behavior, not lints.
            AnalysisConfig::default().with_diagnostics_config(DiagnosticsConfig::except_all()),
            |(), _, _, _| async {},
        );
        analyzer
            .add_directory(root_dir)
            .await
            .expect("failed to add directory");
        let results = analyzer
            .analyze(())
            .await
            .expect("failed to analyze document");
        assert_eq!(results.len(), 1, "expected only one result");
        let document = results.first().expect("should have result").document();
        let mut config = Config::default();
        config.task.cache = mode;
        config.task.cache_dir = root_dir.join("cache").to_string_lossy().into();
        config
            .backends
            .insert("default".into(), BackendConfig::Local(Default::default()));
        let evaluator = Evaluator::new(
            &root_dir.join("runs"),
            config.into(),
            CancellationContext::default(),
            Events::disabled(),
        )
        .await
        .unwrap();
        let runs_dir = root_dir.join("runs");
        evaluator
            .evaluate_task(
                document,
                document.task_by_name("test").expect("should have task"),
                TaskInputs::default(),
                &runs_dir,
            )
            .await
            .unwrap()
    }

    /// With caching off, the task executes and is never reported as cached.
    #[tokio::test]
    #[traced_test]
    async fn cache_off() {
        const SOURCE: &str = r#"
version 1.2

task test {
    input {
        String name = "friend"
    }

    command <<<echo "hello, ~{name}!">>>

    output {
        String message = read_string(stdout())
    }
}
"#;
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::Off, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(
            logs_contain("call caching is disabled"),
            "expected cache to be off"
        );
    }

    /// With caching on, the first run misses the cache and executes; a
    /// second run with the same inputs hits the cache and skips execution.
    #[tokio::test]
    #[traced_test]
    async fn cache_on() {
        const SOURCE: &str = r#"
version 1.2

task test {
    input {
        String name = "friend"
    }

    command <<<echo "hello, ~{name}!">>>

    output {
        String message = read_string(stdout())
    }
}
"#;
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("call cache miss"),
            "expected first run to miss the cache"
        );
        assert!(
            logs_contain("running task"),
            "expected the task to have executed"
        );
        // Second run: same root dir, so the cache populated above is reused.
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
        assert!(evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(
            logs_contain("task execution was skipped"),
            "expected second run to skip execution"
        );
    }

    /// With caching on but the `cacheable` hint set to `false`, the task is
    /// never cached and both runs execute.
    #[tokio::test]
    #[traced_test]
    async fn cache_on_not_cacheable() {
        const SOURCE: &str = r#"
version 1.2

task test {
    input {
        String name = "friend"
    }

    command <<<echo "hello, ~{name}!">>>

    hints {
        cacheable: false
    }

    output {
        String message = read_string(stdout())
    }
}
"#;
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("task is not cacheable due to `cacheable` hint being set to `false`"),
            "expected task to not be cacheable"
        );
        // Second run: still executes because the task opted out of caching.
        let evaluated = evaluate_task(CallCachingMode::On, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(
            !logs_contain("task execution was skipped"),
            "expected second run to not skip execution"
        );
    }

    /// In `Explicit` mode, a task without `cacheable: true` is not cached.
    #[tokio::test]
    #[traced_test]
    async fn cache_explicit() {
        const SOURCE: &str = r#"
version 1.2

task test {
    input {
        String name = "friend"
    }

    command <<<echo "hello, ~{name}!">>>

    output {
        String message = read_string(stdout())
    }
}
"#;
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain(
                "task is not cacheable due to `cacheable` hint not being explicitly set to `true`"
            ),
            "expected task to not be cacheable"
        );
        // Second run: still executes since the hint was never set.
        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(
            !logs_contain("task execution was skipped"),
            "expected second run to not skip execution"
        );
    }

    /// In `Explicit` mode, a task with `cacheable: true` behaves like `On`:
    /// first run misses and executes, second run hits and skips execution.
    #[tokio::test]
    #[traced_test]
    async fn cache_explicit_cacheable() {
        const SOURCE: &str = r#"
version 1.2

task test {
    input {
        String name = "friend"
    }

    command <<<echo "hello, ~{name}!">>>

    hints {
        cacheable: true
    }

    output {
        String message = read_string(stdout())
    }
}
"#;
        let root_dir = tempdir().expect("failed to create temporary directory");
        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
        assert!(!evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(logs_contain("using call cache"), "expected cache to be on");
        assert!(
            logs_contain("call cache miss"),
            "expected first run to miss the cache"
        );
        assert!(
            logs_contain("running task"),
            "expected the task to have executed"
        );
        // Second run: cache hit, execution skipped.
        let evaluated = evaluate_task(CallCachingMode::Explicit, root_dir.path(), SOURCE).await;
        assert!(evaluated.cached());
        assert_eq!(evaluated.exit_code(), 0);
        assert_eq!(
            fs::read_to_string(evaluated.stdout().as_file().unwrap().as_str())
                .unwrap()
                .trim(),
            "hello, friend!"
        );
        assert_eq!(
            fs::read_to_string(evaluated.stderr().as_file().unwrap().as_str()).unwrap(),
            ""
        );
        assert!(
            logs_contain("task execution was skipped"),
            "expected second run to skip execution"
        );
    }
}