use std::any::type_name;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
use anymap2::any::CloneAnySendSync;
use anymap2::Map;
use sqlx::{Pool, Postgres};
use uuid::Uuid;
use crate::hidden::{BuildFn, RunFn};
use crate::utils::Opaque;
use crate::{JobBuilder, JobRunnerOptions};
// Boxed, sendable error type handed to the registry's error handler.
type BoxedError = Box<dyn Error + Send + 'static>;
/// Registry of named jobs, plus the error handler and the typed context
/// values that are looked up through it.
pub struct JobRegistry {
/// Callback invoked with a job name and a boxed error; shared via `Arc` so
/// it can be cloned into spawned tasks.
#[allow(clippy::type_complexity)]
error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
/// Registered jobs, keyed by their name.
job_map: HashMap<&'static str, &'static NamedJob>,
/// Type-indexed map holding the context values set via `set_context`.
context: Map<dyn CloneAnySendSync + Send + Sync>,
}
/// Error reported when a job name cannot be resolved to a registered job.
#[derive(Debug)]
pub struct UnknownJobError;

impl Display for UnknownJobError {
    /// Writes the fixed, human-readable description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Unknown job")
    }
}

// No underlying source error: the default `Error` methods are sufficient.
impl Error for UnknownJobError {}
impl JobRegistry {
    /// Builds a registry from the given job definitions, indexed by job name.
    ///
    /// The registry starts with [`JobRegistry::default_error_handler`] as its
    /// error handler and with an empty context map.
    ///
    /// # Panics
    ///
    /// Panics if two entries in `jobs` report the same name.
    pub fn new(jobs: &[&'static NamedJob]) -> Self {
        let mut job_map = HashMap::new();
        for job in jobs.iter().copied() {
            // `insert` hands back the previous entry, if any, for this name.
            let previous = job_map.insert(job.name(), job);
            assert!(
                previous.is_none(),
                "Duplicate job registered: {}",
                job.name()
            );
        }

        Self {
            error_handler: Arc::new(Self::default_error_handler),
            job_map,
            context: Map::new(),
        }
    }

    /// Replaces the error handler.
    ///
    /// The handler receives the job name and the boxed error. It is called
    /// when a spawned job future resolves to `Err`, and when the runner sees
    /// a job name that is not registered.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn set_error_handler(
        &mut self,
        error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
    ) -> &mut Self {
        // The assignment coerces the concrete closure type to the trait object.
        self.error_handler = Arc::new(error_handler);
        self
    }

    /// Stores `context` in the registry's type-indexed context map.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn set_context<C: Clone + Send + Sync + 'static>(&mut self, context: C) -> &mut Self {
        // Whatever `insert` returns is intentionally discarded.
        let _ = self.context.insert(context);
        self
    }

    /// Returns a clone of the stored context value of type `C`.
    ///
    /// # Panics
    ///
    /// Panics if no value of type `C` has been stored via
    /// [`JobRegistry::set_context`].
    pub fn context<C: Clone + Send + Sync + 'static>(&self) -> C {
        match self.context.get::<C>() {
            Some(stored) => stored.clone(),
            None => panic!(
                "No context of type `{}` has been provided.",
                type_name::<C>()
            ),
        }
    }

    /// Looks up a registered job by name, returning `None` when the name is
    /// not registered.
    pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
        let job = self.job_map.get(name)?;
        Some(*job)
    }

    /// The error handler installed by [`JobRegistry::new`]: logs the job name
    /// and the error (in `Debug` form) at error level.
    pub fn default_error_handler(name: &str, error: BoxedError) {
        log::error!("Job `{}` failed: {:?}", name, error);
    }

    /// Spawns the future `f` as a tokio task on behalf of the job `name`.
    ///
    /// The task logs when the job starts. If `f` resolves to `Err`, the
    /// registry's error handler is called with the converted error;
    /// otherwise the elapsed time in seconds is logged.
    #[doc(hidden)]
    pub fn spawn_internal<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
        &self,
        name: &'static str,
        f: impl Future<Output = Result<(), E>> + Send + 'static,
    ) {
        // The task outlives `&self`, so it gets its own handle to the handler.
        let error_handler = Arc::clone(&self.error_handler);
        tokio::spawn(async move {
            let start_time = Instant::now();
            log::info!("Job `{}` started.", name);
            match f.await {
                Ok(()) => {
                    log::info!(
                        "Job `{}` completed in {}s.",
                        name,
                        start_time.elapsed().as_secs_f64()
                    );
                }
                Err(e) => {
                    error_handler(name, e.into());
                }
            }
        });
    }

    /// Consumes the registry and creates [`JobRunnerOptions`] for `pool`.
    ///
    /// The closure handed to the runner resolves each incoming job by name
    /// and invokes its run function; a name that is not registered is
    /// reported to the error handler as an [`UnknownJobError`].
    pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
        JobRunnerOptions::new(pool, move |current_job| {
            match self.resolve_job(current_job.name()) {
                Some(job) => {
                    (job.run_fn.0 .0)(&self, current_job);
                }
                None => (self.error_handler)(current_job.name(), Box::new(UnknownJobError)),
            }
        })
    }
}
/// A job definition: a static name paired with the functions used to
/// configure a builder for the job and to run it.
#[derive(Debug)]
pub struct NamedJob {
/// Name under which the job is registered and looked up.
name: &'static str,
/// Function applied to a freshly created `JobBuilder` for this job.
build_fn: Opaque<BuildFn>,
/// Function invoked by the runner with the registry and the current job.
run_fn: Opaque<RunFn>,
}
impl NamedJob {
    /// Assembles a `NamedJob` from its name and its build and run functions.
    #[doc(hidden)]
    pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
        Self {
            name,
            build_fn: Opaque(build_fn),
            run_fn: Opaque(run_fn),
        }
    }

    /// Creates a [`JobBuilder`] for this job's name and applies the job's
    /// build function to it.
    pub fn builder(&self) -> JobBuilder<'static> {
        self.apply_build_fn(JobBuilder::new(self.name))
    }

    /// Same as [`NamedJob::builder`], but the builder is created with the
    /// given `id`.
    pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
        self.apply_build_fn(JobBuilder::new_with_id(id, self.name))
    }

    /// Returns the job's name.
    pub const fn name(&self) -> &'static str {
        self.name
    }

    /// Runs the job's build function on `builder` and hands the builder back.
    fn apply_build_fn(&self, mut builder: JobBuilder<'static>) -> JobBuilder<'static> {
        (self.build_fn.0 .0)(&mut builder);
        builder
    }
}