use crate::error::Result;
use crate::types::QueueRecord;
use async_trait::async_trait;
use sqlx::PgPool;
const INSERT_QUEUE: &str = r#"
INSERT INTO pgqrs_queues (queue_name)
VALUES ($1)
RETURNING id, queue_name, created_at;
"#;
const GET_QUEUE_BY_ID: &str = r#"
SELECT id, queue_name, created_at
FROM pgqrs_queues
WHERE id = $1;
"#;
const GET_QUEUE_BY_NAME: &str = r#"
SELECT id, queue_name, created_at
FROM pgqrs_queues
WHERE queue_name = $1;
"#;
const LIST_ALL_QUEUES: &str = r#"
SELECT id, queue_name, created_at
FROM pgqrs_queues
ORDER BY created_at DESC;
"#;
const DELETE_QUEUE_BY_ID: &str = r#"
DELETE FROM pgqrs_queues
WHERE id = $1;
"#;
const DELETE_QUEUE_BY_NAME: &str = r#"
DELETE FROM pgqrs_queues
WHERE queue_name = $1;
"#;
const CHECK_QUEUE_EXISTS: &str = r#"
SELECT EXISTS(SELECT 1 FROM pgqrs_queues WHERE queue_name = $1);
"#;
#[derive(Debug, Clone)]
pub struct Queues {
pub pool: PgPool,
}
impl Queues {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
pub async fn delete_by_name_tx<'a, 'b: 'a>(
name: &str,
tx: &'a mut sqlx::Transaction<'b, sqlx::Postgres>,
) -> Result<u64> {
let rows_affected = sqlx::query(DELETE_QUEUE_BY_NAME)
.bind(name)
.execute(&mut **tx)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("DELETE_QUEUE_BY_NAME ({})", name),
source: Box::new(e),
context: format!("Failed to delete queue '{}' in transaction", name),
})?
.rows_affected();
Ok(rows_affected)
}
}
#[async_trait]
impl crate::store::QueueTable for Queues {
async fn insert(&self, data: crate::types::NewQueueRecord) -> Result<QueueRecord> {
let queue = sqlx::query_as::<_, QueueRecord>(INSERT_QUEUE)
.bind(&data.queue_name)
.fetch_one(&self.pool)
.await
.map_err(|e| {
if let sqlx::Error::Database(db_err) = &e {
if db_err.code().as_deref() == Some("23505") {
return crate::error::Error::QueueAlreadyExists {
name: data.queue_name.clone(),
};
}
}
crate::error::Error::QueryFailed {
query: format!("INSERT_QUEUE ({})", data.queue_name),
source: Box::new(e),
context: format!("Failed to create queue '{}'", data.queue_name),
}
})?;
Ok(queue)
}
async fn get(&self, id: i64) -> Result<QueueRecord> {
let queue = sqlx::query_as::<_, QueueRecord>(GET_QUEUE_BY_ID)
.bind(id)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("GET_QUEUE_BY_ID ({})", id),
source: Box::new(e),
context: format!("Failed to get queue {}", id),
})?;
Ok(queue)
}
async fn list(&self) -> Result<Vec<QueueRecord>> {
let queues = sqlx::query_as::<_, QueueRecord>(LIST_ALL_QUEUES)
.fetch_all(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: "LIST_ALL_QUEUES".into(),
source: Box::new(e),
context: "Failed to list all queues".into(),
})?;
Ok(queues)
}
async fn count(&self) -> Result<i64> {
let query = "SELECT COUNT(*) FROM pgqrs_queues";
let count = sqlx::query_scalar(query)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: "COUNT_QUEUES".into(),
source: Box::new(e),
context: "Failed to count queues".into(),
})?;
Ok(count)
}
async fn delete(&self, id: i64) -> Result<u64> {
let rows_affected = sqlx::query(DELETE_QUEUE_BY_ID)
.bind(id)
.execute(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("DELETE_QUEUE_BY_ID ({})", id),
source: Box::new(e),
context: format!("Failed to delete queue {}", id),
})?
.rows_affected();
Ok(rows_affected)
}
async fn get_by_name(&self, name: &str) -> Result<QueueRecord> {
let queue = sqlx::query_as::<_, QueueRecord>(GET_QUEUE_BY_NAME)
.bind(name)
.fetch_one(&self.pool)
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => crate::error::Error::QueueNotFound {
name: name.to_string(),
},
_ => crate::error::Error::QueryFailed {
query: format!("GET_QUEUE_BY_NAME ({})", name),
source: Box::new(e),
context: format!("Failed to get queue '{}'", name),
},
})?;
Ok(queue)
}
async fn exists(&self, name: &str) -> Result<bool> {
let exists: bool = sqlx::query_scalar(CHECK_QUEUE_EXISTS)
.bind(name)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("CHECK_QUEUE_EXISTS ({})", name),
source: Box::new(e),
context: format!("Failed to check if queue '{}' exists", name),
})?;
Ok(exists)
}
async fn delete_by_name(&self, name: &str) -> Result<u64> {
let rows_affected = sqlx::query(DELETE_QUEUE_BY_NAME)
.bind(name)
.execute(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("DELETE_QUEUE_BY_NAME ({})", name),
source: Box::new(e),
context: format!("Failed to delete queue '{}'", name),
})?
.rows_affected();
Ok(rows_affected)
}
}