use crate::error::{EnqueueError, Error, PerformError};
use crate::job::Job;
use sqlx::prelude::*;
use sqlx::Postgres;
/// A queued job as read from the `_background_tasks` table.
///
/// Populated via `FromRow` from the `id, job_type, data` columns selected by
/// `find_next_unlocked_job`.
#[derive(FromRow)]
pub struct BackgroundJob {
/// Row id; used to delete the job or to record a failed attempt.
pub id: i64,
/// Job type name; `enqueue_job` stores the job's `JOB_TYPE` constant in this column.
pub job_type: String,
/// JSON payload; `enqueue_job` stores the job itself here, wrapped in `sqlx::types::Json`.
pub data: serde_json::Value,
}
/// Runs the migrations that `sqlx::migrate!` resolves from `./migrations`.
///
/// `pool` may be anything sqlx can acquire a Postgres connection from.
///
/// # Errors
///
/// A failure reported by the migrator is converted into the crate's [`Error`].
pub async fn migrate(pool: impl Acquire<'_, Database = Postgres>) -> Result<(), Error> {
    let migrator = sqlx::migrate!("./migrations");
    match migrator.run(pool).await {
        Ok(()) => Ok(()),
        Err(failure) => Err(failure.into()),
    }
}
/// `analyze`-feature variant of `enqueue_job`.
///
/// Issues the insert into `_background_tasks` wrapped in
/// `EXPLAIN (FORMAT JSON, ANALYZE, BUFFERS)` and logs the JSON plan returned by the
/// database, pretty-printed, at debug level.
///
/// # Errors
///
/// Database errors are propagated as [`EnqueueError`].
///
/// # Panics
///
/// Panics if the returned plan cannot be serialized back into a string.
#[cfg(feature = "analyze")]
pub async fn enqueue_job<T: Job + Send>(
    conn: impl Executor<'_, Database = Postgres>,
    job: T,
) -> Result<(), EnqueueError> {
    const EXPLAIN_INSERT: &str = "EXPLAIN (FORMAT JSON, ANALYZE, BUFFERS) INSERT INTO _background_tasks (job_type, data) VALUES ($1, $2)";

    let explain = sqlx::query_as::<_, (sqlx::types::Json<serde_json::Value>,)>(EXPLAIN_INSERT)
        .bind(T::JOB_TYPE)
        .bind(sqlx::types::Json(job));

    // The single returned row carries one JSON column holding the plan.
    let (sqlx::types::Json(plan),) = explain.fetch_one(conn).await?;

    log::debug!(
        "EXPLAIN/ANALYZE {}",
        serde_json::to_string_pretty(&plan).unwrap()
    );
    Ok(())
}
/// Inserts `job` into `_background_tasks` (builds without the `analyze` feature).
///
/// The row stores `T::JOB_TYPE` as `job_type` and the job itself, wrapped in
/// `sqlx::types::Json`, as `data`.
///
/// # Errors
///
/// Database errors are propagated as [`EnqueueError`].
#[cfg(not(feature = "analyze"))]
pub async fn enqueue_job<T: Job + Send>(
    conn: impl Executor<'_, Database = Postgres>,
    job: T,
) -> Result<(), EnqueueError> {
    let insert = sqlx::query("INSERT INTO _background_tasks (job_type, data) VALUES ($1, $2)")
        .bind(T::JOB_TYPE)
        .bind(sqlx::types::Json(job));

    // Only success or failure matters here; the query result itself is discarded.
    let _outcome = insert.execute(conn).await?;
    Ok(())
}
/// Enqueues all of `jobs` through a `crate::batch::Batch` insert into `_background_tasks`.
///
/// Every job appends one `(job_type, data)` value tuple to the batch: `T::JOB_TYPE` and the
/// job wrapped in `sqlx::types::Json`. The batch is executed on `conn` once all jobs have
/// been appended.
///
/// # Errors
///
/// Failures from reserving, binding or executing the batch are propagated as
/// [`EnqueueError`].
pub async fn enqueue_jobs_batch<T: Job + Send>(
    conn: &mut sqlx::PgConnection,
    jobs: Vec<T>,
) -> Result<(), EnqueueError> {
    let mut batch = crate::batch::Batch::new(
        "jobs",
        r#"INSERT INTO "_background_tasks" (
job_type, data
) VALUES
"#,
        r#""#,
    );

    for job in jobs {
        // NOTE(review): three slots are reserved although only two values are bound per
        // job; presumably deliberate headroom, confirm against `Batch::reserve`.
        batch.reserve(3)?;

        // Every tuple after the first must be preceded by a comma.
        let needs_separator = batch.current_num_arguments() > 0;
        if needs_separator {
            batch.append(",");
        }

        batch.append("(");
        batch.bind(T::JOB_TYPE)?;
        batch.append(",");
        batch.bind(sqlx::types::Json(job))?;
        batch.append(")");
    }

    batch.execute(conn).await?;
    Ok(())
}
/// Fetches the next job that is not locked by another transaction.
///
/// Selects the row with the lowest `id` from `_background_tasks` using
/// `FOR UPDATE SKIP LOCKED`, so rows already locked elsewhere are passed over and the
/// returned row is locked by this query. Callers presumably run this inside a transaction
/// so the lock is held while the job is processed; verify against the caller.
///
/// The query carries an explicit `LIMIT 1`: only a single job is ever consumed, and stating
/// that in SQL lets the database stop locking after the first available row instead of
/// relying on the driver to stop reading the result set.
///
/// NOTE(review): `retries` / `last_retry` (written by `update_failed_job`) are not consulted
/// here, so a failed job is eligible again immediately — confirm this is intended.
///
/// # Returns
///
/// `Ok(Some(job))` for the next available job, `Ok(None)` when no unlocked row exists.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] if the query fails.
pub async fn find_next_unlocked_job(
    conn: impl Executor<'_, Database = Postgres>,
) -> Result<Option<BackgroundJob>, sqlx::Error> {
    const NEXT_JOB_SQL: &str = "SELECT id, job_type, data \
        FROM _background_tasks \
        ORDER BY id \
        LIMIT 1 \
        FOR UPDATE SKIP LOCKED";

    // The error type already is `sqlx::Error`, so no conversion is needed.
    sqlx::query_as::<_, BackgroundJob>(NEXT_JOB_SQL)
        .fetch_optional(conn)
        .await
}
/// Deletes the row with the given `id` from `_background_tasks`.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] if the statement fails.
pub async fn delete_successful_job(
    conn: impl Executor<'_, Database = Postgres>,
    id: i64,
) -> Result<(), sqlx::Error> {
    let removal = sqlx::query("DELETE FROM _background_tasks WHERE id=$1").bind(id);

    // The query result is not reported to the caller.
    removal.execute(conn).await.map(|_| ())
}
/// Records a failed attempt for the job with the given `id`.
///
/// Increments the row's `retries` counter and sets `last_retry` to the database's `NOW()`.
///
/// # Errors
///
/// Database errors are propagated as [`PerformError`].
pub async fn update_failed_job(
    conn: impl Executor<'_, Database = Postgres>,
    id: i64,
) -> Result<(), PerformError> {
    const MARK_FAILED: &str =
        "UPDATE _background_tasks SET retries = retries + 1, last_retry = NOW() WHERE id = $1";

    let _outcome = sqlx::query(MARK_FAILED).bind(id).execute(conn).await?;
    Ok(())
}
/// Counts the rows in `_background_tasks` whose `retries` is greater than zero.
///
/// Only compiled for tests or when the `test_components` feature is enabled.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] if the query fails.
#[cfg(any(test, feature = "test_components"))]
pub async fn failed_job_count(
    conn: impl Executor<'_, Database = Postgres>,
) -> Result<i64, sqlx::Error> {
    let counting =
        sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM _background_tasks WHERE retries > 0");

    // The single result row has exactly one column: the count.
    let (failed,) = counting.fetch_one(conn).await?;
    Ok(failed)
}