use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Mutex, PoisonError};
use log::{debug, trace};
use serde::de::DeserializeOwned;
use structopt::StructOpt;
use super::driver::{CacheId, Driver, Instruction};
use super::{Extractor, Fragment, Installer, Transformation};
use crate::extension::{Extensible, Extension};
use crate::validation::Action;
use crate::AnyError;
/// An error bundling several errors produced by a single pipeline.
///
/// Instances are normally created through [`MultiError::wrap`], which only builds a
/// `MultiError` when there are at least two errors to carry.
#[derive(Debug)]
pub struct MultiError {
    /// All the errors that were collected.
    pub errors: Vec<AnyError>,
    /// Name of the pipeline the errors belong to; it appears in the `Display` output.
    pub pipeline: &'static str,
}

impl MultiError {
    /// Folds a list of errors into a single [`AnyError`].
    ///
    /// * Exactly one error is returned as it is, without any wrapping.
    /// * Two or more errors are stored inside a `MultiError` tagged with `pipeline`.
    ///
    /// # Panics
    ///
    /// If `errs` is empty.
    pub fn wrap(mut errs: Vec<AnyError>, pipeline: &'static str) -> AnyError {
        match errs.len() {
            0 => panic!("No errors in multi-error"),
            1 => errs.pop().unwrap(),
            _ => MultiError {
                errors: errs,
                pipeline,
            }
            .into(),
        }
    }
}

impl Display for MultiError {
    /// Prints the pipeline name and the number of errors; the individual errors are not listed.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        write!(
            formatter,
            "Pipeline {} failed with {} errors",
            self.pipeline,
            self.errors.len()
        )
    }
}

