mod expr;
mod task;
mod validators;
mod workflow;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufWriter;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::Context;
use anyhow::Result;
pub(crate) use expr::*;
use serde::Serialize;
pub(crate) use task::*;
use tokio::sync::broadcast;
use tracing::info;
use wdl_analysis::types::EnumVariantCacheKey;
use super::CancellationContext;
use super::Events;
use crate::EngineEvent;
use crate::Value;
use crate::backend::TaskExecutionBackend;
use crate::cache::CallCache;
use crate::cache::CallCacheExclusions;
use crate::config::CallCachingMode;
use crate::config::Config;
use crate::http::HttpTransferer;
use crate::http::Transferer;
/// File name `inputs.json`.
///
/// NOTE(review): presumably the name of the file that evaluation inputs are
/// serialized to; the use sites are not in this part of the file — confirm.
const INPUTS_FILE: &str = "inputs.json";
/// File name `outputs.json`.
///
/// NOTE(review): presumably the name of the file that evaluation outputs are
/// serialized to; the use sites are not in this part of the file — confirm.
const OUTPUTS_FILE: &str = "outputs.json";
/// Serializes `value` as pretty-printed JSON into the file at `path`.
///
/// The file is opened with [`File::create`], so it is created when missing
/// and truncated when it already exists.
///
/// # Errors
///
/// Returns an error annotated with the offending path if the file cannot be
/// created, if serialization fails, or if the buffered data cannot be
/// flushed to the file.
fn write_json_file(path: impl AsRef<Path>, value: &impl Serialize) -> Result<()> {
    let path = path.as_ref();
    let file = File::create(path)
        .with_context(|| format!("failed to create file `{path}`", path = path.display()))?;

    // Keep ownership of the writer (serialize through `&mut`) so that it can
    // be flushed explicitly once serialization has finished.
    let mut writer = BufWriter::new(file);
    serde_json::to_writer_pretty(&mut writer, value)
        .with_context(|| format!("failed to write file `{path}`", path = path.display()))?;

    // `BufWriter`'s `Drop` implementation ignores flush errors; flushing here
    // ensures a failed final write is reported instead of silently leaving a
    // truncated file behind.
    std::io::Write::flush(&mut writer)
        .with_context(|| format!("failed to write file `{path}`", path = path.display()))
}
/// Bundles the shared state built by [`Evaluator::new`]: configuration,
/// task execution backend, cancellation context, file transferer, optional
/// call cache, optional engine event sender, and an enum variant cache.
///
/// Cloning yields another handle to the same `Arc`-wrapped state (the
/// configuration, backend, transferer, and variant cache are behind `Arc`).
#[derive(Clone)]
pub struct Evaluator {
/// The configuration; it is validated by [`Evaluator::new`] before use.
config: Arc<Config>,
/// The task execution backend created from the configuration.
backend: Arc<dyn TaskExecutionBackend>,
/// The cancellation context supplied by the caller of [`Evaluator::new`].
cancellation: CancellationContext,
/// The transferer; [`Evaluator::new`] installs an [`HttpTransferer`] here.
transferer: Arc<dyn Transferer>,
/// The call cache; `None` when the configured call caching mode is
/// [`CallCachingMode::Off`].
cache: Option<CallCache>,
/// Optional broadcast sender for [`EngineEvent`]s, cloned from the `Events`
/// value passed to [`Evaluator::new`].
events: Option<broadcast::Sender<EngineEvent>>,
/// Map from [`EnumVariantCacheKey`] to [`Value`], guarded by a mutex and
/// initially empty.
///
/// NOTE(review): presumably memoizes evaluated enum variant values; the
/// readers/writers are not in this part of the file — confirm.
variant_cache: Arc<Mutex<HashMap<EnumVariantCacheKey, Value>>>,
}
impl Evaluator {
    /// Builds a new evaluator from the given configuration.
    ///
    /// The steps run in this order:
    ///
    /// 1. the configuration is validated;
    /// 2. the task execution backend is created for `root_dir`;
    /// 3. an [`HttpTransferer`] is constructed;
    /// 4. a [`CallCache`] is created, unless the configured call caching mode
    ///    is [`CallCachingMode::Off`], in which case no cache is used.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by configuration validation, backend
    /// creation, transferer construction, or call cache creation.
    pub async fn new(
        root_dir: impl AsRef<Path>,
        config: Arc<Config>,
        cancellation: CancellationContext,
        events: Events,
    ) -> Result<Self> {
        // Nothing else is constructed for an invalid configuration.
        config.validate().await?;

        let root_dir = root_dir.as_ref();
        let backend = config
            .create_backend(root_dir, events.clone(), cancellation.clone())
            .await?;

        let http = HttpTransferer::new(
            config.clone(),
            cancellation.first(),
            events.transfer().clone(),
        )?;
        let transferer = Arc::new(http);

        let cache = if matches!(config.task.cache, CallCachingMode::Off) {
            info!("call caching is disabled");
            None
        } else {
            let cache_dir = config.task.cache_dir();
            let exclusions = Arc::new(CallCacheExclusions {
                inputs: config.task.excluded_cache_inputs.clone(),
                requirements: config.task.excluded_cache_requirements.clone(),
                hints: config.task.excluded_cache_hints.clone(),
            });
            let call_cache = CallCache::new(
                cache_dir.as_deref(),
                config.task.digests,
                transferer.clone(),
                exclusions,
            )
            .await?;
            Some(call_cache)
        };

        let engine_events = events.engine().clone();

        Ok(Self {
            config,
            backend,
            cancellation,
            transferer,
            cache,
            events: engine_events,
            variant_cache: Default::default(),
        })
    }
}