1use std::borrow::Cow;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU8;
7use std::sync::atomic::Ordering;
8
9use anyhow::Result;
10use cloud_copy::TransferEvent;
11use crankshaft::events::Event as CrankshaftEvent;
12use indexmap::IndexMap;
13use tokio::sync::broadcast;
14use tokio_util::sync::CancellationToken;
15use tracing::error;
16use wdl_analysis::Document;
17use wdl_analysis::document::Task;
18use wdl_analysis::types::Type;
19use wdl_ast::Diagnostic;
20use wdl_ast::Span;
21use wdl_ast::SupportedVersion;
22
23use crate::EvaluationPath;
24use crate::GuestPath;
25use crate::HostPath;
26use crate::Outputs;
27use crate::Value;
28use crate::backend::TaskExecutionResult;
29use crate::config::FailureMode;
30use crate::http::Transferer;
31
32mod trie;
33pub mod v1;
34
35const ROOT_NAME: &str = ".root";
39
40const CANCELLATION_STATE_NOT_CANCELED: u8 = 0;
42
43const CANCELLATION_STATE_WAITING: u8 = 1;
48
49const CANCELLATION_STATE_CANCELING: u8 = 2;
53
54const CANCELLATION_STATE_ERROR: u8 = 4;
59
60const CANCELLATION_STATE_MASK: u8 = 0x3;
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum CancellationContextState {
66 NotCanceled,
68 Waiting,
71 Canceling,
74}
75
76impl CancellationContextState {
77 fn get(state: &Arc<AtomicU8>) -> Self {
79 match state.load(Ordering::SeqCst) & CANCELLATION_STATE_MASK {
80 CANCELLATION_STATE_NOT_CANCELED => Self::NotCanceled,
81 CANCELLATION_STATE_WAITING => Self::Waiting,
82 CANCELLATION_STATE_CANCELING => Self::Canceling,
83 _ => unreachable!("unexpected cancellation context state"),
84 }
85 }
86
87 fn update(mode: FailureMode, error: bool, state: &Arc<AtomicU8>) -> Option<Self> {
92 let previous_state = state
94 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
95 if error && state != CANCELLATION_STATE_NOT_CANCELED {
97 return None;
98 }
99
100 let mut new_state = match state & CANCELLATION_STATE_MASK {
102 CANCELLATION_STATE_NOT_CANCELED => match mode {
103 FailureMode::Slow => CANCELLATION_STATE_WAITING,
104 FailureMode::Fast => CANCELLATION_STATE_CANCELING,
105 },
106 CANCELLATION_STATE_WAITING => CANCELLATION_STATE_CANCELING,
107 CANCELLATION_STATE_CANCELING => CANCELLATION_STATE_CANCELING,
108 _ => unreachable!("unexpected cancellation context state"),
109 };
110
111 if error {
113 new_state |= CANCELLATION_STATE_ERROR;
114 }
115
116 Some(new_state | (state & CANCELLATION_STATE_ERROR))
118 })
119 .ok()?;
120
121 match previous_state & CANCELLATION_STATE_MASK {
122 CANCELLATION_STATE_NOT_CANCELED => match mode {
123 FailureMode::Slow => Some(Self::Waiting),
124 FailureMode::Fast => Some(Self::Canceling),
125 },
126 CANCELLATION_STATE_WAITING => Some(Self::Canceling),
127 CANCELLATION_STATE_CANCELING => Some(Self::Canceling),
128 _ => unreachable!("unexpected cancellation context state"),
129 }
130 }
131}
132
133#[derive(Debug, Clone)]
137pub struct CancellationContext {
138 mode: FailureMode,
140 state: Arc<AtomicU8>,
142 first: CancellationToken,
144 second: CancellationToken,
148}
149
150impl CancellationContext {
151 pub fn new(mode: FailureMode) -> Self {
161 Self {
162 mode,
163 state: Arc::new(CANCELLATION_STATE_NOT_CANCELED.into()),
164 first: CancellationToken::new(),
165 second: CancellationToken::new(),
166 }
167 }
168
169 pub fn state(&self) -> CancellationContextState {
171 CancellationContextState::get(&self.state)
172 }
173
174 #[must_use]
182 pub fn cancel(&self) -> CancellationContextState {
183 let state =
184 CancellationContextState::update(self.mode, false, &self.state).expect("should update");
185
186 match state {
187 CancellationContextState::NotCanceled => panic!("should be canceled"),
188 CancellationContextState::Waiting => self.first.cancel(),
189 CancellationContextState::Canceling => {
190 self.first.cancel();
191 self.second.cancel();
192 }
193 }
194
195 state
196 }
197
198 pub fn first(&self) -> CancellationToken {
208 self.first.clone()
209 }
210
211 pub fn second(&self) -> CancellationToken {
222 self.second.clone()
223 }
224
225 pub(crate) fn user_canceled(&self) -> bool {
227 let state = self.state.load(Ordering::SeqCst);
228 state != CANCELLATION_STATE_NOT_CANCELED && (state & CANCELLATION_STATE_ERROR == 0)
229 }
230
231 pub(crate) fn error(&self, error: &EvaluationError) {
238 if let Some(state) = CancellationContextState::update(self.mode, true, &self.state) {
239 let message: Cow<'_, str> = match error {
240 EvaluationError::Canceled => "evaluation was canceled".into(),
241 EvaluationError::Source(e) => e.diagnostic.message().into(),
242 EvaluationError::Other(e) => format!("{e:#}").into(),
243 };
244
245 match state {
246 CancellationContextState::NotCanceled => unreachable!("should be canceled"),
247 CancellationContextState::Waiting => {
248 self.first.cancel();
249
250 error!(
251 "an evaluation error occurred: waiting for any executing tasks to \
252 complete: {message}"
253 );
254 }
255 CancellationContextState::Canceling => {
256 self.first.cancel();
257 self.second.cancel();
258
259 error!(
260 "an evaluation error occurred: waiting for any executing tasks to cancel: \
261 {message}"
262 );
263 }
264 }
265 }
266 }
267}
268
269impl Default for CancellationContext {
270 fn default() -> Self {
271 Self::new(FailureMode::Slow)
272 }
273}
274
275#[derive(Debug, Clone)]
277pub enum EngineEvent {
278 ReusedCachedExecutionResult {
280 id: String,
282 },
283 TaskParked,
286 TaskUnparked {
288 canceled: bool,
290 },
291}
292
293#[derive(Debug, Clone, Default)]
295pub struct Events {
296 engine: Option<broadcast::Sender<EngineEvent>>,
300 crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
304 transfer: Option<broadcast::Sender<TransferEvent>>,
308}
309
310impl Events {
311 pub fn new(capacity: usize) -> Self {
313 Self {
314 engine: Some(broadcast::Sender::new(capacity)),
315 crankshaft: Some(broadcast::Sender::new(capacity)),
316 transfer: Some(broadcast::Sender::new(capacity)),
317 }
318 }
319
320 pub fn disabled() -> Self {
322 Self::default()
323 }
324
325 pub fn subscribe_engine(&self) -> Option<broadcast::Receiver<EngineEvent>> {
329 self.engine.as_ref().map(|s| s.subscribe())
330 }
331
332 pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
336 self.crankshaft.as_ref().map(|s| s.subscribe())
337 }
338
339 pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
343 self.transfer.as_ref().map(|s| s.subscribe())
344 }
345
346 pub(crate) fn engine(&self) -> &Option<broadcast::Sender<EngineEvent>> {
348 &self.engine
349 }
350
351 pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
353 &self.crankshaft
354 }
355
356 pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
358 &self.transfer
359 }
360}
361
362#[derive(Debug, Clone)]
364pub struct CallLocation {
365 pub document: Document,
367 pub span: Span,
369}
370
371#[derive(Debug)]
373pub struct SourceError {
374 pub document: Document,
376 pub diagnostic: Diagnostic,
378 pub backtrace: Vec<CallLocation>,
385}
386
387#[derive(Debug)]
389pub enum EvaluationError {
390 Canceled,
392 Source(Box<SourceError>),
394 Other(anyhow::Error),
396}
397
398impl EvaluationError {
399 pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
401 Self::Source(Box::new(SourceError {
402 document,
403 diagnostic,
404 backtrace: Default::default(),
405 }))
406 }
407
408 #[allow(clippy::inherent_to_string)]
410 pub fn to_string(&self) -> String {
411 use std::collections::HashMap;
412
413 use codespan_reporting::diagnostic::Label;
414 use codespan_reporting::diagnostic::LabelStyle;
415 use codespan_reporting::files::SimpleFiles;
416 use codespan_reporting::term::Config;
417 use codespan_reporting::term::termcolor::Buffer;
418 use codespan_reporting::term::{self};
419 use wdl_ast::AstNode;
420
421 match self {
422 Self::Canceled => "evaluation was canceled".to_string(),
423 Self::Source(e) => {
424 let mut files = SimpleFiles::new();
425 let mut map = HashMap::new();
426
427 let file_id = files.add(e.document.path(), e.document.root().text().to_string());
428
429 let diagnostic =
430 e.diagnostic
431 .to_codespan(file_id)
432 .with_labels_iter(e.backtrace.iter().map(|l| {
433 let id = l.document.id();
434 let file_id = *map.entry(id).or_insert_with(|| {
435 files.add(l.document.path(), l.document.root().text().to_string())
436 });
437
438 Label {
439 style: LabelStyle::Secondary,
440 file_id,
441 range: l.span.start()..l.span.end(),
442 message: "called from this location".into(),
443 }
444 }));
445
446 let mut buffer = Buffer::no_color();
447 term::emit(&mut buffer, &Config::default(), &files, &diagnostic)
448 .expect("failed to emit diagnostic");
449
450 String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
451 }
452 Self::Other(e) => format!("{e:?}"),
453 }
454 }
455}
456
457impl From<anyhow::Error> for EvaluationError {
458 fn from(e: anyhow::Error) -> Self {
459 Self::Other(e)
460 }
461}
462
463pub type EvaluationResult<T> = Result<T, EvaluationError>;
465
466pub(crate) trait EvaluationContext: Send + Sync {
468 fn version(&self) -> SupportedVersion;
470
471 fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;
473
474 fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;
476
477 fn enum_variant_value(&self, enum_name: &str, variant_name: &str) -> Result<Value, Diagnostic>;
479
480 fn base_dir(&self) -> &EvaluationPath;
489
490 fn temp_dir(&self) -> &Path;
492
493 fn stdout(&self) -> Option<&Value> {
497 None
498 }
499
500 fn stderr(&self) -> Option<&Value> {
504 None
505 }
506
507 fn task(&self) -> Option<&Task> {
511 None
512 }
513
514 fn transferer(&self) -> &dyn Transferer;
516
517 fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
522 let _ = path;
523 None
524 }
525
526 fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
531 let _ = path;
532 None
533 }
534
535 fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
540 let _ = path;
541 Ok(())
542 }
543}
544
545#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
547struct ScopeIndex(usize);
548
549impl ScopeIndex {
550 pub const fn new(index: usize) -> Self {
552 Self(index)
553 }
554}
555
556impl From<usize> for ScopeIndex {
557 fn from(index: usize) -> Self {
558 Self(index)
559 }
560}
561
562impl From<ScopeIndex> for usize {
563 fn from(index: ScopeIndex) -> Self {
564 index.0
565 }
566}
567
568#[derive(Default, Debug)]
570struct Scope {
571 parent: Option<ScopeIndex>,
575 names: IndexMap<String, Value>,
577}
578
579impl Scope {
580 pub fn new(parent: ScopeIndex) -> Self {
582 Self {
583 parent: Some(parent),
584 names: Default::default(),
585 }
586 }
587
588 pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
590 let prev = self.names.insert(name.into(), value.into());
591 assert!(prev.is_none(), "conflicting name in scope");
592 }
593
594 pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
596 self.names.iter().map(|(k, v)| (k.as_str(), v))
597 }
598
599 pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
601 self.names.get_mut(name)
602 }
603
604 pub(crate) fn clear(&mut self) {
606 self.parent = None;
607 self.names.clear();
608 }
609
610 pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
612 self.parent = Some(parent);
613 }
614}
615
616impl From<Scope> for IndexMap<String, Value> {
617 fn from(scope: Scope) -> Self {
618 scope.names
619 }
620}
621
622impl From<Scope> for Outputs {
623 fn from(scope: Scope) -> Self {
624 scope.names.into()
625 }
626}
627
628#[derive(Debug, Clone, Copy)]
630struct ScopeRef<'a> {
631 scopes: &'a [Scope],
633 index: ScopeIndex,
635}
636
637impl<'a> ScopeRef<'a> {
638 pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
640 Self {
641 scopes,
642 index: index.into(),
643 }
644 }
645
646 pub fn parent(&self) -> Option<Self> {
650 self.scopes[self.index.0].parent.map(|p| Self {
651 scopes: self.scopes,
652 index: p,
653 })
654 }
655
656 pub fn local(&self, name: &str) -> Option<&Value> {
660 self.scopes[self.index.0].names.get(name)
661 }
662
663 pub fn lookup(&self, name: &str) -> Option<&Value> {
667 let mut current = Some(self.index);
668
669 while let Some(index) = current {
670 if let Some(name) = self.scopes[index.0].names.get(name) {
671 return Some(name);
672 }
673
674 current = self.scopes[index.0].parent;
675 }
676
677 None
678 }
679}
680
681#[derive(Debug)]
689pub struct EvaluatedTask {
690 result: TaskExecutionResult,
692 outputs: Outputs,
694 error: Option<EvaluationError>,
698 cached: bool,
700}
701
702impl EvaluatedTask {
703 fn new(cached: bool, result: TaskExecutionResult, error: Option<EvaluationError>) -> Self {
705 Self {
706 result,
707 outputs: Default::default(),
708 error,
709 cached,
710 }
711 }
712
713 pub fn failed(&self) -> bool {
716 self.error.is_some()
717 }
718
719 pub fn cached(&self) -> bool {
722 self.cached
723 }
724
725 pub fn exit_code(&self) -> i32 {
727 self.result.exit_code
728 }
729
730 pub fn work_dir(&self) -> &EvaluationPath {
732 &self.result.work_dir
733 }
734
735 pub fn stdout(&self) -> &Value {
737 &self.result.stdout
738 }
739
740 pub fn stderr(&self) -> &Value {
742 &self.result.stderr
743 }
744
745 pub fn into_outputs(self) -> EvaluationResult<Outputs> {
750 match self.error {
751 Some(e) => Err(e),
752 None => Ok(self.outputs),
753 }
754 }
755}
756
757#[cfg(test)]
758mod test {
759 use super::*;
760
761 #[test]
762 fn cancellation_slow() {
763 let context = CancellationContext::new(FailureMode::Slow);
764 assert_eq!(context.state(), CancellationContextState::NotCanceled);
765
766 assert_eq!(context.cancel(), CancellationContextState::Waiting);
768 assert_eq!(context.state(), CancellationContextState::Waiting);
769 assert!(context.user_canceled());
770 assert!(context.first.is_cancelled());
771 assert!(!context.second.is_cancelled());
772
773 assert_eq!(context.cancel(), CancellationContextState::Canceling);
775 assert_eq!(context.state(), CancellationContextState::Canceling);
776 assert!(context.user_canceled());
777 assert!(context.first.is_cancelled());
778 assert!(context.second.is_cancelled());
779
780 assert_eq!(context.cancel(), CancellationContextState::Canceling);
782 assert_eq!(context.state(), CancellationContextState::Canceling);
783 assert!(context.user_canceled());
784 assert!(context.first.is_cancelled());
785 assert!(context.second.is_cancelled());
786 }
787
788 #[test]
789 fn cancellation_fast() {
790 let context = CancellationContext::new(FailureMode::Fast);
791 assert_eq!(context.state(), CancellationContextState::NotCanceled);
792
793 assert_eq!(context.cancel(), CancellationContextState::Canceling);
795 assert_eq!(context.state(), CancellationContextState::Canceling);
796 assert!(context.user_canceled());
797 assert!(context.first.is_cancelled());
798 assert!(context.second.is_cancelled());
799
800 assert_eq!(context.cancel(), CancellationContextState::Canceling);
802 assert_eq!(context.state(), CancellationContextState::Canceling);
803 assert!(context.user_canceled());
804 assert!(context.first.is_cancelled());
805 assert!(context.second.is_cancelled());
806 }
807
808 #[test]
809 fn cancellation_error_slow() {
810 let context = CancellationContext::new(FailureMode::Slow);
811 assert_eq!(context.state(), CancellationContextState::NotCanceled);
812
813 context.error(&EvaluationError::Canceled);
815 assert_eq!(context.state(), CancellationContextState::Waiting);
816 assert!(!context.user_canceled());
817 assert!(context.first.is_cancelled());
818 assert!(!context.second.is_cancelled());
819
820 context.error(&EvaluationError::Canceled);
822 assert_eq!(context.state(), CancellationContextState::Waiting);
823 assert!(!context.user_canceled());
824 assert!(context.first.is_cancelled());
825 assert!(!context.second.is_cancelled());
826
827 assert_eq!(context.cancel(), CancellationContextState::Canceling);
829 assert_eq!(context.state(), CancellationContextState::Canceling);
830 assert!(!context.user_canceled());
831 assert!(context.first.is_cancelled());
832 assert!(context.second.is_cancelled());
833 }
834
835 #[test]
836 fn cancellation_error_fast() {
837 let context = CancellationContext::new(FailureMode::Fast);
838 assert_eq!(context.state(), CancellationContextState::NotCanceled);
839
840 context.error(&EvaluationError::Canceled);
842 assert_eq!(context.state(), CancellationContextState::Canceling);
843 assert!(!context.user_canceled());
844 assert!(context.first.is_cancelled());
845 assert!(context.second.is_cancelled());
846
847 context.error(&EvaluationError::Canceled);
849 assert_eq!(context.state(), CancellationContextState::Canceling);
850 assert!(!context.user_canceled());
851 assert!(context.first.is_cancelled());
852 assert!(context.second.is_cancelled());
853
854 assert_eq!(context.cancel(), CancellationContextState::Canceling);
856 assert_eq!(context.state(), CancellationContextState::Canceling);
857 assert!(!context.user_canceled());
858 assert!(context.first.is_cancelled());
859 assert!(context.second.is_cancelled());
860 }
861}