wdl_engine/
eval.rs

1//! Module for evaluation.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fmt;
6use std::fs;
7use std::io::BufRead;
8use std::path::Path;
9use std::sync::Arc;
10use std::sync::atomic::AtomicU8;
11use std::sync::atomic::Ordering;
12
13use anyhow::Context;
14use anyhow::Result;
15use anyhow::bail;
16use cloud_copy::TransferEvent;
17use crankshaft::events::Event as CrankshaftEvent;
18use indexmap::IndexMap;
19use itertools::Itertools;
20use num_enum::IntoPrimitive;
21use rev_buf_reader::RevBufReader;
22use tokio::sync::broadcast;
23use tokio_util::sync::CancellationToken;
24use tracing::error;
25use wdl_analysis::Document;
26use wdl_analysis::document::Task;
27use wdl_analysis::types::Type;
28use wdl_ast::Diagnostic;
29use wdl_ast::Span;
30use wdl_ast::SupportedVersion;
31use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES;
32use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES_ALIAS;
33
34use crate::CompoundValue;
35use crate::Outputs;
36use crate::PrimitiveValue;
37use crate::Value;
38use crate::backend::TaskExecutionResult;
39use crate::cache::Hashable;
40use crate::config::FailureMode;
41use crate::http::Location;
42use crate::http::Transferer;
43use crate::path;
44use crate::path::EvaluationPath;
45use crate::stdlib::download_file;
46
47pub mod trie;
48pub mod v1;
49
/// The maximum number of stderr lines to display in error messages.
const MAX_STDERR_LINES: usize = 10;

/// A name used whenever a file system "root" is mapped.
///
/// A root might be a root directory like `/` or `C:\`, but it also might be
/// the root of a URL like `https://example.com`.
const ROOT_NAME: &str = ".root";

/// A constant to denote that no cancellation has occurred yet.
///
/// This is the state value with no bits set.
const CANCELLATION_STATE_NOT_CANCELED: u8 = 0;

/// A state bit to indicate that we're waiting for executing tasks to
/// complete.
///
/// This bit is mutually exclusive with the `CANCELING` bit.
const CANCELLATION_STATE_WAITING: u8 = 1;

/// A state bit to denote that we're waiting for executing tasks to cancel.
///
/// This bit is mutually exclusive with the `WAITING` bit.
const CANCELLATION_STATE_CANCELING: u8 = 2;

/// A state bit to denote that cancellation was the result of an error.
///
/// This bit will only be set if either the `CANCELING` bit or the `WAITING`
/// bit are set.
const CANCELLATION_STATE_ERROR: u8 = 4;

