#[cfg(feature = "tokio-postgres")]
pub mod tokio_pg {
use crate::error::AwaError;
use crate::insert::{prepare_row, prepare_row_raw, PreparedRow};
use crate::job::{InsertOpts, JobRow, JobState};
use crate::JobArgs;
use ::tokio_postgres::GenericClient;
// Single-row insert into `awa.jobs` taking eleven positional parameters, in
// the order of the column list below: kind, queue, args, state, priority,
// max_attempts, run_at, metadata, tags, unique_key, unique_states.
//
// - `$4` (state) is bound as text and cast to the `awa.job_state` type.
// - `$7` (run_at) falls back to `now()` when the bound value is NULL.
// - `$11` (unique_states) is bound as text and cast to `bit(8)`.
//
// The statement returns every column of the inserted row plus an extra
// `state_str` column holding the state rendered as text.
const INSERT_SQL: &str = r#"
INSERT INTO awa.jobs
(kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
VALUES
($1, $2, $3, $4::text::awa.job_state, $5::smallint, $6::smallint,
COALESCE($7, now()), $8, $9::text[], $10, $11::text::bit(8))
RETURNING *, state::text AS state_str
"#;
/// Inserts a job built from `args` using the default [`InsertOpts`].
///
/// This is a convenience wrapper that forwards to [`insert_job_with`] with
/// `InsertOpts::default()`.
///
/// # Errors
///
/// Returns whatever error [`insert_job_with`] produces for the same input.
pub async fn insert_job<C: GenericClient>(
    client: &C,
    args: &impl JobArgs,
) -> Result<JobRow, AwaError> {
    let default_opts = InsertOpts::default();
    insert_job_with(client, args, default_opts).await
}
/// Inserts a job built from `args` with caller-supplied `opts`.
///
/// The typed arguments and options are first converted into a
/// [`PreparedRow`] via [`prepare_row`]; that row is then written through
/// `client` and the inserted row is returned as a [`JobRow`].
///
/// # Errors
///
/// Fails if `prepare_row` rejects the input, or if executing the insert or
/// decoding the returned row fails.
pub async fn insert_job_with<C: GenericClient>(
    client: &C,
    args: &impl JobArgs,
    opts: InsertOpts,
) -> Result<JobRow, AwaError> {
    let row_to_insert = prepare_row(args, opts)?;
    let inserted = execute(client, &row_to_insert).await;
    inserted
}
/// Inserts a job from an explicit `kind` string and JSON `args` value.
///
/// Unlike [`insert_job_with`], no typed [`JobArgs`] value is involved: the
/// row is built by [`prepare_row_raw`] from the raw pieces and then written
/// through `client`.
///
/// # Errors
///
/// Fails if `prepare_row_raw` rejects the input, or if executing the insert
/// or decoding the returned row fails.
pub async fn insert_job_raw<C: GenericClient>(
    client: &C,
    kind: String,
    args: serde_json::Value,
    opts: InsertOpts,
) -> Result<JobRow, AwaError> {
    let row_to_insert = prepare_row_raw(kind, args, opts)?;
    let inserted = execute(client, &row_to_insert).await;
    inserted
}
async fn execute<C: GenericClient>(
client: &C,
prepared: &PreparedRow,
) -> Result<JobRow, AwaError> {
let state_str = prepared.state.to_string();
let row = client
.query_one(
INSERT_SQL,
&[
&prepared.kind,
&prepared.queue,
&prepared.args,
&state_str,
&prepared.priority,
&prepared.max_attempts,
&prepared.run_at,
&prepared.metadata,
&prepared.tags,
&prepared.unique_key,
&prepared.unique_states,
],
)
.await
.map_err(|err| {
if let Some(db_err) = err.as_db_error() {
if db_err.code() == &::tokio_postgres::error::SqlState::UNIQUE_VIOLATION {
return AwaError::UniqueConflict {
constraint: db_err.constraint().map(|c| c.to_string()),
};
}
}
AwaError::TokioPg(err)
})?;
JobRow::try_from(&row)
}
/// Parses the textual form of a job state into a [`JobState`].
///
/// # Errors
///
/// A parse failure's message is wrapped in [`AwaError::Validation`].
fn parse_state(state_str: &str) -> Result<JobState, AwaError> {
    state_str.parse::<JobState>().map_err(AwaError::Validation)
}
/// Reads the column called `name` from `row`, decoding it as `T`.
///
/// # Errors
///
/// Any lookup or decode failure is reported as [`AwaError::Validation`]
/// with a message naming the column and the underlying error.
fn col<'a, T: ::tokio_postgres::types::FromSql<'a>>(
    row: &'a ::tokio_postgres::Row,
    name: &str,
) -> Result<T, AwaError> {
    match row.try_get(name) {
        Ok(value) => Ok(value),
        Err(e) => Err(AwaError::Validation(format!(
            "failed to decode column {name}: {e}"
        ))),
    }
}
/// Builds a [`JobRow`] from a row that exposes the job columns by name plus
/// a textual `state_str` column.
impl TryFrom<&::tokio_postgres::Row> for JobRow {
    type Error = AwaError;

    /// Decodes every field by column name.
    ///
    /// # Errors
    ///
    /// Returns [`AwaError::Validation`] if a column cannot be read or
    /// decoded, or if `state_str` does not parse as a [`JobState`].
    fn try_from(row: &::tokio_postgres::Row) -> Result<Self, Self::Error> {
        // The state is taken from the text-rendered `state_str` column and
        // parsed on the Rust side; the `state` column itself is not read.
        let raw_state = col::<String>(row, "state_str")?;

        let job = Self {
            id: col(row, "id")?,
            kind: col(row, "kind")?,
            queue: col(row, "queue")?,
            args: col(row, "args")?,
            state: parse_state(&raw_state)?,
            priority: col(row, "priority")?,
            attempt: col(row, "attempt")?,
            run_lease: col(row, "run_lease")?,
            max_attempts: col(row, "max_attempts")?,
            run_at: col(row, "run_at")?,
            heartbeat_at: col(row, "heartbeat_at")?,
            deadline_at: col(row, "deadline_at")?,
            attempted_at: col(row, "attempted_at")?,
            finalized_at: col(row, "finalized_at")?,
            created_at: col(row, "created_at")?,
            errors: col(row, "errors")?,
            metadata: col(row, "metadata")?,
            tags: col(row, "tags")?,
            unique_key: col(row, "unique_key")?,
            // NOTE(review): `unique_states` is never read back from the row
            // and is always `None` here, even though the insert statement
            // writes it as `bit(8)`. Presumably intentional because that
            // column type is not decoded here; confirm.
            unique_states: None,
            callback_id: col(row, "callback_id")?,
            callback_timeout_at: col(row, "callback_timeout_at")?,
            callback_filter: col(row, "callback_filter")?,
            callback_on_complete: col(row, "callback_on_complete")?,
            callback_on_fail: col(row, "callback_on_fail")?,
            callback_transform: col(row, "callback_transform")?,
            progress: col(row, "progress")?,
        };

        Ok(job)
    }
}
}