use crate::error::Result;
use crate::{Job, SchemaConfig};
use chrono::{DateTime, Utc};
use postgres::{self, transaction::Transaction, GenericConnection};
use serde::Serialize;
use serde_json::value;
/// Handle to a single named job queue.
///
/// Every statement issued through this handle is scoped to `name` and targets the
/// tables reported by `schema_config`.
#[derive(Debug, Clone)]
pub struct Queue {
    /// Supplies the names of the queue table and the dead-letters table.
    schema_config: SchemaConfig,
    /// Value matched against (or written to) the `queue` column.
    name: String,
}
impl Queue {
#[inline]
pub fn new(schema_config: SchemaConfig, name: String) -> Queue {
Queue {
schema_config,
name,
}
}
/// Inserts a new job into this queue and returns the id generated for it.
///
/// * `class` - value stored in the `class` column.
/// * `args` - serialized to a JSON value and stored in the `args` column.
/// * `max_attempts` - stored in the `max_attempts` column; values larger than
///   `i32::MAX` are clamped to `i32::MAX` because the value is bound as an `i32`.
/// * `conn` - connection (or transaction) the insert is executed on.
///
/// # Errors
///
/// Returns an error if `args` cannot be converted to JSON or if the insert fails.
pub fn enqueue<A: Serialize, C: GenericConnection>(
    &self,
    class: &str,
    args: A,
    max_attempts: u32,
    conn: &C,
) -> Result<u64> {
    let args_json = value::to_value(args)?;
    // A plain `as i32` cast wraps anything above `i32::MAX` to a negative number
    // (e.g. `u32::MAX` would be stored as -1), so saturate instead.
    let max_attempts = max_attempts.min(i32::max_value() as u32) as i32;
    let rows = conn.query(
        &format!(
            "insert into {} (class, args, queue, max_attempts)
values ($1, $2, $3, $4) returning id",
            self.schema_config.queue_table()
        ),
        &[&class, &args_json, &self.name, &max_attempts],
    )?;
    // `insert ... returning id` yields exactly one row on success.
    let id: i64 = rows.get(0).get(0);
    Ok(id as u64)
}
/// Picks the next runnable job of this queue inside the transaction `tx`.
///
/// The statement selects the row with the lowest `id` whose `queue` matches this
/// queue's name and whose `run_next_at` is earlier than `now()`, using
/// `for update skip locked`. Returns `Ok(None)` when no row qualifies.
pub(crate) fn reserve(&self, tx: &Transaction) -> Result<Option<Job>> {
    let sql = format!(
        r#"select
id, queue, args, class, max_attempts, created_at, run_next_at,
last_failed_at, last_error, error_count
from {}
where queue = $1 and run_next_at < now()
order by id
limit 1
for update skip locked"#,
        self.schema_config.queue_table()
    );
    let candidates = tx.query(&sql, &[&self.name])?;
    if candidates.is_empty() {
        return Ok(None);
    }

    let row = candidates.get(0);
    // Integer columns are read with their signed SQL types, then converted to the
    // unsigned types used by `Job`.
    let id = row.get::<_, i64>(0) as u64;
    let max_attempts = row.get::<_, i32>(4) as u32;
    let error_count = row.get::<_, i32>(9) as u32;
    let job = Job {
        id,
        queue: row.get(1),
        args: row.get(2),
        class: row.get(3),
        max_attempts,
        created_at: row.get(5),
        run_next_at: row.get(6),
        last_failed_at: row.get(7),
        last_error: row.get(8),
        error_count,
    };
    Ok(Some(job))
}
/// Deletes `job` from the queue table.
///
/// The delete is restricted to rows whose `id` equals `job.id` and whose `queue`
/// equals this queue's name. Returns `true` when at least one row was removed.
pub(crate) fn complete<C: GenericConnection>(
    &self,
    job: &Job,
    conn: &C,
) -> Result<bool> {
    let sql = format!(
        "delete from {} where id = $1 and queue = $2",
        self.schema_config.queue_table()
    );
    let job_id = job.id as i64;
    let deleted = conn.execute(&sql, &[&job_id, &self.name])?;
    Ok(deleted != 0)
}
/// Records a failed attempt for `job` and sets when it should run next.
///
/// Sets `last_failed_at` to `now()`, `last_error` to `err_msg`, `error_count` to
/// `job.error_count + 1` and `run_next_at` to the given timestamp, for the row whose
/// `id` and `queue` match. Returns `true` when at least one row was updated.
pub(crate) fn mark_error<C: GenericConnection>(
    &self,
    job: &Job,
    err_msg: &str,
    run_next_at: DateTime<Utc>,
    conn: &C,
) -> Result<bool> {
    let sql = format!(
        r#"update {}
set
last_failed_at = now(),
last_error = $3,
error_count = $4,
run_next_at = $5
where id = $1 and queue = $2"#,
        self.schema_config.queue_table()
    );
    let job_id = job.id as i64;
    // The new count is derived from the in-memory job, not from the stored row.
    let next_error_count = (job.error_count + 1) as i32;
    let updated = conn.execute(
        &sql,
        &[&job_id, &self.name, &err_msg, &next_error_count, &run_next_at],
    )?;
    Ok(updated != 0)
}
/// Moves `job` from the queue table to the dead-letters table.
///
/// The move is a single statement: a data-modifying CTE deletes the row matching
/// `job.id` and this queue's name, and its `returning` output feeds the insert into
/// the dead-letters table. The copy and the delete therefore succeed or fail
/// together, even when `conn` is not inside a transaction.
///
/// The dead-letter row gets `last_failed_at = now()`, `last_error = err_msg` and
/// `error_count = job.error_count + 1`.
///
/// Returns `true` when a row was moved, `false` when no matching row was in the queue.
pub(crate) fn fail<C: GenericConnection>(
    &self,
    job: &Job,
    err_msg: &str,
    conn: &C,
) -> Result<bool> {
    let id = job.id as i64;
    let error_count = (job.error_count + 1) as i32;
    // First placeholder: queue table (source); second: dead-letters table (target).
    let sql = format!(
        r#"with moved as (
    delete from {}
    where id = $1 and queue = $2
    returning id, class, args, queue, max_attempts, created_at
)
insert into {} (
    id, class, args, queue, max_attempts, created_at, last_failed_at, last_error,
    error_count
)
select id, class, args, queue, max_attempts, created_at, now(), $3, $4
from moved"#,
        self.schema_config.queue_table(),
        self.schema_config.dead_letters_table()
    );
    // The reported row count is that of the outer insert, i.e. the rows moved.
    let moved = conn.execute(&sql, &[&id, &self.name, &err_msg, &error_count])?;
    Ok(moved > 0)
}
/// Counts the rows of the queue table whose `queue` column equals this queue's name.
pub fn len<C: GenericConnection>(&self, conn: &C) -> Result<usize> {
    let sql = format!(
        "select count(*) from {} where queue = $1",
        self.schema_config.queue_table()
    );
    let rows = conn.query(&sql, &[&self.name])?;
    // `count(*)` is read as an i64 from the single result row.
    let total = rows.get(0).get::<_, i64>(0);
    Ok(total as usize)
}
/// Fetches the job with id `job_id` from the queue table.
///
/// Only rows whose `queue` column equals this queue's name are considered.
/// Returns `Ok(None)` when no such row exists.
pub fn lookup_in_queue<C: GenericConnection>(
    &self,
    job_id: u64,
    conn: &C,
) -> Result<Option<Job>> {
    let sql = format!(
        r#"select
id, queue, args, class, max_attempts, created_at, run_next_at,
last_failed_at, last_error, error_count
from {}
where id = $1 and queue = $2"#,
        self.schema_config.queue_table()
    );
    let id_param = job_id as i64;
    let found = conn.query(&sql, &[&id_param, &self.name])?;
    if found.is_empty() {
        return Ok(None);
    }

    let row = found.get(0);
    // Integer columns are read with their signed SQL types, then converted to the
    // unsigned types used by `Job`.
    let id = row.get::<_, i64>(0) as u64;
    let max_attempts = row.get::<_, i32>(4) as u32;
    let error_count = row.get::<_, i32>(9) as u32;
    let job = Job {
        id,
        queue: row.get(1),
        args: row.get(2),
        class: row.get(3),
        max_attempts,
        created_at: row.get(5),
        run_next_at: Some(row.get(6)),
        last_failed_at: row.get(7),
        last_error: row.get(8),
        error_count,
    };
    Ok(Some(job))
}
/// Fetches the job with id `job_id` from the dead-letters table.
///
/// Only rows whose `queue` column equals this queue's name are considered.
/// The query does not select `run_next_at`, so the returned job always has
/// `run_next_at: None`. Returns `Ok(None)` when no such row exists.
pub fn lookup_in_dead_letters<C: GenericConnection>(
    &self,
    job_id: u64,
    conn: &C,
) -> Result<Option<Job>> {
    let sql = format!(
        r#"select
id, queue, args, class, max_attempts, created_at,
last_failed_at, last_error, error_count
from {}
where id = $1 and queue = $2"#,
        self.schema_config.dead_letters_table()
    );
    let id_param = job_id as i64;
    let found = conn.query(&sql, &[&id_param, &self.name])?;
    if found.is_empty() {
        return Ok(None);
    }

    let row = found.get(0);
    // Column indexes from 6 on are shifted by one relative to the queue-table
    // query, which additionally selects `run_next_at`.
    let id = row.get::<_, i64>(0) as u64;
    let max_attempts = row.get::<_, i32>(4) as u32;
    let error_count = row.get::<_, i32>(8) as u32;
    let job = Job {
        id,
        queue: row.get(1),
        args: row.get(2),
        class: row.get(3),
        max_attempts,
        created_at: row.get(5),
        run_next_at: None,
        last_failed_at: row.get(6),
        last_error: row.get(7),
        error_count,
    };
    Ok(Some(job))
}
/// Deletes every row of the queue table whose `queue` column equals this queue's name.
///
/// The number of deleted rows is discarded.
pub fn clear<C: GenericConnection>(&self, conn: &C) -> Result<()> {
    let sql = format!(
        "delete from {} where queue = $1",
        self.schema_config.queue_table()
    );
    conn.execute(&sql, &[&self.name])?;
    Ok(())
}
}