/// The mask to apply to the state for excluding the error bit.
///
/// The mask covers the `WAITING` (1) and `CANCELING` (2) bits and leaves out
/// the `ERROR` (4) bit.
const CANCELLATION_STATE_MASK: u8 = 0x3;
80
81/// Represents the current state of a [`CancellationContext`].
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83pub enum CancellationContextState {
84    /// The context has not been canceled yet.
85    NotCanceled,
86    /// The context has been canceled and is waiting for executing tasks to
87    /// complete.
88    Waiting,
89    /// The context has been canceled and is waiting for executing tasks to
90    /// cancel.
91    Canceling,
92}
93
94impl CancellationContextState {
95    /// Gets the current context state.
96    fn get(state: &Arc<AtomicU8>) -> Self {
97        match state.load(Ordering::SeqCst) & CANCELLATION_STATE_MASK {
98            CANCELLATION_STATE_NOT_CANCELED => Self::NotCanceled,
99            CANCELLATION_STATE_WAITING => Self::Waiting,
100            CANCELLATION_STATE_CANCELING => Self::Canceling,
101            _ => unreachable!("unexpected cancellation context state"),
102        }
103    }
104
105    /// Updates the context state and returns the new state.
106    ///
107    /// Returns `None` if the update is for an error and there has already been
108    /// a cancellation (i.e. the update was not successful).
109    fn update(mode: FailureMode, error: bool, state: &Arc<AtomicU8>) -> Option<Self> {
110        // Update the provided state with the new state
111        let previous_state = state
112            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
113                // If updating for an error and there has been a cancellation, bail out
114                if error && state != CANCELLATION_STATE_NOT_CANCELED {
115                    return None;
116                }
117
118                // Otherwise, calculate the new state
119                let mut new_state = match state & CANCELLATION_STATE_MASK {
120                    CANCELLATION_STATE_NOT_CANCELED => match mode {
121                        FailureMode::Slow => CANCELLATION_STATE_WAITING,
122                        FailureMode::Fast => CANCELLATION_STATE_CANCELING,
123                    },
124                    CANCELLATION_STATE_WAITING => CANCELLATION_STATE_CANCELING,
125                    CANCELLATION_STATE_CANCELING => CANCELLATION_STATE_CANCELING,
126                    _ => unreachable!("unexpected cancellation context state"),
127                };
128
129                // Mark the error bit upon error
130                if error {
131                    new_state |= CANCELLATION_STATE_ERROR;
132                }
133
134                // Return the new state along with the old error bit
135                Some(new_state | (state & CANCELLATION_STATE_ERROR))
136            })
137            .ok()?;
138
139        match previous_state & CANCELLATION_STATE_MASK {
140            CANCELLATION_STATE_NOT_CANCELED => match mode {
141                FailureMode::Slow => Some(Self::Waiting),
142                FailureMode::Fast => Some(Self::Canceling),
143            },
144            CANCELLATION_STATE_WAITING => Some(Self::Canceling),
145            CANCELLATION_STATE_CANCELING => Some(Self::Canceling),
146            _ => unreachable!("unexpected cancellation context state"),
147        }
148    }
149}
150
151/// Represents context for cancelling workflow or task evaluation.
152///
153/// Uses a default failure mode of [`Slow`](FailureMode::Slow).
154#[derive(Clone)]
155pub struct CancellationContext {
156    /// The failure mode for the cancellation context.
157    mode: FailureMode,
158    /// The state of the cancellation context.
159    state: Arc<AtomicU8>,
160    /// Stores the underlying cancellation token.
161    token: CancellationToken,
162}
163
164impl CancellationContext {
165    /// Constructs a cancellation context for the given [`FailureMode`].
166    ///
167    /// If the provided `mode` is [`Slow`](FailureMode::Slow), the first call to
168    /// [`cancel`](Self::cancel) will wait for currently executing tasks to
169    /// complete; a subsequent call to [`cancel`](Self::cancel) will cancel the
170    /// currently executing tasks.
171    ///
172    /// If the provided `mode` is [`Fast`](FailureMode::Fast), the first call to
173    /// [`cancel`](Self::cancel) will cancel the currently executing tasks.
174    pub fn new(mode: FailureMode) -> Self {
175        Self {
176            mode,
177            state: Arc::new(CANCELLATION_STATE_NOT_CANCELED.into()),
178            token: CancellationToken::new(),
179        }
180    }
181
182    /// Gets the [`CancellationContextState`] of this [`CancellationContext`].
183    pub fn state(&self) -> CancellationContextState {
184        CancellationContextState::get(&self.state)
185    }
186
187    /// Performs a cancellation.
188    ///
189    /// Returns the current [`CancellationContextState`] which should be checked
190    /// to ensure the desired cancellation occurred.
191    ///
192    /// This method will never return a
193    /// [`CancellationContextState::NotCanceled`] state.
194    #[must_use]
195    pub fn cancel(&self) -> CancellationContextState {
196        let state =
197            CancellationContextState::update(self.mode, false, &self.state).expect("should update");
198        assert!(
199            state != CancellationContextState::NotCanceled,
200            "should be canceled"
201        );
202
203        if state == CancellationContextState::Canceling {
204            self.token.cancel();
205        }
206
207        state
208    }
209
210    /// Gets the cancellation token from the context.
211    ///
212    /// The token will be canceled when the [`CancellationContext::cancel`] is
213    /// called and the resulting state is
214    /// [`CancellationContextState::Canceling`].
215    ///
216    /// Callers should _not_ directly cancel the returned token and instead call
217    /// [`CancellationContext::cancel`].
218    pub fn token(&self) -> CancellationToken {
219        self.token.clone()
220    }
221
222    /// Determines if the user initiated the cancellation.
223    pub(crate) fn user_canceled(&self) -> bool {
224        let state = self.state.load(Ordering::SeqCst);
225        state != CANCELLATION_STATE_NOT_CANCELED && (state & CANCELLATION_STATE_ERROR == 0)
226    }
227
228    /// Triggers a cancellation as a result of an error.
229    ///
230    /// If the context has already been canceled, this is a no-op.
231    ///
232    /// Otherwise, a cancellation is attempted and an error message is logged
233    /// depending on the current state of the context.
234    pub(crate) fn error(&self, error: &EvaluationError) {
235        if let Some(state) = CancellationContextState::update(self.mode, true, &self.state) {
236            let message: Cow<'_, str> = match error {
237                EvaluationError::Canceled => "evaluation was canceled".into(),
238                EvaluationError::Source(e) => e.diagnostic.message().into(),
239                EvaluationError::Other(e) => format!("{e:#}").into(),
240            };
241
242            match state {
243                CancellationContextState::NotCanceled => unreachable!("should be canceled"),
244                CancellationContextState::Waiting => {
245                    error!(
246                        "an evaluation error occurred: waiting for any executing tasks to \
247                         complete: {message}"
248                    );
249                }
250                CancellationContextState::Canceling => {
251                    self.token.cancel();
252
253                    error!(
254                        "an evaluation error occurred: waiting for any executing tasks to cancel: \
255                         {message}"
256                    );
257                }
258            }
259        }
260    }
261}
262
263impl Default for CancellationContext {
264    fn default() -> Self {
265        Self::new(FailureMode::Slow)
266    }
267}
268
/// Represents an event from the WDL evaluation engine.
///
/// This is the item type of the WDL engine events channel held by
/// [`Events`].
#[derive(Debug, Clone)]
pub enum EngineEvent {
    /// A cached task execution result was reused due to a call cache hit.
    ReusedCachedExecutionResult {
        /// The id of the task that reused a cached execution result.
        id: String,
    },
}
278
279/// Represents events that may be sent during WDL evaluation.
280#[derive(Debug, Clone, Default)]
281pub struct Events {
282    /// The WDL engine events channel.
283    ///
284    /// This is `None` when engine events are not enabled.
285    engine: Option<broadcast::Sender<EngineEvent>>,
286    /// The Crankshaft events channel.
287    ///
288    /// This is `None` when Crankshaft events are not enabled.
289    crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
290    /// The transfer events channel.
291    ///
292    /// This is `None` when transfer events are not enabled.
293    transfer: Option<broadcast::Sender<TransferEvent>>,
294}
295
296impl Events {
297    /// Constructs a new `Events` and enables subscribing to all event channels.
298    pub fn new(capacity: usize) -> Self {
299        Self {
300            engine: Some(broadcast::Sender::new(capacity)),
301            crankshaft: Some(broadcast::Sender::new(capacity)),
302            transfer: Some(broadcast::Sender::new(capacity)),
303        }
304    }
305
306    /// Constructs a new `Events` and disable subscribing to any event channel.
307    pub fn disabled() -> Self {
308        Self::default()
309    }
310
311    /// Subscribes to the WDL engine events channel.
312    ///
313    /// Returns `None` if WDL engine events are not enabled.
314    pub fn subscribe_engine(&self) -> Option<broadcast::Receiver<EngineEvent>> {
315        self.engine.as_ref().map(|s| s.subscribe())
316    }
317
318    /// Subscribes to the Crankshaft events channel.
319    ///
320    /// Returns `None` if Crankshaft events are not enabled.
321    pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
322        self.crankshaft.as_ref().map(|s| s.subscribe())
323    }
324
325    /// Subscribes to the transfer events channel.
326    ///
327    /// Returns `None` if transfer events are not enabled.
328    pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
329        self.transfer.as_ref().map(|s| s.subscribe())
330    }
331
332    /// Gets the sender for the Crankshaft events.
333    pub(crate) fn engine(&self) -> &Option<broadcast::Sender<EngineEvent>> {
334        &self.engine
335    }
336
337    /// Gets the sender for the Crankshaft events.
338    pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
339        &self.crankshaft
340    }
341
342    /// Gets the sender for the transfer events.
343    pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
344        &self.transfer
345    }
346}
347
/// Represents the location of a call in an evaluation error.
///
/// Call locations make up the backtrace of a [`SourceError`].
#[derive(Debug, Clone)]
pub struct CallLocation {
    /// The document containing the call statement.
    pub document: Document,
    /// The span of the call statement.
    pub span: Span,
}
356
/// Represents an error that originates from WDL source.
///
/// This is the boxed payload of [`EvaluationError::Source`].
#[derive(Debug)]
pub struct SourceError {
    /// The document originating the diagnostic.
    pub document: Document,
    /// The evaluation diagnostic.
    pub diagnostic: Diagnostic,
    /// The call backtrace for the error.
    ///
    /// An empty backtrace denotes that the error was encountered outside of
    /// a call.
    ///
    /// The call locations are stored as most recent to least recent.
    pub backtrace: Vec<CallLocation>,
}
372
/// Represents an error that may occur when evaluating a workflow or task.
#[derive(Debug)]
pub enum EvaluationError {
    /// Evaluation was canceled.
    Canceled,
    /// The error came from WDL source evaluation.
    ///
    /// The payload is boxed.
    Source(Box<SourceError>),
    /// The error came from another source.
    Other(anyhow::Error),
}

