mod hooks;
use std::sync::Arc;
use chrono::Utc;
use graphile_worker_job::Job;
use graphile_worker_lifecycle_hooks::JobFetchContext;
use tracing::{error, trace};
use self::hooks::run_job_with_hooks;
use super::errors::ProcessJobError;
use super::release::release_job;
use super::WorkerRunner;
use crate::streams::job_signal::JobSignalSource;
use graphile_worker_queries::get_job::get_job;
pub(super) async fn process_one_job(
worker: &WorkerRunner,
source: JobSignalSource,
) -> Result<Option<Job>, ProcessJobError> {
let now = worker.use_local_time.then(Utc::now);
let task_details_guard = worker.task_details.read().await;
let job = get_job(
&worker.database,
&task_details_guard,
&worker.schema,
&worker.worker_id,
&worker.forbidden_flags,
now,
)
.await
.map_err(|e| {
error!("Could not get job : {:?}", e);
e
})?;
drop(task_details_guard);
match job {
Some(job) => {
let job = Arc::new(job);
if !worker.hooks.is_empty() {
worker
.hooks
.emit(JobFetchContext {
job: job.clone(),
worker_id: worker.worker_id.clone(),
})
.await;
}
run_and_release_job(job.clone(), worker, &source).await?;
Ok(Some(
Arc::try_unwrap(job).unwrap_or_else(|arc| (*arc).clone()),
))
}
None => {
trace!(source = ?source, "No job found");
Ok(None)
}
}
}
pub(super) async fn run_and_release_job(
job: Arc<Job>,
worker: &WorkerRunner,
source: &JobSignalSource,
) -> Result<(), ProcessJobError> {
let (job_result, duration) = run_job_with_hooks(job.clone(), worker, source).await;
release_job(job_result, job.clone(), worker, duration)
.await
.map_err(|e| {
error!("Release job error : {:?}", e);
e
})?;
Ok(())
}