use chrono::{DateTime, Utc};
use es_entity::clock::ClockHandle;
use serde::Serialize;
use std::{marker::PhantomData, sync::Arc};
use tracing::instrument;
use super::{
Job, JobId,
entity::{JobType, NewJob},
error::JobError,
repo::JobRepo,
};
/// Description of one job to create, as consumed by `JobSpawner::spawn_all`
/// and `JobSpawner::spawn_all_in_op`.
///
/// `Config` is the job-type specific configuration payload.
pub struct JobSpec<Config> {
/// Identifier the created job will carry.
pub id: JobId,
/// Configuration handed to the job builder when the job is created.
pub config: Config,
/// Explicit execution time; when `None` the spawner substitutes its default
/// (the operation's time if it has one, otherwise the spawner clock).
pub schedule_at: Option<DateTime<Utc>>,
/// Optional value written to the `queue_id` column of the job's
/// `job_executions` row.
pub queue_id: Option<String>,
}
impl<Config> JobSpec<Config> {
pub fn new(id: impl Into<JobId>, config: Config) -> Self {
Self {
id: id.into(),
config,
schedule_at: None,
queue_id: None,
}
}
pub fn schedule_at(mut self, schedule_at: DateTime<Utc>) -> Self {
self.schedule_at = Some(schedule_at);
self
}
pub fn queue_id(mut self, queue_id: impl Into<String>) -> Self {
self.queue_id = Some(queue_id.into());
self
}
}
/// Creates jobs of a single [`JobType`] whose configuration payload is `Config`.
///
/// Holds a shared handle to the job repository, the job type stamped on every
/// job it creates, and the clock used when the surrounding operation does not
/// supply a time of its own.
pub struct JobSpawner<Config> {
    repo: Arc<JobRepo>,
    job_type: JobType,
    clock: ClockHandle,
    // `Config` only appears at the type level; no value of it is stored.
    _phantom: PhantomData<Config>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would add a
// `Config: Clone` bound even though no `Config` value is stored, which would
// make spawners for non-`Clone` config types impossible to clone.
impl<Config> Clone for JobSpawner<Config> {
    fn clone(&self) -> Self {
        Self {
            repo: Arc::clone(&self.repo),
            job_type: self.job_type.clone(),
            clock: self.clock.clone(),
            _phantom: PhantomData,
        }
    }
}
impl<Config> JobSpawner<Config>
where
Config: Serialize + Send + Sync,
{
/// Builds a spawner that creates jobs of `job_type` through `repo`, using
/// `clock` when opening operations and as the fallback time source.
pub(crate) fn new(repo: Arc<JobRepo>, job_type: JobType, clock: ClockHandle) -> Self {
    // `Config` is a type-level marker only, so there is nothing to store for it.
    let _phantom = PhantomData;
    Self {
        repo,
        job_type,
        clock,
        _phantom,
    }
}
pub fn job_type(&self) -> &JobType {
&self.job_type
}
/// Creates a job inside a fresh operation and commits it.
///
/// Opens an operation on the repository with this spawner's clock, delegates
/// to [`Self::spawn_in_op`], then commits the operation.
///
/// # Errors
/// Returns a [`JobError`] if opening the operation, creating the job, or
/// committing fails.
#[instrument(
    name = "job_spawner.spawn",
    skip(self, config),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn(
    &self,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
) -> Result<Job, JobError> {
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let spawned = self.spawn_in_op(&mut db_op, id, config).await?;
    db_op.commit().await?;
    Ok(spawned)
}
/// Creates a job inside the caller's operation `op`.
///
/// The execution time is `op.maybe_now()` when the operation carries a time,
/// otherwise the spawner clock's current time. The caller is responsible for
/// committing `op`.
///
/// # Errors
/// Propagates any [`JobError`] from [`Self::spawn_at_in_op`].
#[instrument(
    name = "job_spawner.spawn_in_op",
    skip(self, op, config),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_in_op(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
) -> Result<Job, JobError> {
    let schedule_at = match op.maybe_now() {
        Some(op_time) => op_time,
        None => self.clock.now(),
    };
    self.spawn_at_in_op(op, id, config, schedule_at).await
}
/// Creates a job to be executed at `schedule_at`, inside a fresh operation
/// that is committed before returning.
///
/// # Errors
/// Returns a [`JobError`] if opening the operation, creating the job, or
/// committing fails.
#[instrument(
    name = "job_spawner.spawn_at",
    skip(self, config),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_at(
    &self,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    schedule_at: DateTime<Utc>,
) -> Result<Job, JobError> {
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let spawned = self
        .spawn_at_in_op(&mut db_op, id, config, schedule_at)
        .await?;
    db_op.commit().await?;
    Ok(spawned)
}
/// Creates a job to be executed at `schedule_at` inside the caller's
/// operation `op`, without a queue id. The caller commits `op`.
///
/// # Errors
/// Propagates any [`JobError`] raised while creating the job or its
/// execution row.
#[instrument(
    name = "job_spawner.spawn_at_in_op",
    skip(self, op, config),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_at_in_op(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    schedule_at: DateTime<Utc>,
) -> Result<Job, JobError> {
    let job_id: JobId = id.into();
    self.create_job_internal(op, job_id, config, schedule_at, None)
        .await
}
/// Creates a job tagged with `queue_id`, inside a fresh operation that is
/// committed before returning.
///
/// # Errors
/// Returns a [`JobError`] if opening the operation, creating the job, or
/// committing fails.
#[instrument(
    name = "job_spawner.spawn_with_queue_id",
    skip(self, config, queue_id),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_with_queue_id(
    &self,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    queue_id: impl Into<String> + Send,
) -> Result<Job, JobError> {
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let spawned = self
        .spawn_with_queue_id_in_op(&mut db_op, id, config, queue_id)
        .await?;
    db_op.commit().await?;
    Ok(spawned)
}
/// Creates a job tagged with `queue_id` inside the caller's operation `op`.
///
/// The execution time is `op.maybe_now()` when the operation carries a time,
/// otherwise the spawner clock's current time. The caller commits `op`.
///
/// # Errors
/// Propagates any [`JobError`] from [`Self::spawn_at_with_queue_id_in_op`].
#[instrument(
    name = "job_spawner.spawn_with_queue_id_in_op",
    skip(self, op, config, queue_id),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_with_queue_id_in_op(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    queue_id: impl Into<String> + Send,
) -> Result<Job, JobError> {
    let schedule_at = match op.maybe_now() {
        Some(op_time) => op_time,
        None => self.clock.now(),
    };
    self.spawn_at_with_queue_id_in_op(op, id, config, schedule_at, queue_id)
        .await
}
/// Creates a job tagged with `queue_id` and scheduled for `schedule_at`,
/// inside a fresh operation that is committed before returning.
///
/// # Errors
/// Returns a [`JobError`] if opening the operation, creating the job, or
/// committing fails.
#[instrument(
    name = "job_spawner.spawn_at_with_queue_id",
    skip(self, config, queue_id),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_at_with_queue_id(
    &self,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    schedule_at: DateTime<Utc>,
    queue_id: impl Into<String> + Send,
) -> Result<Job, JobError> {
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let spawned = self
        .spawn_at_with_queue_id_in_op(&mut db_op, id, config, schedule_at, queue_id)
        .await?;
    db_op.commit().await?;
    Ok(spawned)
}
/// Creates a job tagged with `queue_id` and scheduled for `schedule_at`
/// inside the caller's operation `op`. The caller commits `op`.
///
/// # Errors
/// Propagates any [`JobError`] raised while creating the job or its
/// execution row.
#[instrument(
    name = "job_spawner.spawn_at_with_queue_id_in_op",
    skip(self, op, config, queue_id),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_at_with_queue_id_in_op(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
    schedule_at: DateTime<Utc>,
    queue_id: impl Into<String> + Send,
) -> Result<Job, JobError> {
    let job_id: JobId = id.into();
    let queue: String = queue_id.into();
    self.create_job_internal(op, job_id, config, schedule_at, Some(queue))
        .await
}
/// Creates one job per spec inside a single fresh operation and commits it.
///
/// # Errors
/// Returns a [`JobError`] if opening the operation, creating the jobs, or
/// committing fails.
#[instrument(
    name = "job_spawner.spawn_all",
    skip(self, specs),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_all(&self, specs: Vec<JobSpec<Config>>) -> Result<Vec<Job>, JobError> {
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let spawned = self.spawn_all_in_op(&mut db_op, specs).await?;
    db_op.commit().await?;
    Ok(spawned)
}
#[instrument(
name = "job_spawner.spawn_all_in_op",
skip(self, op, specs),
fields(job_type = %self.job_type, count),
err
)]
pub async fn spawn_all_in_op(
&self,
op: &mut impl es_entity::AtomicOperation,
specs: Vec<JobSpec<Config>>,
) -> Result<Vec<Job>, JobError> {
tracing::Span::current().record("count", specs.len());
if specs.is_empty() {
return Ok(Vec::new());
}
let default_schedule_at = op.maybe_now().unwrap_or_else(|| self.clock.now());
let mut new_jobs = Vec::with_capacity(specs.len());
let mut schedule_times = Vec::with_capacity(specs.len());
let mut queue_ids: Vec<Option<String>> = Vec::with_capacity(specs.len());
for spec in specs {
schedule_times.push(spec.schedule_at.unwrap_or(default_schedule_at));
queue_ids.push(spec.queue_id);
let new_job = NewJob::builder()
.id(spec.id)
.unique_per_type(false)
.job_type(self.job_type.clone())
.config(spec.config)?
.tracing_context(es_entity::context::TracingContext::current())
.build()
.expect("Could not build new job");
new_jobs.push(new_job);
}
let mut jobs = self.repo.create_all_in_op(op, new_jobs).await?;
let ids: Vec<JobId> = jobs.iter().map(|j| j.id).collect();
sqlx::query(
r#"
INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at)
SELECT unnested.id, $2, unnested.queue_id, unnested.execute_at,
COALESCE($5, NOW()), COALESCE($5, NOW())
FROM UNNEST($1::uuid[], $3::text[], $4::timestamptz[])
AS unnested(id, queue_id, execute_at)
"#,
)
.bind(&ids)
.bind(&self.job_type)
.bind(&queue_ids)
.bind(&schedule_times)
.bind(op.maybe_now())
.execute(op.as_executor())
.await?;
for (job, schedule_at) in jobs.iter_mut().zip(&schedule_times) {
job.schedule_execution(*schedule_at);
}
self.repo.update_all_in_op(op, &mut jobs).await?;
Ok(jobs)
}
/// Creates a job flagged `unique_per_type(true)`, doing nothing if one
/// already exists.
///
/// Takes the spawner by value. The job is built first, then created in a
/// fresh operation. If the repository reports a duplicate, the operation is
/// dropped without being committed and `Ok(())` is returned. Otherwise the
/// execution row is inserted, scheduled at `op.maybe_now()` or the spawner
/// clock's current time, and the operation is committed.
///
/// # Errors
/// Returns a [`JobError`] if the config cannot be attached to the job, or if
/// any non-duplicate repository, SQL or commit error occurs.
///
/// # Panics
/// Panics with "Could not build new job" if the job builder rejects its input.
#[instrument(
    name = "job_spawner.spawn_unique",
    skip(self, config),
    fields(job_type = %self.job_type),
    err
)]
pub async fn spawn_unique(
    self,
    id: impl Into<JobId> + std::fmt::Debug,
    config: Config,
) -> Result<(), JobError> {
    let candidate = NewJob::builder()
        .id(id)
        .unique_per_type(true)
        .job_type(self.job_type.clone())
        .config(config)?
        .tracing_context(es_entity::context::TracingContext::current())
        .build()
        .expect("Could not build new job");

    let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
    let mut job = match self.repo.create_in_op(&mut op, candidate).await {
        Ok(created) => created,
        // A job of this type already exists: leave `op` uncommitted.
        Err(e) if e.was_duplicate() => return Ok(()),
        Err(e) => return Err(e.into()),
    };

    let schedule_at = match op.maybe_now() {
        Some(op_time) => op_time,
        None => self.clock.now(),
    };
    self.insert_execution(&mut op, &mut job, schedule_at, None)
        .await?;
    op.commit().await?;
    Ok(())
}
/// Shared implementation behind the single-job `spawn*_in_op` methods.
///
/// Builds a non-unique job of this spawner's type, creates it through the
/// repository inside `op`, then inserts its execution row for `schedule_at`
/// with the optional `queue_id`.
///
/// # Errors
/// Returns a [`JobError`] if the config cannot be attached to the job, or if
/// the repository or the execution insert fails.
///
/// # Panics
/// Panics with "Could not build new job" if the job builder rejects its input.
#[instrument(
    name = "job.create_internal",
    skip(self, op, config),
    fields(job_type = %self.job_type),
    err
)]
async fn create_job_internal<C: Serialize + Send>(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    id: JobId,
    config: C,
    schedule_at: DateTime<Utc>,
    queue_id: Option<String>,
) -> Result<Job, JobError> {
    let draft = NewJob::builder()
        .id(id)
        .unique_per_type(false)
        .job_type(self.job_type.clone())
        .config(config)?
        .tracing_context(es_entity::context::TracingContext::current())
        .build()
        .expect("Could not build new job");

    let mut created = self.repo.create_in_op(op, draft).await?;
    let queue = queue_id.as_deref();
    self.insert_execution(op, &mut created, schedule_at, queue)
        .await?;
    Ok(created)
}
/// Inserts the `job_executions` row for `job` and records the scheduling on
/// the job entity.
///
/// The row's `execute_at` is `schedule_at`; `alive_at` and `created_at` are
/// `op.maybe_now()` when the operation carries a time, otherwise the
/// database's `NOW()`. Afterwards the job is marked as scheduled and
/// persisted through the repository inside the same operation.
///
/// # Errors
/// Returns a [`JobError`] if the insert or the repository update fails.
#[instrument(name = "job.insert_execution", skip_all, err)]
async fn insert_execution(
&self,
op: &mut impl es_entity::AtomicOperation,
job: &mut Job,
schedule_at: DateTime<Utc>,
queue_id: Option<&str>,
) -> Result<(), JobError> {
// `$5` is the operation's time, if any; COALESCE falls back to the DB clock.
sqlx::query!(
r#"
INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at)
VALUES ($1, $2, $3, $4, COALESCE($5, NOW()), COALESCE($5, NOW()))
"#,
job.id as JobId,
&job.job_type as &JobType,
queue_id,
schedule_at,
op.maybe_now()
)
.execute(op.as_executor())
.await?;
// Only after the row exists is the scheduling recorded on the entity and saved.
job.schedule_execution(schedule_at);
self.repo.update_in_op(op, job).await?;
Ok(())
}
}