impl EvaluationError {
    /// Creates a new evaluation error from the given document and diagnostic.
    ///
    /// The result is a [`Source`](Self::Source) error with an empty
    /// backtrace.
    pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
        Self::Source(Box::new(SourceError {
            document,
            diagnostic,
            backtrace: Default::default(),
        }))
    }

    /// Helper for tests for converting an evaluation error to a string.
    ///
    /// * [`Canceled`](Self::Canceled) yields a fixed message.
    /// * [`Source`](Self::Source) is rendered without color through
    ///   `codespan_reporting`, with one secondary label per backtrace entry.
    /// * [`Other`](Self::Other) is formatted with the error's `Debug`
    ///   representation.
    ///
    /// # Panics
    ///
    /// Panics if the diagnostic cannot be emitted or if the emitted output is
    /// not valid UTF-8.
    #[allow(clippy::inherent_to_string)]
    pub fn to_string(&self) -> String {
        use std::collections::HashMap;

        use codespan_reporting::diagnostic::Label;
        use codespan_reporting::diagnostic::LabelStyle;
        use codespan_reporting::files::SimpleFiles;
        use codespan_reporting::term::Config;
        use codespan_reporting::term::termcolor::Buffer;
        use codespan_reporting::term::{self};
        use wdl_ast::AstNode;

        match self {
            Self::Canceled => "evaluation was canceled".to_string(),
            Self::Source(e) => {
                let mut files = SimpleFiles::new();
                // Maps a backtrace document's id to its file id so each such
                // document is registered only once
                let mut map = HashMap::new();

                // NOTE(review): the primary document is not recorded in `map`,
                // so a backtrace entry located in the same document registers
                // it with `files` a second time -- confirm this is intended.
                let file_id = files.add(e.document.path(), e.document.root().text().to_string());

                let diagnostic =
                    e.diagnostic
                        .to_codespan(file_id)
                        .with_labels_iter(e.backtrace.iter().map(|l| {
                            let id = l.document.id();
                            let file_id = *map.entry(id).or_insert_with(|| {
                                files.add(l.document.path(), l.document.root().text().to_string())
                            });

                            Label {
                                style: LabelStyle::Secondary,
                                file_id,
                                range: l.span.start()..l.span.end(),
                                message: "called from this location".into(),
                            }
                        }));

                // Render into an in-memory buffer with color disabled
                let mut buffer = Buffer::no_color();
                term::emit(&mut buffer, &Config::default(), &files, &diagnostic)
                    .expect("failed to emit diagnostic");

                String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
            }
            Self::Other(e) => format!("{e:?}"),
        }
    }
}

