wdl_engine/
eval.rs

1//! Module for evaluation.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fmt;
6use std::fs;
7use std::io::BufRead;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use anyhow::Result;
13use anyhow::bail;
14use cloud_copy::TransferEvent;
15use crankshaft::events::Event as CrankshaftEvent;
16use indexmap::IndexMap;
17use itertools::Itertools;
18use rev_buf_reader::RevBufReader;
19use tokio::sync::broadcast;
20use wdl_analysis::Document;
21use wdl_analysis::document::Task;
22use wdl_analysis::types::Type;
23use wdl_ast::Diagnostic;
24use wdl_ast::Span;
25use wdl_ast::SupportedVersion;
26use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES;
27use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES_ALIAS;
28
29use crate::CompoundValue;
30use crate::Outputs;
31use crate::PrimitiveValue;
32use crate::TaskExecutionResult;
33use crate::Value;
34use crate::http::Location;
35use crate::http::Transferer;
36use crate::path::EvaluationPath;
37use crate::stdlib::download_file;
38
39pub mod trie;
40pub mod v1;
41
/// Upper bound on how many trailing stderr lines are quoted in an error
/// message.
const MAX_STDERR_LINES: usize = 10;

/// The name used whenever a file system "root" is mapped.
///
/// A root may be a root directory such as `/` or `C:\`, but it may also be
/// the root of a URL such as `https://example.com`.
const ROOT_NAME: &str = ".root";
49
50/// Represents events that may be sent during evaluation.
51#[derive(Debug, Clone, Default)]
52pub struct Events {
53    /// The Crankshaft events channel.
54    ///
55    /// This is `None` when Crankshaft events are not enabled.
56    crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
57    /// The transfer events channel.
58    ///
59    /// This is `None` when transfer events are not enabled.
60    transfer: Option<broadcast::Sender<TransferEvent>>,
61}
62
63impl Events {
64    /// Constructs a new `Events` and enables subscribing to all event channels.
65    pub fn all(capacity: usize) -> Self {
66        Self {
67            crankshaft: Some(broadcast::Sender::new(capacity)),
68            transfer: Some(broadcast::Sender::new(capacity)),
69        }
70    }
71
72    /// Constructs a new `Events` and disable subscribing to any event channel.
73    pub fn none() -> Self {
74        Self::default()
75    }
76
77    /// Constructs a new `Events` and enable subscribing to only the Crankshaft
78    /// events channel.
79    pub fn crankshaft_only(capacity: usize) -> Self {
80        Self {
81            crankshaft: Some(broadcast::Sender::new(capacity)),
82            transfer: None,
83        }
84    }
85
86    /// Constructs a new `Events` and enable subscribing to only the transfer
87    /// events channel.
88    pub fn transfer_only(capacity: usize) -> Self {
89        Self {
90            crankshaft: None,
91            transfer: Some(broadcast::Sender::new(capacity)),
92        }
93    }
94
95    /// Subscribes to the Crankshaft events channel.
96    ///
97    /// Returns `None` if Crankshaft events are not enabled.
98    pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
99        self.crankshaft.as_ref().map(|s| s.subscribe())
100    }
101
102    /// Subscribes to the transfer events channel.
103    ///
104    /// Returns `None` if transfer events are not enabled.
105    pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
106        self.transfer.as_ref().map(|s| s.subscribe())
107    }
108
109    /// Gets the sender for the Crankshaft events.
110    pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
111        &self.crankshaft
112    }
113
114    /// Gets the sender for the transfer events.
115    pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
116        &self.transfer
117    }
118}
119
120/// Represents the location of a call in an evaluation error.
121#[derive(Debug, Clone)]
122pub struct CallLocation {
123    /// The document containing the call statement.
124    pub document: Document,
125    /// The span of the call statement.
126    pub span: Span,
127}
128
129/// Represents an error that originates from WDL source.
130#[derive(Debug)]
131pub struct SourceError {
132    /// The document originating the diagnostic.
133    pub document: Document,
134    /// The evaluation diagnostic.
135    pub diagnostic: Diagnostic,
136    /// The call backtrace for the error.
137    ///
138    /// An empty backtrace denotes that the error was encountered outside of
139    /// a call.
140    ///
141    /// The call locations are stored as most recent to least recent.
142    pub backtrace: Vec<CallLocation>,
143}
144
145/// Represents an error that may occur when evaluating a workflow or task.
146#[derive(Debug)]
147pub enum EvaluationError {
148    /// The error came from WDL source evaluation.
149    Source(Box<SourceError>),
150    /// The error came from another source.
151    Other(anyhow::Error),
152}
153
154impl EvaluationError {
155    /// Creates a new evaluation error from the given document and diagnostic.
156    pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
157        Self::Source(Box::new(SourceError {
158            document,
159            diagnostic,
160            backtrace: Default::default(),
161        }))
162    }
163
164    /// Helper for tests for converting an evaluation error to a string.
165    #[cfg(feature = "codespan-reporting")]
166    #[allow(clippy::inherent_to_string)]
167    pub fn to_string(&self) -> String {
168        use codespan_reporting::diagnostic::Label;
169        use codespan_reporting::diagnostic::LabelStyle;
170        use codespan_reporting::files::SimpleFiles;
171        use codespan_reporting::term::Config;
172        use codespan_reporting::term::termcolor::Buffer;
173        use codespan_reporting::term::{self};
174        use wdl_ast::AstNode;
175
176        match self {
177            Self::Source(e) => {
178                let mut files = SimpleFiles::new();
179                let mut map = HashMap::new();
180
181                let file_id = files.add(e.document.path(), e.document.root().text().to_string());
182
183                let diagnostic =
184                    e.diagnostic
185                        .to_codespan(file_id)
186                        .with_labels_iter(e.backtrace.iter().map(|l| {
187                            let id = l.document.id();
188                            let file_id = *map.entry(id).or_insert_with(|| {
189                                files.add(l.document.path(), l.document.root().text().to_string())
190                            });
191
192                            Label {
193                                style: LabelStyle::Secondary,
194                                file_id,
195                                range: l.span.start()..l.span.end(),
196                                message: "called from this location".into(),
197                            }
198                        }));
199
200                let mut buffer = Buffer::no_color();
201                term::emit(&mut buffer, &Config::default(), &files, &diagnostic)
202                    .expect("failed to emit diagnostic");
203
204                String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
205            }
206            Self::Other(e) => format!("{e:?}"),
207        }
208    }
209}
210
211impl From<anyhow::Error> for EvaluationError {
212    fn from(e: anyhow::Error) -> Self {
213        Self::Other(e)
214    }
215}
216
217/// Represents a result from evaluating a workflow or task.
218pub type EvaluationResult<T> = Result<T, EvaluationError>;
219
/// A path to a file or directory on the host file system, or a URL to a
/// remote file.
///
/// "Host" here means the place where the WDL evaluation is taking place.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HostPath(pub(crate) Arc<String>);

