use std::borrow::Cow;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use anyhow::Result;
use cloud_copy::TransferEvent;
use crankshaft::events::Event as CrankshaftEvent;
use indexmap::IndexMap;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::error;
use wdl_analysis::Document;
use wdl_analysis::document::Task;
use wdl_analysis::types::Type;
use wdl_ast::Diagnostic;
use wdl_ast::Span;
use wdl_ast::SupportedVersion;
use crate::EvaluationPath;
use crate::GuestPath;
use crate::HostPath;
use crate::Outputs;
use crate::Value;
use crate::backend::TaskExecutionResult;
use crate::config::FailureMode;
use crate::http::Transferer;
mod trie;
pub mod v1;
const ROOT_NAME: &str = ".root";
const CANCELLATION_STATE_NOT_CANCELED: u8 = 0;
const CANCELLATION_STATE_WAITING: u8 = 1;
const CANCELLATION_STATE_CANCELING: u8 = 2;
const CANCELLATION_STATE_ERROR: u8 = 4;
const CANCELLATION_STATE_MASK: u8 = 0x3;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancellationContextState {
NotCanceled,
Waiting,
Canceling,
}
impl CancellationContextState {
fn get(state: &Arc<AtomicU8>) -> Self {
match state.load(Ordering::SeqCst) & CANCELLATION_STATE_MASK {
CANCELLATION_STATE_NOT_CANCELED => Self::NotCanceled,
CANCELLATION_STATE_WAITING => Self::Waiting,
CANCELLATION_STATE_CANCELING => Self::Canceling,
_ => unreachable!("unexpected cancellation context state"),
}
}
fn update(mode: FailureMode, error: bool, state: &Arc<AtomicU8>) -> Option<Self> {
let previous_state = state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
if error && state != CANCELLATION_STATE_NOT_CANCELED {
return None;
}
let mut new_state = match state & CANCELLATION_STATE_MASK {
CANCELLATION_STATE_NOT_CANCELED => match mode {
FailureMode::Slow => CANCELLATION_STATE_WAITING,
FailureMode::Fast => CANCELLATION_STATE_CANCELING,
},
CANCELLATION_STATE_WAITING => CANCELLATION_STATE_CANCELING,
CANCELLATION_STATE_CANCELING => CANCELLATION_STATE_CANCELING,
_ => unreachable!("unexpected cancellation context state"),
};
if error {
new_state |= CANCELLATION_STATE_ERROR;
}
Some(new_state | (state & CANCELLATION_STATE_ERROR))
})
.ok()?;
match previous_state & CANCELLATION_STATE_MASK {
CANCELLATION_STATE_NOT_CANCELED => match mode {
FailureMode::Slow => Some(Self::Waiting),
FailureMode::Fast => Some(Self::Canceling),
},
CANCELLATION_STATE_WAITING => Some(Self::Canceling),
CANCELLATION_STATE_CANCELING => Some(Self::Canceling),
_ => unreachable!("unexpected cancellation context state"),
}
}
}
#[derive(Debug, Clone)]
pub struct CancellationContext {
mode: FailureMode,
state: Arc<AtomicU8>,
first: CancellationToken,
second: CancellationToken,
}
impl CancellationContext {
pub fn new(mode: FailureMode) -> Self {
Self {
mode,
state: Arc::new(CANCELLATION_STATE_NOT_CANCELED.into()),
first: CancellationToken::new(),
second: CancellationToken::new(),
}
}
pub fn state(&self) -> CancellationContextState {
CancellationContextState::get(&self.state)
}
#[must_use]
pub fn cancel(&self) -> CancellationContextState {
let state =
CancellationContextState::update(self.mode, false, &self.state).expect("should update");
match state {
CancellationContextState::NotCanceled => panic!("should be canceled"),
CancellationContextState::Waiting => self.first.cancel(),
CancellationContextState::Canceling => {
self.first.cancel();
self.second.cancel();
}
}
state
}
pub fn first(&self) -> CancellationToken {
self.first.clone()
}
pub fn second(&self) -> CancellationToken {
self.second.clone()
}
pub(crate) fn user_canceled(&self) -> bool {
let state = self.state.load(Ordering::SeqCst);
state != CANCELLATION_STATE_NOT_CANCELED && (state & CANCELLATION_STATE_ERROR == 0)
}
pub(crate) fn error(&self, error: &EvaluationError) {
if let Some(state) = CancellationContextState::update(self.mode, true, &self.state) {
let message: Cow<'_, str> = match error {
EvaluationError::Canceled => "evaluation was canceled".into(),
EvaluationError::Source(e) => e.diagnostic.message().into(),
EvaluationError::Other(e) => format!("{e:#}").into(),
};
match state {
CancellationContextState::NotCanceled => unreachable!("should be canceled"),
CancellationContextState::Waiting => {
self.first.cancel();
error!(
"an evaluation error occurred: waiting for any executing tasks to \
complete: {message}"
);
}
CancellationContextState::Canceling => {
self.first.cancel();
self.second.cancel();
error!(
"an evaluation error occurred: waiting for any executing tasks to cancel: \
{message}"
);
}
}
}
}
}
impl Default for CancellationContext {
fn default() -> Self {
Self::new(FailureMode::Slow)
}
}
#[derive(Debug, Clone)]
pub enum EngineEvent {
ReusedCachedExecutionResult {
id: String,
},
TaskParked,
TaskUnparked {
canceled: bool,
},
}
#[derive(Debug, Clone, Default)]
pub struct Events {
engine: Option<broadcast::Sender<EngineEvent>>,
crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
transfer: Option<broadcast::Sender<TransferEvent>>,
}
impl Events {
pub fn new(capacity: usize) -> Self {
Self {
engine: Some(broadcast::Sender::new(capacity)),
crankshaft: Some(broadcast::Sender::new(capacity)),
transfer: Some(broadcast::Sender::new(capacity)),
}
}
pub fn disabled() -> Self {
Self::default()
}
pub fn subscribe_engine(&self) -> Option<broadcast::Receiver<EngineEvent>> {
self.engine.as_ref().map(|s| s.subscribe())
}
pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
self.crankshaft.as_ref().map(|s| s.subscribe())
}
pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
self.transfer.as_ref().map(|s| s.subscribe())
}
pub(crate) fn engine(&self) -> &Option<broadcast::Sender<EngineEvent>> {
&self.engine
}
pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
&self.crankshaft
}
pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
&self.transfer
}
}
#[derive(Debug, Clone)]
pub struct CallLocation {
pub document: Document,
pub span: Span,
}
#[derive(Debug)]
pub struct SourceError {
pub document: Document,
pub diagnostic: Diagnostic,
pub backtrace: Vec<CallLocation>,
}
#[derive(Debug)]
pub enum EvaluationError {
Canceled,
Source(Box<SourceError>),
Other(anyhow::Error),
}
impl EvaluationError {
pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
Self::Source(Box::new(SourceError {
document,
diagnostic,
backtrace: Default::default(),
}))
}
#[allow(clippy::inherent_to_string)]
pub fn to_string(&self) -> String {
use std::collections::HashMap;
use codespan_reporting::diagnostic::Label;
use codespan_reporting::diagnostic::LabelStyle;
use codespan_reporting::files::SimpleFiles;
use codespan_reporting::term;
use codespan_reporting::term::Config;
use codespan_reporting::term::termcolor::Buffer;
use wdl_ast::AstNode;
match self {
Self::Canceled => "evaluation was canceled".to_string(),
Self::Source(e) => {
let mut files = SimpleFiles::new();
let mut map = HashMap::new();
let file_id = files.add(e.document.path(), e.document.root().text().to_string());
let diagnostic =
e.diagnostic
.to_codespan(file_id)
.with_labels_iter(e.backtrace.iter().map(|l| {
let id = l.document.id();
let file_id = *map.entry(id).or_insert_with(|| {
files.add(l.document.path(), l.document.root().text().to_string())
});
Label {
style: LabelStyle::Secondary,
file_id,
range: l.span.start()..l.span.end(),
message: "called from this location".into(),
}
}));
let mut buffer = Buffer::no_color();
term::emit_to_write_style(&mut buffer, &Config::default(), &files, &diagnostic)
.expect("failed to emit diagnostic");
String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
}
Self::Other(e) => format!("{e:#}"),
}
}
}
impl From<anyhow::Error> for EvaluationError {
fn from(e: anyhow::Error) -> Self {
Self::Other(e)
}
}
pub type EvaluationResult<T> = Result<T, EvaluationError>;
pub(crate) trait EvaluationContext: Send + Sync {
fn version(&self) -> SupportedVersion;
fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;
fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;
fn enum_variant_value(&self, enum_name: &str, variant_name: &str) -> Result<Value, Diagnostic>;
fn base_dir(&self) -> &EvaluationPath;
fn temp_dir(&self) -> &Path;
fn stdout(&self) -> Option<&Value> {
None
}
fn stderr(&self) -> Option<&Value> {
None
}
fn task(&self) -> Option<&Task> {
None
}
fn transferer(&self) -> &dyn Transferer;
fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
let _ = path;
None
}
fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
let _ = path;
None
}
fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
let _ = path;
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ScopeIndex(usize);
impl ScopeIndex {
pub const fn new(index: usize) -> Self {
Self(index)
}
}
impl From<usize> for ScopeIndex {
fn from(index: usize) -> Self {
Self(index)
}
}
impl From<ScopeIndex> for usize {
fn from(index: ScopeIndex) -> Self {
index.0
}
}
#[derive(Default, Debug)]
struct Scope {
parent: Option<ScopeIndex>,
names: IndexMap<String, Value>,
}
impl Scope {
pub fn new(parent: ScopeIndex) -> Self {
Self {
parent: Some(parent),
names: Default::default(),
}
}
pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
let prev = self.names.insert(name.into(), value.into());
assert!(prev.is_none(), "conflicting name in scope");
}
pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
self.names.iter().map(|(k, v)| (k.as_str(), v))
}
pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
self.names.get_mut(name)
}
pub(crate) fn clear(&mut self) {
self.parent = None;
self.names.clear();
}
pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
self.parent = Some(parent);
}
}
impl From<Scope> for IndexMap<String, Value> {
fn from(scope: Scope) -> Self {
scope.names
}
}
impl From<Scope> for Outputs {
fn from(scope: Scope) -> Self {
scope.names.into()
}
}
#[derive(Debug, Clone, Copy)]
struct ScopeRef<'a> {
scopes: &'a [Scope],
index: ScopeIndex,
}
impl<'a> ScopeRef<'a> {
pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
Self {
scopes,
index: index.into(),
}
}
pub fn parent(&self) -> Option<Self> {
self.scopes[self.index.0].parent.map(|p| Self {
scopes: self.scopes,
index: p,
})
}
pub fn local(&self, name: &str) -> Option<&Value> {
self.scopes[self.index.0].names.get(name)
}
pub fn lookup(&self, name: &str) -> Option<&Value> {
let mut current = Some(self.index);
while let Some(index) = current {
if let Some(name) = self.scopes[index.0].names.get(name) {
return Some(name);
}
current = self.scopes[index.0].parent;
}
None
}
}
#[derive(Debug)]
pub struct EvaluatedTask {
result: TaskExecutionResult,
outputs: Outputs,
error: Option<EvaluationError>,
cached: bool,
}
impl EvaluatedTask {
fn new(cached: bool, result: TaskExecutionResult, error: Option<EvaluationError>) -> Self {
Self {
result,
outputs: Default::default(),
error,
cached,
}
}
pub fn failed(&self) -> bool {
self.error.is_some()
}
pub fn cached(&self) -> bool {
self.cached
}
pub fn exit_code(&self) -> i32 {
self.result.exit_code
}
pub fn work_dir(&self) -> &EvaluationPath {
&self.result.work_dir
}
pub fn stdout(&self) -> &Value {
&self.result.stdout
}
pub fn stderr(&self) -> &Value {
&self.result.stderr
}
pub fn into_outputs(self) -> EvaluationResult<Outputs> {
match self.error {
Some(e) => Err(e),
None => Ok(self.outputs),
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn cancellation_slow() {
let context = CancellationContext::new(FailureMode::Slow);
assert_eq!(context.state(), CancellationContextState::NotCanceled);
assert_eq!(context.cancel(), CancellationContextState::Waiting);
assert_eq!(context.state(), CancellationContextState::Waiting);
assert!(context.user_canceled());
assert!(context.first.is_cancelled());
assert!(!context.second.is_cancelled());
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
}
#[test]
fn cancellation_fast() {
let context = CancellationContext::new(FailureMode::Fast);
assert_eq!(context.state(), CancellationContextState::NotCanceled);
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
}
#[test]
fn cancellation_error_slow() {
let context = CancellationContext::new(FailureMode::Slow);
assert_eq!(context.state(), CancellationContextState::NotCanceled);
context.error(&EvaluationError::Canceled);
assert_eq!(context.state(), CancellationContextState::Waiting);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(!context.second.is_cancelled());
context.error(&EvaluationError::Canceled);
assert_eq!(context.state(), CancellationContextState::Waiting);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(!context.second.is_cancelled());
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
}
#[test]
fn cancellation_error_fast() {
let context = CancellationContext::new(FailureMode::Fast);
assert_eq!(context.state(), CancellationContextState::NotCanceled);
context.error(&EvaluationError::Canceled);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
context.error(&EvaluationError::Canceled);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
assert_eq!(context.cancel(), CancellationContextState::Canceling);
assert_eq!(context.state(), CancellationContextState::Canceling);
assert!(!context.user_canceled());
assert!(context.first.is_cancelled());
assert!(context.second.is_cancelled());
}
}