#![allow(clippy::new_without_default)]
use crate::error::PerformError;
use crate::job::Job;
use sqlx::PgPool;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::marker::PhantomData;
/// Table of the jobs that can run against the environment type `Env`.
///
/// Each entry maps a job's type name (its `JOB_TYPE` string) to the
/// [`JobVTable`] used to run it. `Env` is recorded only at the type level
/// through `PhantomData`; no `Env` value is stored in the registry.
#[allow(missing_debug_implementations)]
pub struct Registry<Env> {
    /// Registered jobs, keyed by job type name.
    jobs: HashMap<&'static str, JobVTable>,
    /// Ties the registry to its environment type without owning one.
    _marker: PhantomData<Env>,
}

// Implemented by hand instead of `#[derive(Default)]`: the derive would add an
// `Env: Default` bound, which is unnecessary because no `Env` value is ever
// constructed here and would make `default()` unavailable for most
// environment types.
impl<Env> Default for Registry<Env> {
    /// Creates a registry with no jobs registered.
    fn default() -> Self {
        Self {
            jobs: HashMap::new(),
            _marker: PhantomData,
        }
    }
}
impl<Env: 'static> Registry<Env> {
pub fn register_job<T: Job + 'static + Send>(&mut self) {
if TypeId::of::<T::Environment>() == TypeId::of::<Env>() {
self.jobs.insert(T::JOB_TYPE, JobVTable::from_job::<T>());
} else {
log::warn!("could not register job {}", T::JOB_TYPE);
}
}
pub fn load() -> Self {
let jobs = inventory::iter::<JobVTable>
.into_iter()
.filter(|vtable| vtable.env_type == TypeId::of::<Env>())
.map(|&vtable| (vtable.job_type, vtable))
.collect();
Self {
jobs,
_marker: PhantomData,
}
}
pub fn get(&self, job_type: &str) -> Option<PerformJob<Env>> {
self.jobs.get(job_type).map(|&vtable| PerformJob {
vtable,
_marker: PhantomData,
})
}
}
/// Submits the job type `$job_ty` to the `inventory` collection of
/// `JobVTable`s.
///
/// The expansion calls `JobVTable::from_job::<$job_ty>()` inside
/// `inventory::submit!`, reaching `inventory` through this crate's
/// re-export (`$crate::inventory`). `Registry::load` iterates
/// `inventory::iter::<JobVTable>`, so presumably jobs submitted here are the
/// ones it picks up.
// NOTE(review): the expansion names this crate literally as `coil` (both in
// `#![crate = coil]` and in `coil::JobVTable`) rather than through `$crate`;
// presumably this stops resolving if a downstream crate renames the
// dependency -- confirm before relying on it.
#[macro_export]
macro_rules! register_job {
($job_ty: ty) => {
$crate::inventory::submit! {
#![crate = coil]
coil::JobVTable::from_job::<$job_ty>()
}
};
}
/// Signature of the type-erased entry point stored in a [`JobVTable`]: it
/// receives the job payload as JSON, the environment as `&dyn Any`, and the
/// database pool.
type SyncPerformFn = fn(serde_json::Value, &dyn Any, &PgPool) -> Result<(), PerformError>;

/// Type-erased description of a single job type.
///
/// Values are built by `JobVTable::from_job` and are cheap to copy: the
/// struct holds only a `TypeId`, a static string, and a function pointer.
#[doc(hidden)]
#[derive(Clone, Copy)]
pub struct JobVTable {
    /// `TypeId` of the environment type the job runs against.
    env_type: TypeId,
    /// Name the job is registered and looked up under.
    job_type: &'static str,
    /// Function that decodes the payload and runs the job.
    perform: SyncPerformFn,
}

// Declares `JobVTable` as a type collected through `inventory`.
inventory::collect!(JobVTable);
impl JobVTable {
    /// Builds the vtable for job type `T`.
    ///
    /// Records the `TypeId` of `T::Environment`, the job's `JOB_TYPE` name,
    /// and `perform_sync_job::<T>` as the function used to run it.
    pub fn from_job<T>() -> Self
    where
        T: 'static + Job + Send,
    {
        let env_type = TypeId::of::<T::Environment>();
        let job_type = T::JOB_TYPE;

        Self {
            env_type,
            job_type,
            perform: perform_sync_job::<T>,
        }
    }
}
/// Type-erased entry point for running job `T`; this is the function stored
/// in `JobVTable::perform`.
///
/// Downcasts `env` to the environment type `T::perform` expects, deserializes
/// `data` into the value `T::perform` takes as its first argument, and then
/// runs the job.
///
/// # Errors
///
/// Returns an error when `env` is not of the expected environment type, when
/// `data` cannot be deserialized, or when `T::perform` itself fails.
fn perform_sync_job<T: Job>(
    data: serde_json::Value,
    env: &dyn Any,
    conn: &PgPool,
) -> Result<(), PerformError> {
    let environment = match env.downcast_ref() {
        Some(environment) => environment,
        None => {
            let mismatch: PerformError = "Incorrect environment type. This should never happen. \
                 Please open an issue at https://github.com/paritytech/coil/issues/new"
                .into();
            return Err(mismatch);
        }
    };

    let payload = serde_json::from_value(data)?;
    T::perform(payload, environment, conn)
}
pub struct PerformJob<Env> {
vtable: JobVTable,
_marker: PhantomData<Env>,
}
impl<Env: 'static + Send + Sync> PerformJob<Env> {
pub fn perform_sync(
&self,
data: serde_json::Value,
env: &Env,
conn: &PgPool,
) -> Result<(), PerformError> {
(self.vtable.perform)(data, env, conn)
}
}