1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fmt;
6use std::fs;
7use std::io::BufRead;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use anyhow::Context;
13use anyhow::Result;
14use anyhow::bail;
15use cloud_copy::TransferEvent;
16use crankshaft::events::Event as CrankshaftEvent;
17use indexmap::IndexMap;
18use itertools::Itertools;
19use rev_buf_reader::RevBufReader;
20use tokio::sync::broadcast;
21use wdl_analysis::Document;
22use wdl_analysis::document::Task;
23use wdl_analysis::types::Type;
24use wdl_ast::Diagnostic;
25use wdl_ast::Span;
26use wdl_ast::SupportedVersion;
27use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES;
28use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES_ALIAS;
29
30use crate::CompoundValue;
31use crate::Outputs;
32use crate::PrimitiveValue;
33use crate::TaskExecutionResult;
34use crate::Value;
35use crate::http::Location;
36use crate::http::Transferer;
37use crate::path;
38use crate::path::EvaluationPath;
39use crate::stdlib::download_file;
40
41pub mod trie;
42pub mod v1;
43
/// The maximum number of trailing stderr lines quoted in the error message
/// produced when a task exits with an unaccepted exit code.
const MAX_STDERR_LINES: usize = 10;

/// NOTE(review): not referenced in the visible part of this file; presumably
/// the name used when a file system "root" is mapped — confirm against its
/// users (e.g. the `trie` submodule).
const ROOT_NAME: &str = ".root";
51
52#[derive(Debug, Clone, Default)]
54pub struct Events {
55 crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
59 transfer: Option<broadcast::Sender<TransferEvent>>,
63}
64
65impl Events {
66 pub fn all(capacity: usize) -> Self {
68 Self {
69 crankshaft: Some(broadcast::Sender::new(capacity)),
70 transfer: Some(broadcast::Sender::new(capacity)),
71 }
72 }
73
74 pub fn none() -> Self {
76 Self::default()
77 }
78
79 pub fn crankshaft_only(capacity: usize) -> Self {
82 Self {
83 crankshaft: Some(broadcast::Sender::new(capacity)),
84 transfer: None,
85 }
86 }
87
88 pub fn transfer_only(capacity: usize) -> Self {
91 Self {
92 crankshaft: None,
93 transfer: Some(broadcast::Sender::new(capacity)),
94 }
95 }
96
97 pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
101 self.crankshaft.as_ref().map(|s| s.subscribe())
102 }
103
104 pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
108 self.transfer.as_ref().map(|s| s.subscribe())
109 }
110
111 pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
113 &self.crankshaft
114 }
115
116 pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
118 &self.transfer
119 }
120}
121
/// A location in a document, used as one entry of an evaluation backtrace.
///
/// When an [`EvaluationError`] is rendered, each location is labelled
/// "called from this location".
#[derive(Debug, Clone)]
pub struct CallLocation {
    /// The document containing the location.
    pub document: Document,
    /// The span of the location within the document.
    pub span: Span,
}
130
/// An evaluation error that is tied to a location in a source document.
#[derive(Debug)]
pub struct SourceError {
    /// The document the diagnostic refers to.
    pub document: Document,
    /// The diagnostic describing the error.
    pub diagnostic: Diagnostic,
    /// The call locations that led to the error.
    ///
    /// NOTE(review): the ordering of entries (most recent first or last) is
    /// not established by the visible code — confirm with the code that
    /// pushes to it.
    pub backtrace: Vec<CallLocation>,
}
146
/// An error that may occur during evaluation.
#[derive(Debug)]
pub enum EvaluationError {
    /// The error has an associated source document and diagnostic.
    ///
    /// Boxed, which keeps the enum itself small.
    Source(Box<SourceError>),
    /// Any other error, carried as an `anyhow::Error`.
    Other(anyhow::Error),
}
155
156impl EvaluationError {
157 pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
159 Self::Source(Box::new(SourceError {
160 document,
161 diagnostic,
162 backtrace: Default::default(),
163 }))
164 }
165
166 #[cfg(feature = "codespan-reporting")]
168 #[allow(clippy::inherent_to_string)]
169 pub fn to_string(&self) -> String {
170 use codespan_reporting::diagnostic::Label;
171 use codespan_reporting::diagnostic::LabelStyle;
172 use codespan_reporting::files::SimpleFiles;
173 use codespan_reporting::term::Config;
174 use codespan_reporting::term::termcolor::Buffer;
175 use codespan_reporting::term::{self};
176 use wdl_ast::AstNode;
177
178 match self {
179 Self::Source(e) => {
180 let mut files = SimpleFiles::new();
181 let mut map = HashMap::new();
182
183 let file_id = files.add(e.document.path(), e.document.root().text().to_string());
184
185 let diagnostic =
186 e.diagnostic
187 .to_codespan(file_id)
188 .with_labels_iter(e.backtrace.iter().map(|l| {
189 let id = l.document.id();
190 let file_id = *map.entry(id).or_insert_with(|| {
191 files.add(l.document.path(), l.document.root().text().to_string())
192 });
193
194 Label {
195 style: LabelStyle::Secondary,
196 file_id,
197 range: l.span.start()..l.span.end(),
198 message: "called from this location".into(),
199 }
200 }));
201
202 let mut buffer = Buffer::no_color();
203 term::emit(&mut buffer, &Config::default(), &files, &diagnostic)
204 .expect("failed to emit diagnostic");
205
206 String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
207 }
208 Self::Other(e) => format!("{e:?}"),
209 }
210 }
211}
212
213impl From<anyhow::Error> for EvaluationError {
214 fn from(e: anyhow::Error) -> Self {
215 Self::Other(e)
216 }
217}
218
/// The result type of an evaluation: `T` on success, otherwise an
/// [`EvaluationError`].
pub type EvaluationResult<T> = Result<T, EvaluationError>;
221
222#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
227pub struct HostPath(pub(crate) Arc<String>);
228
229impl HostPath {
230 pub fn new(path: impl Into<String>) -> Self {
232 Self(Arc::new(path.into()))
233 }
234
235 pub fn as_str(&self) -> &str {
237 &self.0
238 }
239
240 pub fn expand(&mut self, base_dir: &EvaluationPath) -> Result<()> {
244 if let Cow::Owned(s) = shellexpand::full(self.as_str()).with_context(|| {
246 format!("failed to shell expand path `{path}`", path = self.as_str())
247 })? {
248 *Arc::make_mut(&mut self.0) = s;
249 }
250
251 if path::is_url(self.as_str()) {
253 return Ok(());
254 }
255
256 if let Some(s) = base_dir.join(self.as_str())?.to_str() {
258 *Arc::make_mut(&mut self.0) = s.to_string();
259 }
260
261 Ok(())
262 }
263}
264
265impl fmt::Display for HostPath {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 self.0.fmt(f)
268 }
269}
270
271impl From<Arc<String>> for HostPath {
272 fn from(path: Arc<String>) -> Self {
273 Self(path)
274 }
275}
276
277impl From<HostPath> for Arc<String> {
278 fn from(path: HostPath) -> Self {
279 path.0
280 }
281}
282
/// A guest path, stored as a shared string.
///
/// NOTE(review): presumably the path as seen from inside the task's
/// execution environment (e.g. a container) — confirm against the backends.
/// Cloning only clones the `Arc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct GuestPath(pub(crate) Arc<String>);

