1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fmt;
6use std::fs;
7use std::io::BufRead;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use anyhow::Result;
13use anyhow::bail;
14use cloud_copy::TransferEvent;
15use crankshaft::events::Event as CrankshaftEvent;
16use indexmap::IndexMap;
17use itertools::Itertools;
18use rev_buf_reader::RevBufReader;
19use tokio::sync::broadcast;
20use wdl_analysis::Document;
21use wdl_analysis::document::Task;
22use wdl_analysis::types::Type;
23use wdl_ast::Diagnostic;
24use wdl_ast::Span;
25use wdl_ast::SupportedVersion;
26use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES;
27use wdl_ast::v1::TASK_REQUIREMENT_RETURN_CODES_ALIAS;
28
29use crate::CompoundValue;
30use crate::Outputs;
31use crate::PrimitiveValue;
32use crate::TaskExecutionResult;
33use crate::Value;
34use crate::http::Location;
35use crate::http::Transferer;
36use crate::path::EvaluationPath;
37use crate::stdlib::download_file;
38
39pub mod trie;
40pub mod v1;
41
/// Maximum number of trailing stderr lines quoted in the error produced when a
/// task exits with a code that is not accepted.
const MAX_STDERR_LINES: usize = 10;

/// Name used for a root entry.
// NOTE(review): not referenced in the visible part of this file; presumably
// used by a submodule or code further down — confirm before changing.
const ROOT_NAME: &str = ".root";
49
50#[derive(Debug, Clone, Default)]
52pub struct Events {
53 crankshaft: Option<broadcast::Sender<CrankshaftEvent>>,
57 transfer: Option<broadcast::Sender<TransferEvent>>,
61}
62
63impl Events {
64 pub fn all(capacity: usize) -> Self {
66 Self {
67 crankshaft: Some(broadcast::Sender::new(capacity)),
68 transfer: Some(broadcast::Sender::new(capacity)),
69 }
70 }
71
72 pub fn none() -> Self {
74 Self::default()
75 }
76
77 pub fn crankshaft_only(capacity: usize) -> Self {
80 Self {
81 crankshaft: Some(broadcast::Sender::new(capacity)),
82 transfer: None,
83 }
84 }
85
86 pub fn transfer_only(capacity: usize) -> Self {
89 Self {
90 crankshaft: None,
91 transfer: Some(broadcast::Sender::new(capacity)),
92 }
93 }
94
95 pub fn subscribe_crankshaft(&self) -> Option<broadcast::Receiver<CrankshaftEvent>> {
99 self.crankshaft.as_ref().map(|s| s.subscribe())
100 }
101
102 pub fn subscribe_transfer(&self) -> Option<broadcast::Receiver<TransferEvent>> {
106 self.transfer.as_ref().map(|s| s.subscribe())
107 }
108
109 pub(crate) fn crankshaft(&self) -> &Option<broadcast::Sender<CrankshaftEvent>> {
111 &self.crankshaft
112 }
113
114 pub(crate) fn transfer(&self) -> &Option<broadcast::Sender<TransferEvent>> {
116 &self.transfer
117 }
118}
119
/// A location in a document, used as one entry of an error backtrace.
///
/// Entries are stored in `SourceError::backtrace` and rendered by
/// `EvaluationError::to_string` as secondary labels reading
/// "called from this location".
#[derive(Debug, Clone)]
pub struct CallLocation {
    /// The document containing the location.
    pub document: Document,
    /// The span within `document`.
    pub span: Span,
}
128
/// An evaluation error that is tied to a source document.
#[derive(Debug)]
pub struct SourceError {
    /// The document the diagnostic is reported against.
    pub document: Document,
    /// The diagnostic describing the error.
    pub diagnostic: Diagnostic,
    /// Call locations associated with the error.
    ///
    /// Empty when the error is created through `EvaluationError::new`.
    // NOTE(review): the ordering of entries (innermost-first or
    // outermost-first) is not established in the visible code — confirm with
    // the code that pushes onto this vector.
    pub backtrace: Vec<CallLocation>,
}
144
145#[derive(Debug)]
147pub enum EvaluationError {
148 Source(Box<SourceError>),
150 Other(anyhow::Error),
152}
153
154impl EvaluationError {
155 pub fn new(document: Document, diagnostic: Diagnostic) -> Self {
157 Self::Source(Box::new(SourceError {
158 document,
159 diagnostic,
160 backtrace: Default::default(),
161 }))
162 }
163
164 #[cfg(feature = "codespan-reporting")]
166 #[allow(clippy::inherent_to_string)]
167 pub fn to_string(&self) -> String {
168 use codespan_reporting::diagnostic::Label;
169 use codespan_reporting::diagnostic::LabelStyle;
170 use codespan_reporting::files::SimpleFiles;
171 use codespan_reporting::term::Config;
172 use codespan_reporting::term::termcolor::Buffer;
173 use codespan_reporting::term::{self};
174 use wdl_ast::AstNode;
175
176 match self {
177 Self::Source(e) => {
178 let mut files = SimpleFiles::new();
179 let mut map = HashMap::new();
180
181 let file_id = files.add(e.document.path(), e.document.root().text().to_string());
182
183 let diagnostic =
184 e.diagnostic
185 .to_codespan(file_id)
186 .with_labels_iter(e.backtrace.iter().map(|l| {
187 let id = l.document.id();
188 let file_id = *map.entry(id).or_insert_with(|| {
189 files.add(l.document.path(), l.document.root().text().to_string())
190 });
191
192 Label {
193 style: LabelStyle::Secondary,
194 file_id,
195 range: l.span.start()..l.span.end(),
196 message: "called from this location".into(),
197 }
198 }));
199
200 let mut buffer = Buffer::no_color();
201 term::emit(&mut buffer, &Config::default(), &files, &diagnostic)
202 .expect("failed to emit diagnostic");
203
204 String::from_utf8(buffer.into_inner()).expect("should be UTF-8")
205 }
206 Self::Other(e) => format!("{e:?}"),
207 }
208 }
209}
210
211impl From<anyhow::Error> for EvaluationError {
212 fn from(e: anyhow::Error) -> Self {
213 Self::Other(e)
214 }
215}
216
/// A `Result` whose error type is [`EvaluationError`].
pub type EvaluationResult<T> = Result<T, EvaluationError>;
219
/// A path on the host, stored as a shared string.
///
/// Cloning is cheap because the underlying string is reference counted.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct HostPath(pub(crate) Arc<String>);

