use crate::errors::GraphileWorkerError;
use crate::sql::add_job::RawJobSpec;
use crate::worker_utils::{CleanupTask, RescheduleJobOptions, WorkerUtils};
use crate::{DbJob, Job, JobSpec, TaskHandler};
use graphile_worker_ctx::WorkerContext;
use graphile_worker_migrations::MigrateError;
use serde::Serialize;
pub trait WorkerContextExt {
fn utils(&self) -> WorkerUtils;
fn add_job<T: TaskHandler + 'static>(
&self,
payload: T,
spec: JobSpec,
) -> impl core::future::Future<Output = Result<Job, GraphileWorkerError>> + Send;
fn add_raw_job<P: Serialize + Send + 'static>(
&self,
identifier: &str,
payload: P,
spec: JobSpec,
) -> impl core::future::Future<Output = Result<Job, GraphileWorkerError>> + Send;
fn add_jobs<T: TaskHandler + Clone + 'static>(
&self,
jobs: &[(T, &JobSpec)],
) -> impl core::future::Future<Output = Result<Vec<Job>, GraphileWorkerError>> + Send;
fn add_raw_jobs(
&self,
jobs: &[RawJobSpec],
) -> impl core::future::Future<Output = Result<Vec<Job>, GraphileWorkerError>> + Send;
fn remove_job(
&self,
job_key: &str,
) -> impl core::future::Future<Output = Result<(), GraphileWorkerError>> + Send;
fn complete_jobs(
&self,
ids: &[i64],
) -> impl core::future::Future<Output = Result<Vec<DbJob>, GraphileWorkerError>> + Send;
fn permanently_fail_jobs(
&self,
ids: &[i64],
reason: &str,
) -> impl core::future::Future<Output = Result<Vec<DbJob>, GraphileWorkerError>> + Send;
fn reschedule_jobs(
&self,
ids: &[i64],
options: RescheduleJobOptions,
) -> impl core::future::Future<Output = Result<Vec<DbJob>, GraphileWorkerError>> + Send;
fn force_unlock_workers(
&self,
worker_ids: &[&str],
) -> impl core::future::Future<Output = Result<(), GraphileWorkerError>> + Send;
fn cleanup(
&self,
tasks: &[CleanupTask],
) -> impl core::future::Future<Output = Result<(), GraphileWorkerError>> + Send;
fn migrate(&self) -> impl core::future::Future<Output = Result<(), MigrateError>> + Send;
}
impl WorkerContextExt for WorkerContext {
fn utils(&self) -> WorkerUtils {
WorkerUtils::new(self.pg_pool().clone(), self.escaped_schema().to_string())
.with_use_local_time(*self.use_local_time())
.with_task_details(self.task_details().clone())
}
async fn add_job<T: TaskHandler + 'static>(
&self,
payload: T,
spec: JobSpec,
) -> Result<Job, GraphileWorkerError> {
self.utils().add_job(payload, spec).await
}
async fn add_raw_job<P: Serialize + Send + 'static>(
&self,
identifier: &str,
payload: P,
spec: JobSpec,
) -> Result<Job, GraphileWorkerError> {
self.utils().add_raw_job(identifier, payload, spec).await
}
async fn add_jobs<T: TaskHandler + Clone + 'static>(
&self,
jobs: &[(T, &JobSpec)],
) -> Result<Vec<Job>, GraphileWorkerError> {
self.utils().add_jobs(jobs).await
}
async fn add_raw_jobs(&self, jobs: &[RawJobSpec]) -> Result<Vec<Job>, GraphileWorkerError> {
self.utils().add_raw_jobs(jobs).await
}
async fn remove_job(&self, job_key: &str) -> Result<(), GraphileWorkerError> {
self.utils().remove_job(job_key).await
}
async fn complete_jobs(&self, ids: &[i64]) -> Result<Vec<DbJob>, GraphileWorkerError> {
self.utils().complete_jobs(ids).await
}
async fn permanently_fail_jobs(
&self,
ids: &[i64],
reason: &str,
) -> Result<Vec<DbJob>, GraphileWorkerError> {
self.utils().permanently_fail_jobs(ids, reason).await
}
async fn reschedule_jobs(
&self,
ids: &[i64],
options: RescheduleJobOptions,
) -> Result<Vec<DbJob>, GraphileWorkerError> {
self.utils().reschedule_jobs(ids, options).await
}
async fn force_unlock_workers(&self, worker_ids: &[&str]) -> Result<(), GraphileWorkerError> {
self.utils().force_unlock_workers(worker_ids).await
}
async fn cleanup(&self, tasks: &[CleanupTask]) -> Result<(), GraphileWorkerError> {
self.utils().cleanup(tasks).await
}
async fn migrate(&self) -> Result<(), MigrateError> {
self.utils().migrate().await
}
}