impl HostPath {
    /// Builds a host path from anything convertible into a `String`.
    pub fn new(path: impl Into<String>) -> Self {
        let path: String = path.into();
        Self::from(Arc::new(path))
    }

    /// Borrows the host path as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for HostPath {
    /// Formats the path exactly as its underlying string would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl From<Arc<String>> for HostPath {
    /// Wraps an already shared string without copying it.
    fn from(path: Arc<String>) -> Self {
        HostPath(path)
    }
}

impl From<HostPath> for Arc<String> {
    /// Unwraps the shared string held by the host path.
    fn from(path: HostPath) -> Self {
        let HostPath(inner) = path;
        inner
    }
}
256
/// A path to a file or directory on the guest.
///
/// "Guest" here means the container in which tasks are run.
///
/// When a backend does not use containers, a guest path is the same as a host
/// path.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GuestPath(pub(crate) Arc<String>);

impl GuestPath {
    /// Builds a guest path from anything convertible into a `String`.
    pub fn new(path: impl Into<String>) -> Self {
        let path: String = path.into();
        Self::from(Arc::new(path))
    }

    /// Borrows the guest path as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for GuestPath {
    /// Formats the path exactly as its underlying string would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl From<Arc<String>> for GuestPath {
    /// Wraps an already shared string without copying it.
    fn from(path: Arc<String>) -> Self {
        GuestPath(path)
    }
}