impl HostPath {
    /// Creates a host path from anything convertible into a `String`.
    pub fn new(path: impl Into<String>) -> Self {
        let owned: String = path.into();
        Self(Arc::new(owned))
    }

    /// Returns the path as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for HostPath {
    /// Formats the path exactly as its underlying string would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl From<Arc<String>> for HostPath {
    /// Wraps an existing shared string without copying it.
    fn from(path: Arc<String>) -> Self {
        HostPath(path)
    }
}

impl From<HostPath> for Arc<String> {
    /// Unwraps the shared string without copying it.
    fn from(path: HostPath) -> Self {
        let HostPath(inner) = path;
        inner
    }
}
256
/// A path in the guest, stored as a shared string.
///
/// Cloning is cheap because the underlying string is reference counted.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct GuestPath(pub(crate) Arc<String>);

impl GuestPath {
    /// Creates a guest path from anything convertible into a `String`.
    pub fn new(path: impl Into<String>) -> Self {
        let owned: String = path.into();
        Self(Arc::new(owned))
    }

    /// Returns the path as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for GuestPath {
    /// Formats the path exactly as its underlying string would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}

impl From<Arc<String>> for GuestPath {
    /// Wraps an existing shared string without copying it.
    fn from(path: Arc<String>) -> Self {
        GuestPath(path)
    }
}

impl From<GuestPath> for Arc<String> {
    /// Unwraps the shared string without copying it.
    fn from(path: GuestPath) -> Self {
        let GuestPath(inner) = path;
        inner
    }
}
295
/// Context consulted during evaluation for name resolution, directories, task
/// state, file transfer and host/guest path translation.
///
/// Implementations must be `Send + Sync`.
pub trait EvaluationContext: Send + Sync {
    /// Returns the [`SupportedVersion`] associated with this context.
    // NOTE(review): presumably the WDL version of the document being
    // evaluated — confirm with implementors.
    fn version(&self) -> SupportedVersion;

