1mod expr;
4mod task;
5mod validators;
6mod workflow;
7
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::BufWriter;
11use std::path::Path;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use anyhow::Context;
16use anyhow::Result;
17pub(crate) use expr::*;
18use serde::Serialize;
19pub(crate) use task::*;
20use tokio::sync::broadcast;
21use tracing::info;
22use wdl_analysis::types::EnumVariantCacheKey;
23
24use super::CancellationContext;
25use super::Events;
26use crate::EngineEvent;
27use crate::Value;
28use crate::backend::TaskExecutionBackend;
29use crate::cache::CallCache;
30use crate::cache::CallCacheExclusions;
31use crate::config::CallCachingMode;
32use crate::config::Config;
33use crate::http::HttpTransferer;
34use crate::http::Transferer;
35
/// File name `inputs.json`.
///
/// NOTE(review): presumably the name of the JSON file that evaluation inputs
/// are written to; the use sites are not in this part of the file — confirm.
const INPUTS_FILE: &str = "inputs.json";

/// File name `outputs.json`.
///
/// NOTE(review): presumably the name of the JSON file that evaluation outputs
/// are written to; the use sites are not in this part of the file — confirm.
const OUTPUTS_FILE: &str = "outputs.json";
43
44fn write_json_file(path: impl AsRef<Path>, value: &impl Serialize) -> Result<()> {
46 let path = path.as_ref();
47 let file = File::create(path)
48 .with_context(|| format!("failed to create file `{path}`", path = path.display()))?;
49 serde_json::to_writer_pretty(BufWriter::new(file), value)
50 .with_context(|| format!("failed to write file `{path}`", path = path.display()))
51}
52
53#[derive(Clone)]
60pub struct Evaluator {
61 config: Arc<Config>,
63 backend: Arc<dyn TaskExecutionBackend>,
65 cancellation: CancellationContext,
67 transferer: Arc<dyn Transferer>,
69 cache: Option<CallCache>,
71 events: Option<broadcast::Sender<EngineEvent>>,
73 variant_cache: Arc<Mutex<HashMap<EnumVariantCacheKey, Value>>>,
75}
76
77impl Evaluator {
78 pub async fn new(
83 root_dir: impl AsRef<Path>,
84 config: Arc<Config>,
85 cancellation: CancellationContext,
86 events: Events,
87 ) -> Result<Self> {
88 config.validate().await?;
89
90 let root_dir = root_dir.as_ref();
91 let backend = config
92 .create_backend(root_dir, events.clone(), cancellation.clone())
93 .await?;
94 let transferer = Arc::new(HttpTransferer::new(
95 config.clone(),
96 cancellation.first(),
97 events.transfer().clone(),
98 )?);
99
100 let cache = match config.task.cache {
101 CallCachingMode::Off => {
102 info!("call caching is disabled");
103 None
104 }
105 _ => Some(
106 CallCache::new(
107 config.task.cache_dir.as_deref(),
108 config.task.digests,
109 transferer.clone(),
110 Arc::new(CallCacheExclusions {
111 inputs: config.task.excluded_cache_inputs.clone(),
112 requirements: config.task.excluded_cache_requirements.clone(),
113 hints: config.task.excluded_cache_hints.clone(),
114 }),
115 )
116 .await?,
117 ),
118 };
119
120 Ok(Self {
121 config,
122 backend,
123 cancellation,
124 transferer,
125 cache,
126 events: events.engine().clone(),
127 variant_cache: Default::default(),
128 })
129 }
130}