impl From<anyhow::Error> for EvaluationError {
    /// Wraps the error in the [`Other`](EvaluationError::Other) variant.
    fn from(e: anyhow::Error) -> Self {
        Self::Other(e)
    }
}

/// Represents a result from evaluating a workflow or task.
///
/// The error type is [`EvaluationError`].
pub type EvaluationResult<T> = Result<T, EvaluationError>;
451
452/// Represents a path to a file or directory on the host file system or a URL to
453/// a remote file.
454///
455/// The host in this context is where the WDL evaluation is taking place.
456#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
457pub struct HostPath(pub(crate) Arc<String>);
458
459impl HostPath {
460    /// Constructs a new host path from a string.
461    pub fn new(path: impl Into<String>) -> Self {
462        Self(Arc::new(path.into()))
463    }
464
465    /// Gets the string representation of the host path.
466    pub fn as_str(&self) -> &str {
467        &self.0
468    }
469
470    /// Shell-expands the path.
471    ///
472    /// The path is also joined with the provided base directory.
473    pub fn expand(&self, base_dir: &EvaluationPath) -> Result<Self> {
474        // Shell-expand both paths and URLs
475        let shell_expanded = shellexpand::full(self.as_str()).with_context(|| {
476            format!("failed to shell-expand path `{path}`", path = self.as_str())
477        })?;
478
479        // But don't join URLs
480        if path::is_supported_url(&shell_expanded) {
481            Ok(Self::new(shell_expanded))
482        } else {
483            // `join()` handles both relative and absolute paths
484            Ok(Self::new(
485                base_dir.join(&shell_expanded)?.display().to_string(),
486            ))
487        }
488    }
489
490    /// Determines if the host path is relative.
491    pub fn is_relative(&self) -> bool {
492        !path::is_supported_url(&self.0) && Path::new(self.0.as_str()).is_relative()
493    }
494}
495
496impl fmt::Display for HostPath {
497    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498        self.0.fmt(f)
499    }
500}
501
502impl From<Arc<String>> for HostPath {
503    fn from(path: Arc<String>) -> Self {
504        Self(path)
505    }
506}
507
508impl From<HostPath> for Arc<String> {
509    fn from(path: HostPath) -> Self {
510        path.0
511    }
512}
513
514impl From<String> for HostPath {
515    fn from(s: String) -> Self {
516        Arc::new(s).into()
517    }
518}
519
520impl<'a> From<&'a str> for HostPath {
521    fn from(s: &'a str) -> Self {
522        s.to_string().into()
523    }
524}
525
526impl From<url::Url> for HostPath {
527    fn from(url: url::Url) -> Self {
528        url.as_str().into()
529    }
530}
531
/// Represents a path to a file or directory on the guest.
///
/// The guest in this context is the container where tasks are run.
///
/// For backends that do not use containers, a guest path is the same as a
/// host path.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct GuestPath(pub(crate) Arc<String>);