    /// Resolves `name` to a [`Value`], or returns a [`Diagnostic`] on failure.
    // NOTE(review): `span` is presumably the source location of the reference,
    // used for the diagnostic — confirm.
    fn resolve_name(&self, name: &str, span: Span) -> Result<Value, Diagnostic>;

    /// Resolves `name` to a [`Type`], or returns a [`Diagnostic`] on failure.
    // NOTE(review): `span` is presumably the source location of the reference,
    // used for the diagnostic — confirm.
    fn resolve_type_name(&self, name: &str, span: Span) -> Result<Type, Diagnostic>;

    /// Returns the base directory of the evaluation.
    // NOTE(review): presumably the directory relative paths are resolved
    // against — confirm with callers.
    fn base_dir(&self) -> &EvaluationPath;

    /// Returns the temporary directory of the evaluation.
    fn temp_dir(&self) -> &Path;

    /// Returns the stdout value, if the context has one.
    ///
    /// The default implementation returns `None`.
    fn stdout(&self) -> Option<&Value> {
        None
    }

    /// Returns the stderr value, if the context has one.
    ///
    /// The default implementation returns `None`.
    fn stderr(&self) -> Option<&Value> {
        None
    }

    /// Returns the task associated with the context, if any.
    ///
    /// The default implementation returns `None`.
    fn task(&self) -> Option<&Task> {
        None
    }

    /// Returns the [`Transferer`] used by the context.
    fn transferer(&self) -> &dyn Transferer;

    /// Translates a host path to its guest path, if there is one.
    ///
    /// The default implementation ignores `path` and returns `None`.
    fn guest_path(&self, path: &HostPath) -> Option<GuestPath> {
        let _ = path;
        None
    }

    /// Translates a guest path to its host path, if there is one.
    ///
    /// The default implementation ignores `path` and returns `None`.
    fn host_path(&self, path: &GuestPath) -> Option<HostPath> {
        let _ = path;
        None
    }

