use crate::errors::EnqueueError;
use futures_util::FutureExt;
use futures_util::future::BoxFuture;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use sqlx::PgPool;
use std::future::Future;
use tracing::instrument;
pub const DEFAULT_QUEUE: &str = "default";
pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
const JOB_NAME: &'static str;
const PRIORITY: i16 = 0;
const DEDUPLICATED: bool = false;
const QUEUE: &'static str = DEFAULT_QUEUE;
type Context: Clone + Send + 'static;
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
#[instrument(name = "workers.enqueue", skip(self, pool), fields(message = Self::JOB_NAME))]
fn enqueue<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<Option<i64>, EnqueueError>> {
let data = match serde_json::to_value(self) {
Ok(data) => data,
Err(err) => return async move { Err(EnqueueError::SerializationError(err)) }.boxed(),
};
let priority = Self::PRIORITY;
if Self::DEDUPLICATED {
let future = enqueue_deduplicated(pool, Self::JOB_NAME, data, priority);
future.boxed()
} else {
let future = enqueue_simple(pool, Self::JOB_NAME, data, priority);
async move { Ok(Some(future.await?)) }.boxed()
}
}
}
fn enqueue_deduplicated<'a>(
pool: &'a PgPool,
job_type: &'a str,
data: Value,
priority: i16,
) -> BoxFuture<'a, Result<Option<i64>, EnqueueError>> {
async move {
let result = sqlx::query_scalar::<_, Option<i64>>(
r"
INSERT INTO background_jobs (job_type, data, priority)
SELECT $1, $2, $3
WHERE NOT EXISTS (
SELECT 1 FROM background_jobs
WHERE job_type = $1 AND data = $2 AND priority = $3
FOR UPDATE SKIP LOCKED
)
RETURNING id
",
)
.bind(job_type)
.bind(data)
.bind(priority)
.fetch_optional(pool)
.await?;
Ok(result.flatten())
}
.boxed()
}
fn enqueue_simple<'a>(
pool: &'a PgPool,
job_type: &'a str,
data: Value,
priority: i16,
) -> BoxFuture<'a, Result<i64, EnqueueError>> {
async move {
let id = sqlx::query_scalar::<_, i64>(
"INSERT INTO background_jobs (job_type, data, priority) VALUES ($1, $2, $3) RETURNING id"
)
.bind(job_type)
.bind(data)
.bind(priority)
.fetch_one(pool)
.await?;
Ok(id)
}
.boxed()
}