impl GuestPath {
    /// Creates a new guest path from anything convertible into a `String`.
    pub fn new(path: impl Into<String>) -> Self {
        let owned: String = path.into();
        Self(Arc::new(owned))
    }

    /// Gets the path as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

/// Displays the path exactly as its underlying string.
impl fmt::Display for GuestPath {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

/// Wraps a shared string as a guest path without copying it.
impl From<Arc<String>> for GuestPath {
    fn from(path: Arc<String>) -> Self {
        GuestPath(path)
    }
}

/// Unwraps a guest path into its shared string without copying it.
impl From<GuestPath> for Arc<String> {
    fn from(path: GuestPath) -> Self {
        let GuestPath(inner) = path;
        inner
    }
}
321
/// The context in which expressions are evaluated.
///
/// Implementations supply name and type resolution, the directories used
/// during evaluation, and a file transferer. The remaining methods have
/// defaults that report "not available".
pub trait EvaluationContext: Send + Sync {
    /// Gets the supported version for this context.
    ///
    /// NOTE(review): presumably the WDL version of the document being
    /// evaluated — confirm.
    fn version(&self) -> SupportedVersion;

    /// Resolves the given name to a value.
    ///
    /// Returns a diagnostic on failure; `span` is presumably the location of
    /// the name, for use in that diagnostic.
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;

    /// Resolves the given type name to a type.
    ///
    /// Returns a diagnostic on failure; `span` is presumably the location of
    /// the type name, for use in that diagnostic.
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;

    /// Gets the base directory for the evaluation.
    ///
    /// NOTE(review): presumably the location relative paths are resolved
    /// against (see `HostPath::expand`) — confirm.
    fn base_dir(&self) -> &EvaluationPath;

    /// Gets the temp directory for the evaluation.
    fn temp_dir(&self) -> &Path;

    /// Gets the stdout value for the evaluation, if there is one.
    ///
    /// The default implementation returns `None`.
    fn stdout(&self) -> Option<&Value> {
        None
    }

    /// Gets the stderr value for the evaluation, if there is one.
    ///
    /// The default implementation returns `None`.
    fn stderr(&self) -> Option<&Value> {
        None
    }