impl GuestPath {
    /// Constructs a new guest path from a string.
    pub fn new(path: impl Into<String>) -> Self {
        let owned: String = path.into();
        Self(Arc::new(owned))
    }

    /// Gets the string representation of the guest path.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for GuestPath {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl From<Arc<String>> for GuestPath {
    fn from(path: Arc<String>) -> Self {
        Self(path)
    }
}

impl From<GuestPath> for Arc<String> {
    fn from(path: GuestPath) -> Self {
        let GuestPath(inner) = path;
        inner
    }
}
570
/// Represents context to an expression evaluator.
///
/// Implementations must be [`Send`] and [`Sync`]. Methods without a default
/// body must be provided by every implementation; the remaining methods
/// default to "not available" (`None`) or to doing nothing.
pub trait EvaluationContext: Send + Sync {
    /// Gets the supported version of the document being evaluated.
    fn version(&self) -> SupportedVersion;

    /// Gets the value of the given name in scope.
    ///
    /// Returns a diagnostic if the name cannot be resolved.
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;

    /// Resolves a type name to a type.
    ///
    /// Returns a diagnostic if the type name cannot be resolved.
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;

    /// Gets the base directory for the evaluation.
    ///
    /// The base directory is what paths are relative to.
    ///
    /// For workflow evaluation, the base directory is the document's directory.
    ///
    /// For task evaluation, the base directory is the document's directory or
    /// the task's working directory if the `output` section is being evaluated.
    fn base_dir(&self) -> &EvaluationPath;

    /// Gets the temp directory for the evaluation.
    fn temp_dir(&self) -> &Path;

    /// Gets the value to return for a call to the `stdout` function.
    ///
    /// This returns `Some` only when evaluating a task's outputs section.
    ///
    /// The default implementation returns `None`.
    fn stdout(&self) -> Option<&Value> {
        None
    }

    /// Gets the value to return for a call to the `stderr` function.
    ///
    /// This returns `Some` only when evaluating a task's outputs section.
    ///
    /// The default implementation returns `None`.
    fn stderr(&self) -> Option<&Value> {
        None
    }

    /// Gets the task associated with the evaluation context.
    ///
    /// This returns `Some` only when evaluating a task's hints sections.
    ///
    /// The default implementation returns `None`.
    fn task(&self) -> Option<&Task> {
        None
    }

    /// Gets the transferer to use for evaluating expressions.
    fn transferer(&self) -> &dyn Transferer;

    /// Gets a guest path representation of a host path.
    ///
    /// Returns `None` if there is no guest path representation of the host
    /// path.
    ///
    /// The default implementation returns `None` for every path.
    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
        // The default implementation has no use for the parameter
        let _ = path;
        None
    }

    /// Gets a host path representation of a guest path.
    ///
    /// Returns `None` if there is no host path representation of the guest
    /// path.
    ///
    /// The default implementation returns `None` for every path.
    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
        // The default implementation has no use for the parameter
        let _ = path;
        None
    }

    /// Notifies the context that a file was created as a result of a call to a
    /// stdlib function.
    ///
    /// A context may map a guest path for the new host path.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
        // The default implementation has no use for the parameter
        let _ = path;
        Ok(())
    }
}
646
/// Represents an index of a scope in a collection of scopes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ScopeIndex(usize);

impl ScopeIndex {
    /// Constructs a new scope index from a raw index.
    ///
    /// This is a `const fn`, so it can be used in constant contexts.
    pub const fn new(index: usize) -> Self {
        Self(index)
    }
}

impl From<usize> for ScopeIndex {
    fn from(index: usize) -> Self {
        Self::new(index)
    }
}