impl Error for MultiError {
    /// Exposes the first stored error as the source (`None` if the list is empty).
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.errors
            .first()
            .map(|e| e.deref() as &(dyn Error + 'static))
    }
}
/// Bookkeeping of the resources that are currently installed.
///
/// Owns the installer and maps each [`CacheId`] to the handle the installer returned for it.
/// The `R`, `O` and `C` parameters are not stored anywhere; they only pin down which
/// `Installer<R, O, C>` implementation of `I` is meant.
struct InstallCache<I, O, C, R, H> {
    installer: I,
    cache: HashMap<CacheId, H>,
    _type: PhantomData<(R, O, C)>,
}

impl<I, O, C, R> InstallCache<I, O, C, R, I::UninstallHandle>
where
    I: Installer<R, O, C>,
{
    /// Wraps the installer together with an empty cache.
    fn new(installer: I) -> Self {
        Self {
            installer,
            cache: HashMap::new(),
            _type: PhantomData,
        }
    }

    /// Applies a single instruction to the cache.
    ///
    /// * `DropAll` forgets every stored handle.
    /// * `DropSpecific` forgets the handle stored under the given ID.
    /// * `Install` hands the resource to the installer (together with `name`) and stores the
    ///   returned handle under the given ID.
    ///
    /// # Panics
    ///
    /// If `DropSpecific` names an ID that is not in the cache, or `Install` uses an ID that is
    /// already present. Both mean the cache and the producer of the instructions disagree.
    fn interpret(&mut self, instruction: Instruction<R>, name: &'static str) {
        match instruction {
            Instruction::DropAll => self.cache.clear(),
            Instruction::DropSpecific(id) => {
                // Keep the mutation out of the assertion so the side effect is plain to see.
                let was_present = self.cache.remove(&id).is_some();
                assert!(
                    was_present,
                    "Pipeline {}: instructed to drop a resource that is not installed",
                    name
                );
            }
            Instruction::Install { id, resource } => {
                let handle = self.installer.install(resource, name);
                let replaced = self.cache.insert(id, handle).is_some();
                assert!(
                    !replaced,
                    "Pipeline {}: instructed to install under an ID that is already in use",
                    name
                );
            }
        }
    }
}
pub struct CfgExtractor<F>(F);
impl<'a, O, C: 'a, F, R> Extractor<'a, O, C> for CfgExtractor<F>
where
F: FnMut(&'a C) -> R,
R: Fragment + 'a,
{
type Fragment = R;
fn extract(&mut self, _: &'a O, config: &'a C) -> R {
(self.0)(config)
}
}
/// A transformation that changes nothing.
///
/// Both the installer and the resource are handed back exactly as they came in.
#[derive(Clone, Copy, Debug, Default)]
pub struct NopTransformation;

impl<R: 'static, I, S> Transformation<R, I, S> for NopTransformation {
    type OutputResource = R;
    type OutputInstaller = I;

    /// Returns the installer it was given.
    fn installer(&mut self, original: I, _name: &str) -> I {
        original
    }

    /// Returns the resource it was given; never fails.
    fn transform(&mut self, unchanged: R, _fragment: &S, _name: &str) -> Result<R, AnyError> {
        Ok(unchanged)
    }
}
/// Two transformations applied one after another.
///
/// The output resource and output installer of the first one feed the second one.
pub struct ChainedTransformation<A, B>(A, B);

impl<A, B, R, I, S> Transformation<R, I, S> for ChainedTransformation<A, B>
where
    A: Transformation<R, I, S>,
    B: Transformation<A::OutputResource, A::OutputInstaller, S>,
{
    type OutputResource = B::OutputResource;
    type OutputInstaller = B::OutputInstaller;

    /// Passes the installer through the first and then the second transformation.
    fn installer(&mut self, installer: I, name: &'static str) -> B::OutputInstaller {
        let ChainedTransformation(first, second) = self;
        second.installer(first.installer(installer, name), name)
    }

    /// Transforms the resource with the first transformation and, if that succeeds, with the
    /// second one. Both get the same fragment and name.
    fn transform(
        &mut self,
        resource: R,
        fragment: &S,
        name: &'static str,
    ) -> Result<Self::OutputResource, AnyError> {
        let ChainedTransformation(first, second) = self;
        first
            .transform(resource, fragment, name)
            .and_then(|intermediate| second.transform(intermediate, fragment, name))
    }
}
/// A transformation wrapper that swaps the installer for a stored one.
///
/// Resources are transformed by the inner transformation `T`; the installer coming from the
/// inner chain is discarded in favour of the stored `I`.
pub struct SetInstaller<T, I>(T, Option<I>);

impl<T, I, R, OI, S> Transformation<R, OI, S> for SetInstaller<T, I>
where
    T: Transformation<R, OI, S>,
{
    type OutputResource = T::OutputResource;
    type OutputInstaller = I;

    /// Hands out the stored installer, ignoring the one passed in.
    ///
    /// # Panics
    ///
    /// On the second and any later call, as the installer has already been taken.
    fn installer(&mut self, _previous: OI, _name: &'static str) -> I {
        let replacement = self.1.take();
        replacement.expect("SetInstaller::installer called more than once")
    }

    /// Delegates to the inner transformation without any change.
    fn transform(
        &mut self,
        resource: R,
        fragment: &S,
        name: &'static str,
    ) -> Result<Self::OutputResource, AnyError> {
        let SetInstaller(inner, _) = self;
        inner.transform(resource, fragment, name)
    }
}
/// A transformation that post-processes the resource with an infallible closure.
pub struct Map<T, M>(T, M);

impl<T, M, Rin, Rout, I, S> Transformation<Rin, I, S> for Map<T, M>
where
    T: Transformation<Rin, I, S>,
    M: FnMut(T::OutputResource) -> Rout,
    Rout: 'static,
{
    type OutputResource = Rout;
    type OutputInstaller = T::OutputInstaller;

    /// The installer is whatever the inner transformation produces.
    fn installer(&mut self, installer: I, name: &'static str) -> T::OutputInstaller {
        let Map(inner, _) = self;
        inner.installer(installer, name)
    }

    /// Runs the inner transformation and, if it succeeds, feeds its output to the closure.
    fn transform(
        &mut self,
        resource: Rin,
        fragment: &S,
        name: &'static str,
    ) -> Result<Rout, AnyError> {
        let Map(inner, mapper) = self;
        let intermediate = inner.transform(resource, fragment, name)?;
        trace!("Mapping resource {}", name);
        Ok(mapper(intermediate))
    }
}
/// A transformation that post-processes the resource with a fallible closure.
pub struct AndThen<T, A>(T, A);

impl<T, A, Rin, Rout, I, S> Transformation<Rin, I, S> for AndThen<T, A>
where
    T: Transformation<Rin, I, S>,
    A: FnMut(T::OutputResource) -> Result<Rout, AnyError>,
    Rout: 'static,
{
    type OutputResource = Rout;
    type OutputInstaller = T::OutputInstaller;

    /// The installer is whatever the inner transformation produces.
    fn installer(&mut self, installer: I, name: &'static str) -> T::OutputInstaller {
        self.0.installer(installer, name)
    }

    /// Runs the inner transformation and, if it succeeds, feeds its output to the closure.
    ///
    /// # Errors
    ///
    /// Either the error of the inner transformation or the one returned by the closure.
    fn transform(
        &mut self,
        resource: Rin,
        fragment: &S,
        name: &'static str,
    ) -> Result<Rout, AnyError> {
        let r = self.0.transform(resource, fragment, name)?;
        trace!("Mapping resource {}", name);
        // The closure already returns the right `Result`, so hand it back directly.
        (self.1)(r)
    }
}
/// A pipeline under construction.
///
/// It bundles a name with an extractor, a driver and a transformation. The `Fragment` and
/// `SpiritType` parameters are tracked only at the type level (see the `PhantomData` markers);
/// no value of these types is stored.
///
/// Note that inside this definition the type parameters shadow the traits of the same names.
pub struct Pipeline<Fragment, Extractor, Driver, Transformation, SpiritType> {
// Name of the pipeline; used in log messages and passed on to the other parts.
name: &'static str,
// Zero-sized marker making the `Fragment` parameter used.
_fragment: PhantomData<dyn Fn(Fragment)>,
// Zero-sized marker making the `SpiritType` parameter used.
_spirit: PhantomData<dyn Fn(SpiritType)>,
extractor: Extractor,
driver: Driver,
transformation: Transformation,
}
impl Pipeline<(), (), (), (), ()> {
    /// Starts an empty pipeline with the given name.
    ///
    /// All the parts are `()` placeholders until an extractor is configured.
    pub fn new(name: &'static str) -> Self {
        Pipeline {
            name,
            extractor: (),
            driver: (),
            transformation: (),
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// Sets the extractor of the pipeline.
    ///
    /// The fragment type is taken from the extractor, the driver is the default value of that
    /// fragment's `Driver` type and the transformation starts as [`NopTransformation`].
    pub fn extract<O, C, E: for<'e> Extractor<'e, O, C>>(
        self,
        e: E,
    ) -> Pipeline<
        <E as Extractor<'static, O, C>>::Fragment,
        E,
        <<E as Extractor<'static, O, C>>::Fragment as Fragment>::Driver,
        NopTransformation,
        (O, C),
    > {
        let Pipeline { name, .. } = self;
        trace!("Configured extractor on pipeline {}", name);
        Pipeline {
            name,
            extractor: e,
            driver: Default::default(),
            transformation: NopTransformation,
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// Sets the extractor to a closure that looks only at the configuration.
    ///
    /// The closure is wrapped into a [`CfgExtractor`]; otherwise this mirrors `extract`.
    pub fn extract_cfg<O, C: 'static, R, E>(
        self,
        e: E,
    ) -> Pipeline<R, CfgExtractor<E>, R::Driver, NopTransformation, (O, C)>
    where
        CfgExtractor<E>: for<'a> Extractor<'a, O, C>,
        E: FnMut(&'static C) -> R,
        R: Fragment,
    {
        let Pipeline { name, .. } = self;
        trace!("Configured extractor on pipeline {}", name);
        Pipeline {
            name,
            extractor: CfgExtractor(e),
            driver: Default::default(),
            transformation: NopTransformation,
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }
}
impl<F, E, D, T, O, C> Pipeline<F, E, D, T, (O, C)>
where
    F: Fragment,
{
    /// Replaces the driver of the pipeline.
    ///
    /// The name, extractor and transformation are carried over; the previous driver is
    /// discarded.
    pub fn set_driver<ND: Driver<F>>(self, driver: ND) -> Pipeline<F, E, ND, T, (O, C)>
    where
        T: Transformation<<ND::SubFragment as Fragment>::Resource, F::Installer, ND::SubFragment>,
    {
        let Pipeline {
            name,
            extractor,
            transformation,
            ..
        } = self;
        trace!("Overriding the driver on pipeline {}", name);
        Pipeline {
            name,
            extractor,
            driver,
            transformation,
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }
}
impl<F, E, D, T, O, C> Pipeline<F, E, D, T, (O, C)>
where
    F: Fragment,
    D: Driver<F>,
    T: Transformation<<D::SubFragment as Fragment>::Resource, F::Installer, D::SubFragment>,
{
    /// Appends another transformation after the ones already configured.
    ///
    /// The current transformation and the new one are combined into a
    /// [`ChainedTransformation`].
    pub fn transform<NT>(
        self,
        transform: NT,
    ) -> Pipeline<F, E, D, ChainedTransformation<T, NT>, (O, C)>
    where
        NT: Transformation<T::OutputResource, T::OutputInstaller, D::SubFragment>,
    {
        let Pipeline {
            name,
            extractor,
            driver,
            transformation,
            ..
        } = self;
        trace!("Adding a transformation to pipeline {}", name);
        Pipeline {
            name,
            extractor,
            driver,
            transformation: ChainedTransformation(transformation, transform),
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// Post-processes the resource with an infallible closure (wrapped in [`Map`]).
    pub fn map<M, R>(self, m: M) -> Pipeline<F, E, D, Map<T, M>, (O, C)>
    where
        M: FnMut(T::OutputResource) -> R,
    {
        let Pipeline {
            name,
            extractor,
            driver,
            transformation,
            ..
        } = self;
        trace!("Adding a map transformation to pipeline {}", name);
        Pipeline {
            name,
            extractor,
            driver,
            transformation: Map(transformation, m),
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// Post-processes the resource with a fallible closure (wrapped in [`AndThen`]).
    pub fn and_then<A, R>(self, a: A) -> Pipeline<F, E, D, AndThen<T, A>, (O, C)>
    where
        A: FnMut(T::OutputResource) -> Result<R, AnyError>,
    {
        let Pipeline {
            name,
            extractor,
            driver,
            transformation,
            ..
        } = self;
        trace!("Adding an and_then transformation to pipeline {}", name);
        Pipeline {
            name,
            extractor,
            driver,
            transformation: AndThen(transformation, a),
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// Sets the installer the pipeline will use (wrapped in [`SetInstaller`]).
    pub fn install<I>(self, installer: I) -> Pipeline<F, E, D, SetInstaller<T, I>, (O, C)>
    where
        I: Installer<T::OutputResource, O, C>,
    {
        let Pipeline {
            name,
            extractor,
            driver,
            transformation,
            ..
        } = self;
        trace!("Setting installer to pipeline {}", name);
        Pipeline {
            name,
            extractor,
            driver,
            transformation: SetInstaller(transformation, Some(installer)),
            _fragment: PhantomData,
            _spirit: PhantomData,
        }
    }

    /// A compile-time check that returns the pipeline untouched.
    ///
    /// It does nothing at runtime; it compiles only if the bounds below are satisfied, that is,
    /// the output installer is able to install the output resource.
    pub fn check(self) -> Self
    where
        D::SubFragment: Fragment,
        T::OutputInstaller: Installer<T::OutputResource, O, C>,
    {
        self
    }
}
/// A pipeline with all its parts put together, ready to be run.
///
/// Besides the extractor, driver and transformation it holds the install cache, which keeps
/// the installer and the handles of installed resources.
pub struct CompiledPipeline<O, C, T, I, D, E, R, H> {
    name: &'static str,
    transformation: T,
    install_cache: InstallCache<I, O, C, R, H>,
    driver: D,
    extractor: E,
}

impl<O, C, T, I, D, E, R, H> CompiledPipeline<O, C, T, I, D, E, R, H> {
    /// Borrows the name, transformation and driver at once.
    ///
    /// Going through one method lets the caller hold mutable borrows of both the
    /// transformation and the driver at the same time.
    fn explode(&mut self) -> (&'static str, &mut T, &mut D) {
        let CompiledPipeline {
            name,
            transformation,
            driver,
            ..
        } = self;
        (*name, transformation, driver)
    }
}
/// The ability to run a compiled pipeline with options and configuration borrowed for `'a`.
pub trait BoundedCompiledPipeline<'a, O, C> {
/// Runs the pipeline shared behind `me` against the given options and configuration.
///
/// Returns an [`Action`] on success, or the list of errors on failure.
fn run(me: &Arc<Mutex<Self>>, opts: &'a O, config: &'a C) -> Result<Action, Vec<AnyError>>;
}
impl<'a, O, C, T, I, D, E> BoundedCompiledPipeline<'a, O, C>
    for CompiledPipeline<O, C, T, I, D, E, T::OutputResource, I::UninstallHandle>
where
    O: 'static,
    C: 'static,
    E: Extractor<'a, O, C> + 'static,
    D: Driver<E::Fragment> + Send + 'static,
    T: Transformation<
            <D::SubFragment as Fragment>::Resource,
            <D::SubFragment as Fragment>::Installer,
            D::SubFragment,
        > + 'static,
    T::OutputResource: 'static,
    I: Installer<T::OutputResource, O, C> + Send + 'static,
{
    /// Extracts the fragment, asks the driver for instructions and prepares the follow-up.
    ///
    /// Nothing is installed here. The returned [`Action`] carries two callbacks:
    ///
    /// * on abort, the driver's `abort` is called;
    /// * on success, the driver's `confirm` is called and the instructions are applied to the
    ///   install cache.
    ///
    /// A poisoned mutex is not treated as fatal; the inner value is used anyway.
    ///
    /// # Errors
    ///
    /// The errors the driver returns when producing the instructions.
    fn run(me: &Arc<Mutex<Self>>, opts: &'a O, config: &'a C) -> Result<Action, Vec<AnyError>> {
        let mut guard = me.lock().unwrap_or_else(PoisonError::into_inner);
        let fragment = guard.extractor.extract(opts, config);
        let (name, transform, driver) = guard.explode();
        debug!("Running pipeline {}", name);
        let instructions = driver.instructions(&fragment, transform, name)?;

        let on_abort = {
            let shared = Arc::clone(me);
            move || {
                debug!("Rolling back pipeline {}", name);
                let mut pipeline = shared.lock().unwrap_or_else(PoisonError::into_inner);
                pipeline.driver.abort(name);
            }
        };

        let on_success = {
            let shared = Arc::clone(me);
            move || {
                debug!(
                    "Success for pipeline {}, performing {} install instructions",
                    name,
                    instructions.len(),
                );
                let mut pipeline = shared.lock().unwrap_or_else(PoisonError::into_inner);
                pipeline.driver.confirm(name);
                for instruction in instructions {
                    pipeline.install_cache.interpret(instruction, name);
                }
            }
        };

        Ok(Action::new().on_abort(on_abort).on_success(on_success))
    }
}
impl<F, B, E, D, T> Extension<B> for Pipeline<F, E, D, T, (B::Opts, B::Config)>
where
B::Config: DeserializeOwned + Send + Sync + 'static,
B::Opts: StructOpt + Send + Sync + 'static,
B: Extensible<Ok = B>,
CompiledPipeline<
B::Opts,
B::Config,
T,
T::OutputInstaller,
D,
E,
T::OutputResource,
<T::OutputInstaller as Installer<T::OutputResource, B::Opts, B::Config>>::UninstallHandle,
>: for<'a> BoundedCompiledPipeline<'a, B::Opts, B::Config> + Send + 'static,
D: Driver<F> + Send + 'static,
F: Fragment,
T: Transformation<
<D::SubFragment as Fragment>::Resource,
<D::SubFragment as Fragment>::Installer,
D::SubFragment,
>,
T::OutputInstaller: Installer<T::OutputResource, B::Opts, B::Config> + 'static,
{
fn apply(self, mut builder: B) -> Result<B, AnyError> {
trace!("Inserting pipeline {}", self.name);
let mut transformation = self.transformation;
let mut installer = transformation.installer(Default::default(), self.name);
builder = F::init(builder, self.name)?;
builder = installer.init(builder, self.name)?;
let compiled = CompiledPipeline {
name: self.name,
driver: self.driver,
extractor: self.extractor,
install_cache: InstallCache::new(installer),
transformation,
};
let compiled = Arc::new(Mutex::new(compiled));
let name = self.name;
if F::RUN_BEFORE_CONFIG && !B::STARTED {
let compiled = Arc::clone(&compiled);
let before_config = move |cfg: &B::Config, opts: &B::Opts| {
BoundedCompiledPipeline::run(&compiled, opts, cfg)
.map(|action| action.run(true))
.map_err(|errs| MultiError::wrap(errs, name))
};
builder = builder.before_config(before_config)?;
}
let validator = move |_old: &_, cfg: &Arc<B::Config>, opts: &B::Opts| {
BoundedCompiledPipeline::run(&compiled, opts, cfg)
.map_err(|errs| MultiError::wrap(errs, name))
};
builder.config_validator(validator)
}
}