    /// Gets the task associated with the evaluation, if there is one.
    ///
    /// The default implementation returns `None`.
    fn task(&self) -> Option<&Task> {
        None
    }

    /// Gets the transferer to use for the evaluation.
    fn transferer(&self) -> &dyn Transferer;

    /// Gets the guest path corresponding to the given host path, if any.
    ///
    /// The default implementation ignores `path` and returns `None`.
    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
        let _ = path;
        None
    }

    /// Gets the host path corresponding to the given guest path, if any.
    ///
    /// The default implementation ignores `path` and returns `None`.
    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
        let _ = path;
        None
    }

    /// Notifies the context that a file was created at the given host path.
    ///
    /// The default implementation ignores `path` and returns `Ok(())`.
    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
        let _ = path;
        Ok(())
    }
}
397
/// An index identifying a scope within a collection of scopes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ScopeIndex(usize);

impl ScopeIndex {
    /// Constructs a new scope index from a raw index.
    pub const fn new(index: usize) -> Self {
        ScopeIndex(index)
    }
}

/// Converts a raw index into a scope index.
impl From<usize> for ScopeIndex {
    fn from(index: usize) -> Self {
        Self::new(index)
    }
}

/// Converts a scope index back into its raw index.
impl From<ScopeIndex> for usize {
    fn from(index: ScopeIndex) -> Self {
        let ScopeIndex(raw) = index;
        raw
    }
}
420
421#[derive(Default, Debug)]
423pub struct Scope {
424 parent: Option<ScopeIndex>,
428 names: IndexMap<String, Value>,
430}
431
432impl Scope {
433 pub fn new(parent: ScopeIndex) -> Self {
435 Self {
436 parent: Some(parent),
437 names: Default::default(),
438 }
439 }
440
441 pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
443 let prev = self.names.insert(name.into(), value.into());
444 assert!(prev.is_none(), "conflicting name in scope");
445 }
446
447 pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
449 self.names.iter().map(|(k, v)| (k.as_str(), v))
450 }
451
452 pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
454 self.names.get_mut(name)
455 }
456
457 pub(crate) fn clear(&mut self) {
459 self.parent = None;
460 self.names.clear();
461 }
462
463 pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
465 self.parent = Some(parent);
466 }
467}
468
469impl From<Scope> for IndexMap<String, Value> {
470 fn from(scope: Scope) -> Self {
471 scope.names
472 }
473}
474
475#[derive(Debug, Clone, Copy)]
477pub struct ScopeRef<'a> {
478 scopes: &'a [Scope],
480 index: ScopeIndex,
482}
483
484impl<'a> ScopeRef<'a> {
485 pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
487 Self {
488 scopes,
489 index: index.into(),
490 }
491 }
492
493 pub fn parent(&self) -> Option<Self> {
497 self.scopes[self.index.0].parent.map(|p| Self {
498 scopes: self.scopes,
499 index: p,
500 })
501 }
502
503 pub fn names(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
505 self.scopes[self.index.0]
506 .names
507 .iter()
508 .map(|(n, name)| (n.as_str(), name))
509 }
510
511 pub fn for_each(&self, mut cb: impl FnMut(&str, &Value) -> Result<()>) -> Result<()> {
516 let mut current = Some(self.index);
517
518 while let Some(index) = current {
519 for (n, v) in self.scopes[index.0].local() {
520 cb(n, v)?;
521 }
522
523 current = self.scopes[index.0].parent;
524 }
525
526 Ok(())
527 }
528
529 pub fn local(&self, name: &str) -> Option<&Value> {
533 self.scopes[self.index.0].names.get(name)
534 }
535
536 pub fn lookup(&self, name: &str) -> Option<&Value> {
540 let mut current = Some(self.index);
541
542 while let Some(index) = current {
543 if let Some(name) = self.scopes[index.0].names.get(name) {
544 return Some(name);
545 }
546
547 current = self.scopes[index.0].parent;
548 }
549
550 None
551 }
552}
553
/// A task that has been executed, together with its outputs.
#[derive(Debug)]
pub struct EvaluatedTask {
    /// The directory of the task attempt.
    attempt_dir: PathBuf,
    /// The execution result of the task (exit code, work directory, stdout
    /// and stderr values).
    result: TaskExecutionResult,
    /// The evaluated outputs of the task, or the error that was recorded in
    /// their place.
    ///
    /// Initialized to `Ok` with default outputs on construction.
    outputs: EvaluationResult<Outputs>,
}
570
571impl EvaluatedTask {
572 fn new(attempt_dir: PathBuf, result: TaskExecutionResult) -> anyhow::Result<Self> {
576 Ok(Self {
577 result,
578 attempt_dir,
579 outputs: Ok(Default::default()),
580 })
581 }
582
583 pub fn exit_code(&self) -> i32 {
585 self.result.exit_code
586 }
587
588 pub fn attempt_dir(&self) -> &Path {
590 &self.attempt_dir
591 }
592
593 pub fn work_dir(&self) -> &EvaluationPath {
595 &self.result.work_dir
596 }
597
598 pub fn stdout(&self) -> &Value {
600 &self.result.stdout
601 }
602
603 pub fn stderr(&self) -> &Value {
605 &self.result.stderr
606 }
607
608 pub fn outputs(&self) -> &EvaluationResult<Outputs> {
616 &self.outputs
617 }
618
619 pub fn into_result(self) -> EvaluationResult<Outputs> {
625 self.outputs
626 }
627
628 async fn handle_exit(
632 &self,
633 requirements: &HashMap<String, Value>,
634 transferer: &dyn Transferer,
635 ) -> anyhow::Result<()> {
636 let mut error = true;
637 if let Some(return_codes) = requirements
638 .get(TASK_REQUIREMENT_RETURN_CODES)
639 .or_else(|| requirements.get(TASK_REQUIREMENT_RETURN_CODES_ALIAS))
640 {
641 match return_codes {
642 Value::Primitive(PrimitiveValue::String(s)) if s.as_ref() == "*" => {
643 error = false;
644 }
645 Value::Primitive(PrimitiveValue::String(s)) => {
646 bail!(
647 "invalid return code value `{s}`: only `*` is accepted when the return \
648 code is specified as a string"
649 );
650 }
651 Value::Primitive(PrimitiveValue::Integer(ok)) => {
652 if self.result.exit_code == i32::try_from(*ok).unwrap_or_default() {
653 error = false;
654 }
655 }
656 Value::Compound(CompoundValue::Array(codes)) => {
657 error = !codes.as_slice().iter().any(|v| {
658 v.as_integer()
659 .map(|i| i32::try_from(i).unwrap_or_default() == self.result.exit_code)
660 .unwrap_or(false)
661 });
662 }
663 _ => unreachable!("unexpected return codes value"),
664 }
665 } else {
666 error = self.result.exit_code != 0;
667 }
668
669 if error {
670 let stderr = download_file(
673 transferer,
674 self.work_dir(),
675 self.stderr().as_file().unwrap(),
676 )
677 .await
678 .ok()
679 .and_then(|l| {
680 fs::File::open(l).ok().map(|f| {
681 let reader = RevBufReader::new(f);
683 let lines: Vec<_> = reader
684 .lines()
685 .take(MAX_STDERR_LINES)
686 .map_while(|l| l.ok())
687 .collect();
688
689 lines
691 .iter()
692 .rev()
693 .format_with("\n", |l, f| f(&format_args!(" {l}")))
694 .to_string()
695 })
696 })
697 .unwrap_or_default();
698
699 bail!(
701 "process terminated with exit code {code}: see `{stdout_path}` and \
702 `{stderr_path}` for task output and the related files in \
703 `{dir}`{header}{stderr}{trailer}",
704 code = self.result.exit_code,
705 dir = self.attempt_dir().display(),
706 stdout_path = self.stdout().as_file().expect("must be file"),
707 stderr_path = self.stderr().as_file().expect("must be file"),
708 header = if stderr.is_empty() {
709 Cow::Borrowed("")
710 } else {
711 format!("\n\ntask stderr output (last {MAX_STDERR_LINES} lines):\n\n").into()
712 },
713 trailer = if stderr.is_empty() { "" } else { "\n" }
714 );
715 }
716
717 Ok(())
718 }
719}
720
721#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
723pub enum InputKind {
724 File,
726 Directory,
728}
729
730impl From<InputKind> for crankshaft::engine::task::input::Type {
731 fn from(value: InputKind) -> Self {
732 match value {
733 InputKind::File => Self::File,
734 InputKind::Directory => Self::Directory,
735 }
736 }
737}
738
739#[derive(Debug, Clone)]
741pub struct Input {
742 kind: InputKind,
744 path: EvaluationPath,
746 guest_path: Option<GuestPath>,
750 location: Option<Location>,
754}
755
756impl Input {
757 fn new(kind: InputKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
759 Self {
760 kind,
761 path,
762 guest_path,
763 location: None,
764 }
765 }
766
767 pub fn kind(&self) -> InputKind {
769 self.kind
770 }
771
772 pub fn path(&self) -> &EvaluationPath {
776 &self.path
777 }
778
779 pub fn guest_path(&self) -> Option<&GuestPath> {
783 self.guest_path.as_ref()
784 }
785
786 pub fn local_path(&self) -> Option<&Path> {
790 self.location.as_deref().or_else(|| self.path.as_local())
791 }
792
793 pub fn set_location(&mut self, location: Location) {
797 self.location = Some(location);
798 }
799}