impl From<ScopeIndex> for usize {
    fn from(index: ScopeIndex) -> Self {
        let ScopeIndex(raw) = index;
        raw
    }
}
669
670/// Represents an evaluation scope in a WDL document.
671#[derive(Default, Debug)]
672pub struct Scope {
673    /// The index of the parent scope.
674    ///
675    /// This is `None` for the root scopes.
676    parent: Option<ScopeIndex>,
677    /// The map of names in scope to their values.
678    names: IndexMap<String, Value>,
679}
680
681impl Scope {
682    /// Creates a new scope given the parent scope.
683    pub fn new(parent: ScopeIndex) -> Self {
684        Self {
685            parent: Some(parent),
686            names: Default::default(),
687        }
688    }
689
690    /// Inserts a name into the scope.
691    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
692        let prev = self.names.insert(name.into(), value.into());
693        assert!(prev.is_none(), "conflicting name in scope");
694    }
695
696    /// Iterates over the local names and values in the scope.
697    pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
698        self.names.iter().map(|(k, v)| (k.as_str(), v))
699    }
700
701    /// Gets a mutable reference to an existing name in scope.
702    pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
703        self.names.get_mut(name)
704    }
705
706    /// Clears the scope.
707    pub(crate) fn clear(&mut self) {
708        self.parent = None;
709        self.names.clear();
710    }
711
712    /// Sets the scope's parent.
713    pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
714        self.parent = Some(parent);
715    }
716}
717
718impl From<Scope> for IndexMap<String, Value> {
719    fn from(scope: Scope) -> Self {
720        scope.names
721    }
722}
723
724/// Represents a reference to a scope.
725#[derive(Debug, Clone, Copy)]
726pub struct ScopeRef<'a> {
727    /// The reference to the scopes collection.
728    scopes: &'a [Scope],
729    /// The index of the scope in the collection.
730    index: ScopeIndex,
731}
732
733impl<'a> ScopeRef<'a> {
734    /// Creates a new scope reference given the scope index.
735    pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
736        Self {
737            scopes,
738            index: index.into(),
739        }
740    }
741
742    /// Gets the parent scope.
743    ///
744    /// Returns `None` if there is no parent scope.
745    pub fn parent(&self) -> Option<Self> {
746        self.scopes[self.index.0].parent.map(|p| Self {
747            scopes: self.scopes,
748            index: p,
749        })
750    }
751
752    /// Gets all of the name and values available at this scope.
753    pub fn names(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
754        self.scopes[self.index.0]
755            .names
756            .iter()
757            .map(|(n, name)| (n.as_str(), name))
758    }
759
760    /// Iterates over each name and value visible to the scope and calls the
761    /// provided callback.
762    ///
763    /// Stops iterating and returns an error if the callback returns an error.
764    pub fn for_each(&self, mut cb: impl FnMut(&str, &Value) -> Result<()>) -> Result<()> {
765        let mut current = Some(self.index);
766
767        while let Some(index) = current {
768            for (n, v) in self.scopes[index.0].local() {
769                cb(n, v)?;
770            }
771
772            current = self.scopes[index.0].parent;
773        }
774
775        Ok(())
776    }
777
778    /// Gets the value of a name local to this scope.
779    ///
780    /// Returns `None` if a name local to this scope was not found.
781    pub fn local(&self, name: &str) -> Option<&Value> {
782        self.scopes[self.index.0].names.get(name)
783    }
784
785    /// Lookups a name in the scope.
786    ///
787    /// Returns `None` if the name is not available in the scope.
788    pub fn lookup(&self, name: &str) -> Option<&Value> {
789        let mut current = Some(self.index);
790
791        while let Some(index) = current {
792            if let Some(name) = self.scopes[index.0].names.get(name) {
793                return Some(name);
794            }
795
796            current = self.scopes[index.0].parent;
797        }
798
799        None
800    }
801}
802
/// Represents an evaluated task.
///
/// Holds the task's execution result together with the result of evaluating
/// the task's outputs.
#[derive(Debug)]
pub struct EvaluatedTask {
    /// Whether or not the execution result was from the call cache.
    cached: bool,
    /// The task execution result.
    result: TaskExecutionResult,
    /// The evaluated outputs of the task.
    ///
    /// This is `Ok` when the task executes successfully and all of the task's
    /// outputs evaluated without error.
    ///
    /// Otherwise, this contains the error that occurred while attempting to
    /// evaluate the task's outputs.
    outputs: EvaluationResult<Outputs>,
}
819
820impl EvaluatedTask {
    /// Constructs a new evaluated task.
    ///
    /// The outputs start out as `Ok` with the default (empty) outputs value.
    fn new(cached: bool, result: TaskExecutionResult) -> Self {
        Self {
            cached,
            result,
            outputs: Ok(Default::default()),
        }
    }

    /// Determines whether or not the task execution result was used from the
    /// call cache.
    pub fn cached(&self) -> bool {
        self.cached
    }

    /// Gets the exit code of the evaluated task.
    ///
    /// This is read from the task's execution result.
    pub fn exit_code(&self) -> i32 {
        self.result.exit_code
    }

    /// Gets the working directory of the evaluated task.
    ///
    /// This is read from the task's execution result.
    pub fn work_dir(&self) -> &EvaluationPath {
        &self.result.work_dir
    }

    /// Gets the stdout value of the evaluated task.
    ///
    /// This is read from the task's execution result.
    pub fn stdout(&self) -> &Value {
        &self.result.stdout
    }

    /// Gets the stderr value of the evaluated task.
    ///
    /// This is read from the task's execution result.
    pub fn stderr(&self) -> &Value {
        &self.result.stderr
    }

    /// Converts the evaluated task into an evaluation result.
    ///
    /// This consumes the evaluated task; only the outputs are kept.
    ///
    /// Returns `Ok(_)` if the task outputs were evaluated.
    ///
    /// Returns `Err(_)` if the task outputs could not be evaluated.
    pub fn into_result(self) -> EvaluationResult<Outputs> {
        self.outputs
    }
864
865    /// Handles the exit of a task execution.
866    ///
867    /// Returns an error if the task failed.
868    async fn handle_exit(
869        &self,
870        requirements: &HashMap<String, Value>,
871        transferer: &dyn Transferer,
872    ) -> anyhow::Result<()> {
873        let mut error = true;
874        if let Some(return_codes) = requirements
875            .get(TASK_REQUIREMENT_RETURN_CODES)
876            .or_else(|| requirements.get(TASK_REQUIREMENT_RETURN_CODES_ALIAS))
877        {
878            match return_codes {
879                Value::Primitive(PrimitiveValue::String(s)) if s.as_ref() == "*" => {
880                    error = false;
881                }
882                Value::Primitive(PrimitiveValue::String(s)) => {
883                    bail!(
884                        "invalid return code value `{s}`: only `*` is accepted when the return \
885                         code is specified as a string"
886                    );
887                }
888                Value::Primitive(PrimitiveValue::Integer(ok)) => {
889                    if self.result.exit_code == i32::try_from(*ok).unwrap_or_default() {
890                        error = false;
891                    }
892                }
893                Value::Compound(CompoundValue::Array(codes)) => {
894                    error = !codes.as_slice().iter().any(|v| {
895                        v.as_integer()
896                            .map(|i| i32::try_from(i).unwrap_or_default() == self.result.exit_code)
897                            .unwrap_or(false)
898                    });
899                }
900                _ => unreachable!("unexpected return codes value"),
901            }
902        } else {
903            error = self.result.exit_code != 0;
904        }
905
906        if error {
907            // Read the last `MAX_STDERR_LINES` number of lines from stderr
908            // If there's a problem reading stderr, don't output it
909            let stderr = download_file(
910                transferer,
911                self.work_dir(),
912                self.stderr().as_file().unwrap(),
913            )
914            .await
915            .ok()
916            .and_then(|l| {
917                fs::File::open(l).ok().map(|f| {
918                    // Buffer the last N number of lines
919                    let reader = RevBufReader::new(f);
920                    let lines: Vec<_> = reader
921                        .lines()
922                        .take(MAX_STDERR_LINES)
923                        .map_while(|l| l.ok())
924                        .collect();
925
926                    // Iterate the lines in reverse order as we read them in reverse
927                    lines
928                        .iter()
929                        .rev()
930                        .format_with("\n", |l, f| f(&format_args!("  {l}")))
931                        .to_string()
932                })
933            })
934            .unwrap_or_default();
935
936            // If the work directory is remote,
937            bail!(
938                "process terminated with exit code {code}: see `{stdout_path}` and \
939                 `{stderr_path}` for task output{header}{stderr}{trailer}",
940                code = self.result.exit_code,
941                stdout_path = self.stdout().as_file().expect("must be file"),
942                stderr_path = self.stderr().as_file().expect("must be file"),
943                header = if stderr.is_empty() {
944                    Cow::Borrowed("")
945                } else {
946                    format!("\n\ntask stderr output (last {MAX_STDERR_LINES} lines):\n\n").into()
947                },
948                trailer = if stderr.is_empty() { "" } else { "\n" }
949            );
950        }
951
952        Ok(())
953    }
954}
955
956/// Gets the kind of content.
957#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, IntoPrimitive)]
958#[repr(u8)]
959pub enum ContentKind {
960    /// The content is a single file.
961    File,
962    /// The content is a directory.
963    Directory,
964}
965
966impl Hashable for ContentKind {
967    fn hash(&self, hasher: &mut blake3::Hasher) {
968        hasher.update(&[(*self).into()]);
969    }
970}
971
972impl From<ContentKind> for crankshaft::engine::task::input::Type {
973    fn from(value: ContentKind) -> Self {
974        match value {
975            ContentKind::File => Self::File,
976            ContentKind::Directory => Self::Directory,
977        }
978    }
979}
980
981/// Represents a `File` or `Directory` input to a task.
982#[derive(Debug, Clone)]
983pub struct Input {
984    /// The content kind of the input.
985    kind: ContentKind,
986    /// The path for the input.
987    path: EvaluationPath,
988    /// The guest path for the input.
989    ///
990    /// This is `None` when the backend isn't mapping input paths.
991    guest_path: Option<GuestPath>,
992    /// The download location for the input.
993    ///
994    /// This is `Some` if the input has been downloaded to a known location.
995    location: Option<Location>,
996}
997
998impl Input {
999    /// Creates a new input with the given path and guest path.
1000    pub(crate) fn new(
1001        kind: ContentKind,
1002        path: EvaluationPath,
1003        guest_path: Option<GuestPath>,
1004    ) -> Self {
1005        Self {
1006            kind,
1007            path,
1008            guest_path,
1009            location: None,
1010        }
1011    }
1012
1013    /// Gets the content kind of the input.
1014    pub fn kind(&self) -> ContentKind {
1015        self.kind
1016    }
1017
1018    /// Gets the path to the input.
1019    ///
1020    /// The path of the input may be local or remote.
1021    pub fn path(&self) -> &EvaluationPath {
1022        &self.path
1023    }
1024
1025    /// Gets the guest path for the input.
1026    ///
1027    /// This is `None` for inputs to backends that don't use containers.
1028    pub fn guest_path(&self) -> Option<&GuestPath> {
1029        self.guest_path.as_ref()
1030    }
1031
1032    /// Gets the local path of the input.
1033    ///
1034    /// Returns `None` if the input is remote and has not been localized.
1035    pub fn local_path(&self) -> Option<&Path> {
1036        self.location.as_deref().or_else(|| self.path.as_local())
1037    }
1038
1039    /// Sets the location of the input.
1040    ///
1041    /// This is used during localization to set a local path for remote inputs.
1042    pub fn set_location(&mut self, location: Location) {
1043        self.location = Some(location);
1044    }
1045}
1046
#[cfg(test)]
mod test {
    use super::*;