    /// Notifies the context that a file was created at the given host path.
    ///
    /// The default implementation ignores `path` and returns `Ok(())`.
    fn notify_file_created(&mut self, path: &HostPath) -> Result<()> {
        let _ = path;
        Ok(())
    }
}
371
/// A typed index identifying a scope within a collection of scopes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ScopeIndex(usize);

impl ScopeIndex {
    /// Creates a scope index from a raw `usize`; usable in `const` contexts.
    pub const fn new(index: usize) -> Self {
        ScopeIndex(index)
    }
}

impl From<usize> for ScopeIndex {
    /// Converts a raw index into a [`ScopeIndex`].
    fn from(index: usize) -> Self {
        Self::new(index)
    }
}

impl From<ScopeIndex> for usize {
    /// Extracts the raw index.
    fn from(index: ScopeIndex) -> Self {
        let ScopeIndex(raw) = index;
        raw
    }
}
394
395#[derive(Default, Debug)]
397pub struct Scope {
398 parent: Option<ScopeIndex>,
402 names: IndexMap<String, Value>,
404}
405
406impl Scope {
407 pub fn new(parent: ScopeIndex) -> Self {
409 Self {
410 parent: Some(parent),
411 names: Default::default(),
412 }
413 }
414
415 pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Value>) {
417 let prev = self.names.insert(name.into(), value.into());
418 assert!(prev.is_none(), "conflicting name in scope");
419 }
420
421 pub fn local(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
423 self.names.iter().map(|(k, v)| (k.as_str(), v))
424 }
425
426 pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut Value> {
428 self.names.get_mut(name)
429 }
430
431 pub(crate) fn clear(&mut self) {
433 self.parent = None;
434 self.names.clear();
435 }
436
437 pub(crate) fn set_parent(&mut self, parent: ScopeIndex) {
439 self.parent = Some(parent);
440 }
441}
442
443impl From<Scope> for IndexMap<String, Value> {
444 fn from(scope: Scope) -> Self {
445 scope.names
446 }
447}
448
449#[derive(Debug, Clone, Copy)]
451pub struct ScopeRef<'a> {
452 scopes: &'a [Scope],
454 index: ScopeIndex,
456}
457
458impl<'a> ScopeRef<'a> {
459 pub fn new(scopes: &'a [Scope], index: impl Into<ScopeIndex>) -> Self {
461 Self {
462 scopes,
463 index: index.into(),
464 }
465 }
466
467 pub fn parent(&self) -> Option<Self> {
471 self.scopes[self.index.0].parent.map(|p| Self {
472 scopes: self.scopes,
473 index: p,
474 })
475 }
476
477 pub fn names(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
479 self.scopes[self.index.0]
480 .names
481 .iter()
482 .map(|(n, name)| (n.as_str(), name))
483 }
484
485 pub fn for_each(&self, mut cb: impl FnMut(&str, &Value) -> Result<()>) -> Result<()> {
490 let mut current = Some(self.index);
491
492 while let Some(index) = current {
493 for (n, v) in self.scopes[index.0].local() {
494 cb(n, v)?;
495 }
496
497 current = self.scopes[index.0].parent;
498 }
499
500 Ok(())
501 }
502
503 pub fn local(&self, name: &str) -> Option<&Value> {
507 self.scopes[self.index.0].names.get(name)
508 }
509
510 pub fn lookup(&self, name: &str) -> Option<&Value> {
514 let mut current = Some(self.index);
515
516 while let Some(index) = current {
517 if let Some(name) = self.scopes[index.0].names.get(name) {
518 return Some(name);
519 }
520
521 current = self.scopes[index.0].parent;
522 }
523
524 None
525 }
526}
527
528#[derive(Debug)]
530pub struct EvaluatedTask {
531 attempt_dir: PathBuf,
533 result: TaskExecutionResult,
535 outputs: EvaluationResult<Outputs>,
543}
544
545impl EvaluatedTask {
546 fn new(attempt_dir: PathBuf, result: TaskExecutionResult) -> anyhow::Result<Self> {
550 Ok(Self {
551 result,
552 attempt_dir,
553 outputs: Ok(Default::default()),
554 })
555 }
556
557 pub fn exit_code(&self) -> i32 {
559 self.result.exit_code
560 }
561
562 pub fn attempt_dir(&self) -> &Path {
564 &self.attempt_dir
565 }
566
567 pub fn work_dir(&self) -> &EvaluationPath {
569 &self.result.work_dir
570 }
571
572 pub fn stdout(&self) -> &Value {
574 &self.result.stdout
575 }
576
577 pub fn stderr(&self) -> &Value {
579 &self.result.stderr
580 }
581
582 pub fn outputs(&self) -> &EvaluationResult<Outputs> {
590 &self.outputs
591 }
592
593 pub fn into_result(self) -> EvaluationResult<Outputs> {
599 self.outputs
600 }
601
602 async fn handle_exit(
606 &self,
607 requirements: &HashMap<String, Value>,
608 transferer: &dyn Transferer,
609 ) -> anyhow::Result<()> {
610 let mut error = true;
611 if let Some(return_codes) = requirements
612 .get(TASK_REQUIREMENT_RETURN_CODES)
613 .or_else(|| requirements.get(TASK_REQUIREMENT_RETURN_CODES_ALIAS))
614 {
615 match return_codes {
616 Value::Primitive(PrimitiveValue::String(s)) if s.as_ref() == "*" => {
617 error = false;
618 }
619 Value::Primitive(PrimitiveValue::String(s)) => {
620 bail!(
621 "invalid return code value `{s}`: only `*` is accepted when the return \
622 code is specified as a string"
623 );
624 }
625 Value::Primitive(PrimitiveValue::Integer(ok)) => {
626 if self.result.exit_code == i32::try_from(*ok).unwrap_or_default() {
627 error = false;
628 }
629 }
630 Value::Compound(CompoundValue::Array(codes)) => {
631 error = !codes.as_slice().iter().any(|v| {
632 v.as_integer()
633 .map(|i| i32::try_from(i).unwrap_or_default() == self.result.exit_code)
634 .unwrap_or(false)
635 });
636 }
637 _ => unreachable!("unexpected return codes value"),
638 }
639 } else {
640 error = self.result.exit_code != 0;
641 }
642
643 if error {
644 let stderr = download_file(
647 transferer,
648 self.work_dir(),
649 self.stderr().as_file().unwrap(),
650 )
651 .await
652 .ok()
653 .and_then(|l| {
654 fs::File::open(l).ok().map(|f| {
655 let reader = RevBufReader::new(f);
657 let lines: Vec<_> = reader
658 .lines()
659 .take(MAX_STDERR_LINES)
660 .map_while(|l| l.ok())
661 .collect();
662
663 lines
665 .iter()
666 .rev()
667 .format_with("\n", |l, f| f(&format_args!(" {l}")))
668 .to_string()
669 })
670 })
671 .unwrap_or_default();
672
673 bail!(
675 "process terminated with exit code {code}: see `{stdout_path}` and \
676 `{stderr_path}` for task output and the related files in \
677 `{dir}`{header}{stderr}{trailer}",
678 code = self.result.exit_code,
679 dir = self.attempt_dir().display(),
680 stdout_path = self.stdout().as_file().expect("must be file"),
681 stderr_path = self.stderr().as_file().expect("must be file"),
682 header = if stderr.is_empty() {
683 Cow::Borrowed("")
684 } else {
685 format!("\n\ntask stderr output (last {MAX_STDERR_LINES} lines):\n\n").into()
686 },
687 trailer = if stderr.is_empty() { "" } else { "\n" }
688 );
689 }
690
691 Ok(())
692 }
693}
694
/// The kind of an input: a file or a directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InputKind {
    /// The input is a file.
    File,
    /// The input is a directory.
    Directory,
}

/// Maps each [`InputKind`] to the Crankshaft input type variant of the same
/// name.
impl From<InputKind> for crankshaft::engine::task::input::Type {
    fn from(value: InputKind) -> Self {
        match value {
            InputKind::File => Self::File,
            InputKind::Directory => Self::Directory,
        }
    }
}
712
713#[derive(Debug, Clone)]
715pub struct Input {
716 kind: InputKind,
718 path: EvaluationPath,
720 guest_path: Option<GuestPath>,
724 location: Option<Location>,
728}
729
730impl Input {
731 fn new(kind: InputKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
733 Self {
734 kind,
735 path,
736 guest_path,
737 location: None,
738 }
739 }
740
741 pub fn kind(&self) -> InputKind {
743 self.kind
744 }
745
746 pub fn path(&self) -> &EvaluationPath {
750 &self.path
751 }
752
753 pub fn guest_path(&self) -> Option<&GuestPath> {
757 self.guest_path.as_ref()
758 }
759
760 pub fn local_path(&self) -> Option<&Path> {
764 self.location.as_deref().or_else(|| self.path.as_local())
765 }
766
767 pub fn set_location(&mut self, location: Location) {
771 self.location = Some(location);
772 }
773}