use graphile_worker_ctx::WorkerContext;
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Type-erased, shareable task handler.
///
/// The wrapped callable takes a [`WorkerContext`] by value and returns a boxed,
/// pinned, `Send` future that resolves to `Ok(())` on success or to an error
/// message on failure. The callable itself is `Send + Sync` and lives behind an
/// [`Arc`], so cloning a `TaskHandlerFn` only copies the pointer.
pub type TaskHandlerFn = Arc<
    dyn Fn(WorkerContext) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
        + Send
        + Sync,
>;
/// Conversion from a task's return value into a uniform success/failure result.
///
/// Implementors consume themselves and report either `Ok(())` or an error whose
/// only required capability is [`Debug`] formatting.
pub trait IntoTaskHandlerResult {
    /// Consumes `self` and yields `Ok(())` on success or a `Debug`-printable
    /// error on failure.
    fn into_task_handler_result(self) -> Result<(), impl Debug>;
}
/// Pairs a task identifier with the type-erased handler that runs that task.
///
/// Cloning copies the `&'static str` identifier and the `Arc` pointer to the
/// handler; the handler itself is shared, not duplicated.
#[derive(Clone)]
pub struct JobDefinition {
    /// Static name identifying the task.
    identifier: &'static str,
    /// Shared callable invoked with a worker context to run the task.
    handler: TaskHandlerFn,
}
impl JobDefinition {
    /// Builds the definition for the task type `T`.
    ///
    /// The identifier is taken from `T::IDENTIFIER`. The handler is a closure
    /// that forwards the worker context to [`run_task_from_worker_ctx`] for `T`
    /// and boxes the resulting future.
    pub fn of<T: TaskHandler>() -> Self {
        type HandlerFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

        // The closure receives the context by value, so it is handed to the
        // runner as-is: cloning it first would only duplicate the context for
        // every job run. The closure captures nothing, so `move` is not needed.
        let handler = |ctx: WorkerContext| -> HandlerFuture {
            Box::pin(run_task_from_worker_ctx::<T>(ctx))
        };

        Self {
            identifier: T::IDENTIFIER,
            handler: Arc::new(handler),
        }
    }

    /// Returns the task identifier.
    pub fn identifier(&self) -> &'static str {
        self.identifier
    }

    /// Returns another shared pointer to the handler (a pointer copy only).
    pub fn handler(&self) -> TaskHandlerFn {
        Arc::clone(&self.handler)
    }

    /// Consumes the definition and returns its identifier and handler.
    pub fn into_parts(self) -> (&'static str, TaskHandlerFn) {
        (self.identifier, self.handler)
    }
}
/// A task that returns `()` has no way to signal failure, so it always
/// converts to success.
impl IntoTaskHandlerResult for () {
    fn into_task_handler_result(self) -> Result<(), impl Debug> {
        // The error type is never produced; `()` is used only because some
        // concrete `Debug` type has to be named.
        Result::<(), ()>::Ok(())
    }
}
/// A `Result<(), E>` whose error is `Debug`-printable already has the required
/// shape, so it is passed through unchanged.
impl<E> IntoTaskHandlerResult for Result<(), E>
where
    E: Debug,
{
    fn into_task_handler_result(self) -> Result<(), impl Debug> {
        self
    }
}
/// A runnable task type.
///
/// Implementors must be serializable, deserializable for any lifetime,
/// thread-safe (`Send + Sync`) and `'static`. The value is reconstructed from a
/// job payload by [`run_task_from_worker_ctx`] and then executed via
/// [`TaskHandler::run`].
pub trait TaskHandler: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
    /// Static name of the task; used as the identifier of its [`JobDefinition`].
    const IDENTIFIER: &'static str;

    /// Returns the [`JobDefinition`] for this task type.
    ///
    /// The default implementation delegates to [`JobDefinition::of`].
    fn definition() -> JobDefinition
    where
        Self: Sized,
    {
        JobDefinition::of::<Self>()
    }

    /// Consumes the task and executes it with the given worker context.
    ///
    /// The returned future must be `Send + 'static`; its output is turned into
    /// a success/failure result through [`IntoTaskHandlerResult`].
    fn run(
        self,
        ctx: WorkerContext,
    ) -> impl Future<Output = impl IntoTaskHandlerResult> + Send + 'static;
}
/// Deserializes the context's payload into a `T` and runs it.
///
/// The payload is read from `worker_context.payload()`. The context itself is
/// then moved into [`TaskHandler::run`].
///
/// # Errors
///
/// Returns the `Debug` rendering of:
/// - the deserialization error, if the payload cannot be turned into a `T`;
/// - the task's own error, if the task's result converts to `Err`.
pub async fn run_task_from_worker_ctx<T: TaskHandler>(
    worker_context: WorkerContext,
) -> Result<(), String> {
    // Map the failure to its `Debug` text and bail out with `?`; this avoids
    // re-inspecting the result and unwrapping the error by hand.
    let job = T::deserialize(worker_context.payload()).map_err(|e| format!("{e:?}"))?;

    job.run(worker_context)
        .await
        .into_task_handler_result()
        .map_err(|e| format!("{e:?}"))
}