use crate::{
data_switch::{self, DataCache, DataSwitch, SpaceSpec, TimeSpec},
harness::{self, CheckResult},
pipeline::Pipeline,
};
use std::collections::HashMap;
use thiserror::Error;
/// Errors that can be returned while scheduling and running a test pipeline.
///
/// The enum is marked `#[non_exhaustive]`, so downstream `match` expressions
/// need a wildcard arm.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// A check in the pipeline failed to run; wraps the underlying
/// [`harness::Error`] (convertible via `From`).
#[error("failed to run check: {0}")]
Runner(#[from] harness::Error),
/// The caller supplied an argument that could not be used; the payload is
/// a static description of what was wrong.
#[error("invalid argument: {0}")]
InvalidArg(&'static str),
/// The data switch returned an error while fetching data; wraps the
/// underlying [`data_switch::Error`] (convertible via `From`).
#[error("data switch failed to find data: {0}")]
DataSwitch(#[from] data_switch::Error),
}
/// Holds a set of named test pipelines together with the [`DataSwitch`] used
/// to fetch the data those pipelines are run against.
#[derive(Debug)]
pub struct Scheduler {
/// Available pipelines, keyed by the name callers use to select them.
#[allow(missing_docs)]
pub pipelines: HashMap<String, Pipeline>,
/// Source from which input data is fetched before checks are run.
data_switch: DataSwitch,
}
impl Scheduler {
pub fn new(pipelines: HashMap<String, Pipeline>, data_switch: DataSwitch) -> Self {
Scheduler {
pipelines,
data_switch,
}
}
pub fn schedule_tests(pipeline: &Pipeline, data: DataCache) -> Result<Vec<CheckResult>, Error> {
pipeline
.steps
.iter()
.map(|step| harness::run_check(step, &data).map_err(Error::Runner))
.collect()
}
pub async fn validate_direct(
&self,
data_source: impl AsRef<str>,
_backing_sources: &[impl AsRef<str>],
time_spec: &TimeSpec,
space_spec: &SpaceSpec,
test_pipeline: impl AsRef<str>,
extra_spec: Option<&str>,
) -> Result<Vec<CheckResult>, Error> {
let pipeline = self
.pipelines
.get(test_pipeline.as_ref())
.ok_or(Error::InvalidArg("pipeline not recognised"))?;
let data = match self
.data_switch
.fetch_data(
data_source.as_ref(),
space_spec,
time_spec,
pipeline.num_leading_required,
pipeline.num_trailing_required,
extra_spec,
)
.await
{
Ok(data) => data,
Err(e) => {
tracing::error!(%e);
return Err(Error::DataSwitch(e));
}
};
Scheduler::schedule_tests(pipeline, data)
}
}