impl From<GuestPath> for Arc<String> {
    /// Unwraps the shared string held by the guest path.
    fn from(path: GuestPath) -> Self {
        let GuestPath(inner) = path;
        inner
    }
}
295
296/// Represents context to an expression evaluator.
297pub trait EvaluationContext: Send + Sync {
298    /// Gets the supported version of the document being evaluated.
299    fn version(&self) -> SupportedVersion;
300
301    /// Gets the value of the given name in scope.
302    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;
303
304    /// Resolves a type name to a type.
305    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;
306
307    /// Gets the base directory for the evaluation.
308    ///
309    /// The base directory is what paths are relative to.
310    ///
311    /// For workflow evaluation, the base directory is the document's directory.
312    ///
313    /// For task evaluation, the base directory is the document's directory or
314    /// the task's working directory if the `output` section is being evaluated.
315    fn base_dir(&self) -> &EvaluationPath;
316
317    /// Gets the temp directory for the evaluation.
318    fn temp_dir(&self) -> &Path;
319
320    /// Gets the value to return for a call to the `stdout` function.
321    ///
322    /// This returns `Some` only when evaluating a task's outputs section.
323    fn stdout(&self) -> Option<&Value> {
324        None
325    }
326
327    /// Gets the value to return for a call to the `stderr` function.
328    ///
329    /// This returns `Some` only when evaluating a task's outputs section.
330    fn stderr(&self) -> Option<&Value> {
331        None
332    }
333
334    /// Gets the task associated with the evaluation context.
335    ///
336    /// This returns `Some` only when evaluating a task's hints sections.
337    fn task(&self) -> Option<&Task> {
338        None
339    }
340
341    /// Gets the transferer to use for evaluating expressions.
342    fn transferer(&self) -> &dyn Transferer;
343
344    /// Gets a guest path representation of a host path.
345    ///
346    /// Returns `None` if there is no guest path representation of the host
347    /// path.
348    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
349        let _ = path;
350        None
351    }
352
353    /// Gets a host path representation of a guest path.
354    ///
355    /// Returns `None` if there is no host path representation of the guest
356    /// path.
357    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
358        let _ = path;
359        None
360    }
361
362    /// Notifies the context that a file was created as a result of a call to a
363    /// stdlib function.
364    ///
365    /// A context may map a guest path for the new host path.
366    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
367        let _ = path;
368        Ok(())
369    }
370}
371
/// The position of a scope within a collection of scopes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ScopeIndex(usize);

impl ScopeIndex {
    /// Wraps a raw index as a scope index.
    pub const fn new(index: usize) -> Self {
        ScopeIndex(index)
    }
}

impl From<usize> for ScopeIndex {
    /// Converts a raw index into a scope index.
    fn from(index: usize) -> Self {
        Self::new(index)
    }
}