    /// Asserts the observable state of a cancellation context: its reported
    /// state, whether the user canceled, and whether its token is canceled.
    #[track_caller]
    fn assert_context(
        context: &CancellationContext,
        state: CancellationContextState,
        user_canceled: bool,
        token_canceled: bool,
    ) {
        assert_eq!(context.state(), state);
        assert_eq!(context.user_canceled(), user_canceled);
        assert_eq!(context.token.is_cancelled(), token_canceled);
    }

    #[test]
    fn cancellation_slow() {
        let context = CancellationContext::new(FailureMode::Slow);
        assert_eq!(context.state(), CancellationContextState::NotCanceled);

        // The first cancel should not cancel the token
        assert_eq!(context.cancel(), CancellationContextState::Waiting);
        assert_context(&context, CancellationContextState::Waiting, true, false);

        // The second cancel should cancel the token
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, true, true);

        // Subsequent cancellations have no effect
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, true, true);
    }

    #[test]
    fn cancellation_fast() {
        let context = CancellationContext::new(FailureMode::Fast);
        assert_eq!(context.state(), CancellationContextState::NotCanceled);

        // Fail fast should immediately cancel the token
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, true, true);

        // Subsequent cancellations have no effect
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, true, true);
    }

    #[test]
    fn cancellation_error_slow() {
        let context = CancellationContext::new(FailureMode::Slow);
        assert_eq!(context.state(), CancellationContextState::NotCanceled);

        // An error should not cancel the token
        context.error(&EvaluationError::Canceled);
        assert_context(&context, CancellationContextState::Waiting, false, false);

        // A repeated error should not cancel the token either
        context.error(&EvaluationError::Canceled);
        assert_context(&context, CancellationContextState::Waiting, false, false);

        // However, another cancellation will
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, false, true);
    }

    #[test]
    fn cancellation_error_fast() {
        let context = CancellationContext::new(FailureMode::Fast);
        assert_eq!(context.state(), CancellationContextState::NotCanceled);

        // An error should cancel the context
        context.error(&EvaluationError::Canceled);
        assert_context(&context, CancellationContextState::Canceling, false, true);

        // A repeated error should not change anything
        context.error(&EvaluationError::Canceled);
        assert_context(&context, CancellationContextState::Canceling, false, true);

        // Neither should another `cancel` call
        assert_eq!(context.cancel(), CancellationContextState::Canceling);
        assert_context(&context, CancellationContextState::Canceling, false, true);
    }
}