Skip to main content

wdl_engine/eval/
v1.rs

1//! Implementation of evaluation for V1 documents.
2
3mod expr;
4mod task;
5mod validators;
6mod workflow;
7
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::BufWriter;
11use std::path::Path;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use anyhow::Context;
16use anyhow::Result;
17pub(crate) use expr::*;
18use serde::Serialize;
19pub(crate) use task::*;
20use tokio::sync::broadcast;
21use tracing::info;
22use wdl_analysis::types::EnumVariantCacheKey;
23
24use super::CancellationContext;
25use super::Events;
26use crate::EngineEvent;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::cache::CallCache;
30use crate::cache::CallCacheExclusions;
31use crate::config::CallCachingMode;
32use crate::config::Config;
33use crate::http::HttpTransferer;
34use crate::http::Transferer;
35
36/// The name of the inputs file to write for each task and workflow in the
37/// outputs directory.
38const INPUTS_FILE: &str = "inputs.json";
39
40/// The name of the outputs file to write for each task and workflow in the
41/// outputs directory.
42const OUTPUTS_FILE: &str = "outputs.json";
43
44/// Serializes a value into a JSON file.
45fn write_json_file(path: impl AsRef<Path>, value: &impl Serialize) -> Result<()> {
46    let path = path.as_ref();
47    let file = File::create(path)
48        .with_context(|| format!("failed to create file `{path}`", path = path.display()))?;
49    serde_json::to_writer_pretty(BufWriter::new(file), value)
50        .with_context(|| format!("failed to write file `{path}`", path = path.display()))
51}
52
53/// Represents a WDL evaluator.
54///
55/// The evaluator is used to evaluate a specific task or the workflow of an
56/// analyzed document.
57///
58/// This type is cheaply cloned and sendable between threads.
59#[derive(Clone)]
60pub struct Evaluator {
61    /// The associated evaluation configuration.
62    config: Arc<Config>,
63    /// The associated task execution backend.
64    backend: Arc<dyn TaskExecutionBackend>,
65    /// The cancellation context for cancelling task evaluation.
66    cancellation: CancellationContext,
67    /// The transferer to use for expression evaluation.
68    transferer: Arc<dyn Transferer>,
69    /// The call cache to use for task evaluation.
70    cache: Option<CallCache>,
71    /// The events for evaluation.
72    events: Option<broadcast::Sender<EngineEvent>>,
73    /// Cache for evaluated enum variant values to avoid redundant AST lookups.
74    variant_cache: Arc<Mutex<HashMap<EnumVariantCacheKey, Value>>>,
75}
76
77impl Evaluator {
78    /// Constructs a new evaluator with the given evaluation root directory,
79    /// evaluation configuration, cancellation context, and events.
80    ///
81    /// Returns an error if the configuration isn't valid.
82    pub async fn new(
83        root_dir: impl AsRef<Path>,
84        config: Arc<Config>,
85        cancellation: CancellationContext,
86        events: Events,
87    ) -> Result<Self> {
88        config.validate().await?;
89
90        let root_dir = root_dir.as_ref();
91        let backend = config
92            .create_backend(root_dir, events.clone(), cancellation.clone())
93            .await?;
94        let transferer = Arc::new(HttpTransferer::new(
95            config.clone(),
96            cancellation.first(),
97            events.transfer().clone(),
98        )?);
99
100        let cache = match config.task.cache {
101            CallCachingMode::Off => {
102                info!("call caching is disabled");
103                None
104            }
105            _ => Some(
106                CallCache::new(
107                    config.task.cache_dir.as_deref(),
108                    config.task.digests,
109                    transferer.clone(),
110                    Arc::new(CallCacheExclusions {
111                        inputs: config.task.excluded_cache_inputs.clone(),
112                        requirements: config.task.excluded_cache_requirements.clone(),
113                        hints: config.task.excluded_cache_hints.clone(),
114                    }),
115                )
116                .await?,
117            ),
118        };
119
120        Ok(Self {
121            config,
122            backend,
123            cancellation,
124            transferer,
125            cache,
126            events: events.engine().clone(),
127            variant_cache: Default::default(),
128        })
129    }
130}