impl From<ScopeIndex> for usize {
    /// Extracts the raw index from a scope index.
    fn from(index: ScopeIndex) -> Self {
        let ScopeIndex(raw) = index;
        raw
    }
}
394
395/// Represents an evaluation scope in a WDL document.
396#[derive(Default, Debug)]
397pub struct Scope {
398    /// The index of the parent scope.
399    ///
400    /// This is `None` for the root scopes.
401    parent: Option<ScopeIndex>,
402    /// The map of names in scope to their values.
403    names: IndexMap<String, Value>,
404}
405
406impl Scope {
407    /// Creates a new scope given the parent scope.
408    pub fn new(parent: ScopeIndex) -> Self {
409        Self {
410            parent: Some(parent),
411            names: Default::default(),
412        }
413    }
414
415    /// Inserts a name into the scope.
416    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
417        let prev = self.names.insert(name.into(), value.into());
418        assert!(prev.is_none(), "conflicting name in scope");
419    }
420
421    /// Iterates over the local names and values in the scope.
422    pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
423        self.names.iter().map(|(k, v)| (k.as_str(), v))
424    }
425
426    /// Gets a mutable reference to an existing name in scope.
427    pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
428        self.names.get_mut(name)
429    }
430
431    /// Clears the scope.
432    pub(crate) fn clear(&mut self) {
433        self.parent = None;
434        self.names.clear();
435    }
436
437    /// Sets the scope's parent.
438    pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
439        self.parent = Some(parent);
440    }
441}
442
443impl From<Scope> for IndexMap<String, Value> {
444    fn from(scope: Scope) -> Self {
445        scope.names
446    }
447}
448
449/// Represents a reference to a scope.
450#[derive(Debug, Clone, Copy)]
451pub struct ScopeRef<'a> {
452    /// The reference to the scopes collection.
453    scopes: &'a [Scope],
454    /// The index of the scope in the collection.
455    index: ScopeIndex,
456}
457
458impl<'a> ScopeRef<'a> {
459    /// Creates a new scope reference given the scope index.
460    pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
461        Self {
462            scopes,
463            index: index.into(),
464        }
465    }
466
467    /// Gets the parent scope.
468    ///
469    /// Returns `None` if there is no parent scope.
470    pub fn parent(&self) -> Option<Self> {
471        self.scopes[self.index.0].parent.map(|p| Self {
472            scopes: self.scopes,
473            index: p,
474        })
475    }
476
477    /// Gets all of the name and values available at this scope.
478    pub fn names(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
479        self.scopes[self.index.0]
480            .names
481            .iter()
482            .map(|(n, name)| (n.as_str(), name))
483    }
484
485    /// Iterates over each name and value visible to the scope and calls the
486    /// provided callback.
487    ///
488    /// Stops iterating and returns an error if the callback returns an error.
489    pub fn for_each(&self, mut cb: impl FnMut(&str, &Value) -> Result<()>) -> Result<()> {
490        let mut current = Some(self.index);
491
492        while let Some(index) = current {
493            for (n, v) in self.scopes[index.0].local() {
494                cb(n, v)?;
495            }
496
497            current = self.scopes[index.0].parent;
498        }
499
500        Ok(())
501    }
502
503    /// Gets the value of a name local to this scope.
504    ///
505    /// Returns `None` if a name local to this scope was not found.
506    pub fn local(&self, name: &str) -> Option<&Value> {
507        self.scopes[self.index.0].names.get(name)
508    }
509
510    /// Lookups a name in the scope.
511    ///
512    /// Returns `None` if the name is not available in the scope.
513    pub fn lookup(&self, name: &str) -> Option<&Value> {
514        let mut current = Some(self.index);
515
516        while let Some(index) = current {
517            if let Some(name) = self.scopes[index.0].names.get(name) {
518                return Some(name);
519            }
520
521            current = self.scopes[index.0].parent;
522        }
523
524        None
525    }
526}
527
528/// Represents an evaluated task.
529#[derive(Debug)]
530pub struct EvaluatedTask {
531    /// The task attempt directory.
532    attempt_dir: PathBuf,
533    /// The task execution result.
534    result: TaskExecutionResult,
535    /// The evaluated outputs of the task.
536    ///
537    /// This is `Ok` when the task executes successfully and all of the task's
538    /// outputs evaluated without error.
539    ///
540    /// Otherwise, this contains the error that occurred while attempting to
541    /// evaluate the task's outputs.
542    outputs: EvaluationResult<Outputs>,
543}
544
545impl EvaluatedTask {
546    /// Constructs a new evaluated task.
547    ///
548    /// Returns an error if the stdout or stderr paths are not UTF-8.
549    fn new(attempt_dir: PathBuf, result: TaskExecutionResult) -> anyhow::Result<Self> {
550        Ok(Self {
551            result,
552            attempt_dir,
553            outputs: Ok(Default::default()),
554        })
555    }
556
557    /// Gets the exit code of the evaluated task.
558    pub fn exit_code(&self) -> i32 {
559        self.result.exit_code
560    }
561
562    /// Gets the attempt directory of the task.
563    pub fn attempt_dir(&self) -> &Path {
564        &self.attempt_dir
565    }
566
567    /// Gets the working directory of the evaluated task.
568    pub fn work_dir(&self) -> &EvaluationPath {
569        &self.result.work_dir
570    }
571
572    /// Gets the stdout value of the evaluated task.
573    pub fn stdout(&self) -> &Value {
574        &self.result.stdout
575    }
576
577    /// Gets the stderr value of the evaluated task.
578    pub fn stderr(&self) -> &Value {
579        &self.result.stderr
580    }
581
582    /// Gets the outputs of the evaluated task.
583    ///
584    /// This is `Ok` when the task executes successfully and all of the task's
585    /// outputs evaluated without error.
586    ///
587    /// Otherwise, this contains the error that occurred while attempting to
588    /// evaluate the task's outputs.
589    pub fn outputs(&self) -> &EvaluationResult<Outputs> {
590        &self.outputs
591    }
592
593    /// Converts the evaluated task into an evaluation result.
594    ///
595    /// Returns `Ok(_)` if the task outputs were evaluated.
596    ///
597    /// Returns `Err(_)` if the task outputs could not be evaluated.
598    pub fn into_result(self) -> EvaluationResult<Outputs> {
599        self.outputs
600    }
601
602    /// Handles the exit of a task execution.
603    ///
604    /// Returns an error if the task failed.
605    async fn handle_exit(
606        &self,
607        requirements: &HashMap<String, Value>,
608        transferer: &dyn Transferer,
609    ) -> anyhow::Result<()> {
610        let mut error = true;
611        if let Some(return_codes) = requirements
612            .get(TASK_REQUIREMENT_RETURN_CODES)
613            .or_else(|| requirements.get(TASK_REQUIREMENT_RETURN_CODES_ALIAS))
614        {
615            match return_codes {
616                Value::Primitive(PrimitiveValue::String(s)) if s.as_ref() == "*" => {
617                    error = false;
618                }
619                Value::Primitive(PrimitiveValue::String(s)) => {
620                    bail!(
621                        "invalid return code value `{s}`: only `*` is accepted when the return \
622                         code is specified as a string"
623                    );
624                }
625                Value::Primitive(PrimitiveValue::Integer(ok)) => {
626                    if self.result.exit_code == i32::try_from(*ok).unwrap_or_default() {
627                        error = false;
628                    }
629                }
630                Value::Compound(CompoundValue::Array(codes)) => {
631                    error = !codes.as_slice().iter().any(|v| {
632                        v.as_integer()
633                            .map(|i| i32::try_from(i).unwrap_or_default() == self.result.exit_code)
634                            .unwrap_or(false)
635                    });
636                }
637                _ => unreachable!("unexpected return codes value"),
638            }
639        } else {
640            error = self.result.exit_code != 0;
641        }
642
643        if error {
644            // Read the last `MAX_STDERR_LINES` number of lines from stderr
645            // If there's a problem reading stderr, don't output it
646            let stderr = download_file(
647                transferer,
648                self.work_dir(),
649                self.stderr().as_file().unwrap(),
650            )
651            .await
652            .ok()
653            .and_then(|l| {
654                fs::File::open(l).ok().map(|f| {
655                    // Buffer the last N number of lines
656                    let reader = RevBufReader::new(f);
657                    let lines: Vec<_> = reader
658                        .lines()
659                        .take(MAX_STDERR_LINES)
660                        .map_while(|l| l.ok())
661                        .collect();
662
663                    // Iterate the lines in reverse order as we read them in reverse
664                    lines
665                        .iter()
666                        .rev()
667                        .format_with("\n", |l, f| f(&format_args!("  {l}")))
668                        .to_string()
669                })
670            })
671            .unwrap_or_default();
672
673            // If the work directory is remote,
674            bail!(
675                "process terminated with exit code {code}: see `{stdout_path}` and \
676                 `{stderr_path}` for task output and the related files in \
677                 `{dir}`{header}{stderr}{trailer}",
678                code = self.result.exit_code,
679                dir = self.attempt_dir().display(),
680                stdout_path = self.stdout().as_file().expect("must be file"),
681                stderr_path = self.stderr().as_file().expect("must be file"),
682                header = if stderr.is_empty() {
683                    Cow::Borrowed("")
684                } else {
685                    format!("\n\ntask stderr output (last {MAX_STDERR_LINES} lines):\n\n").into()
686                },
687                trailer = if stderr.is_empty() { "" } else { "\n" }
688            );
689        }
690
691        Ok(())
692    }
693}
694
695/// Gets the kind of an input.
696#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
697pub enum InputKind {
698    /// The input is a single file.
699    File,
700    /// The input is a directory.
701    Directory,
702}
703
704impl From<InputKind> for crankshaft::engine::task::input::Type {
705    fn from(value: InputKind) -> Self {
706        match value {
707            InputKind::File => Self::File,
708            InputKind::Directory => Self::Directory,
709        }
710    }
711}
712
713/// Represents a `File` or `Directory` input to a task.
714#[derive(Debug, Clone)]
715pub struct Input {
716    /// The input kind.
717    kind: InputKind,
718    /// The path for the input.
719    path: EvaluationPath,
720    /// The guest path for the input.
721    ///
722    /// This is `None` when the backend isn't mapping input paths.
723    guest_path: Option<GuestPath>,
724    /// The download location for the input.
725    ///
726    /// This is `Some` if the input has been downloaded to a known location.
727    location: Option<Location>,
728}
729
730impl Input {
731    /// Creates a new input with the given path and access.
732    fn new(kind: InputKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
733        Self {
734            kind,
735            path,
736            guest_path,
737            location: None,
738        }
739    }
740
741    /// Gets the kind of the input.
742    pub fn kind(&self) -> InputKind {
743        self.kind
744    }
745
746    /// Gets the path to the input.
747    ///
748    /// The path of the input may be local or remote.
749    pub fn path(&self) -> &EvaluationPath {
750        &self.path
751    }
752
753    /// Gets the guest path for the input.
754    ///
755    /// This is `None` for inputs to backends that don't use containers.
756    pub fn guest_path(&self) -> Option<&GuestPath> {
757        self.guest_path.as_ref()
758    }
759
760    /// Gets the local path of the input.
761    ///
762    /// Returns `None` if the input is remote and has not been localized.
763    pub fn local_path(&self) -> Option<&Path> {
764        self.location.as_deref().or_else(|| self.path.as_local())
765    }
766
767    /// Sets the location of the input.
768    ///
769    /// This is used during localization to set a local path for remote inputs.
770    pub fn set_location(&mut self, location: Location) {
771        self.location = Some(location);
772    }
773}