use crate::error::Result;
use crate::store::dialect::SqlDialect;
use crate::store::sqlite::dialect::SqliteDialect;
use crate::store::sqlite::parse_sqlite_timestamp;
use crate::types::QueueRecord;
use async_trait::async_trait;
use sqlx::{Row, SqlitePool};
/// Accessor for queue rows stored in SQLite.
///
/// Holds the connection pool that every query method on this type runs against.
#[derive(Debug, Clone)]
pub struct SqliteQueueTable {
/// Pool used to execute all statements issued by this table.
pool: SqlitePool,
}
impl SqliteQueueTable {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
fn map_row(row: sqlx::sqlite::SqliteRow) -> Result<QueueRecord> {
let id: i64 = row.try_get("id")?;
let queue_name: String = row.try_get("queue_name")?;
let created_at_str: String = row.try_get("created_at")?;
let created_at = parse_sqlite_timestamp(&created_at_str)?;
Ok(QueueRecord {
id,
queue_name,
created_at,
})
}
}
#[async_trait]
impl crate::store::QueueTable for SqliteQueueTable {
async fn insert(&self, data: crate::types::NewQueueRecord) -> Result<QueueRecord> {
let row = sqlx::query(SqliteDialect::QUEUE.insert)
.bind(&data.queue_name)
.fetch_one(&self.pool)
.await
.map_err(|e| {
if let sqlx::Error::Database(db_err) = &e {
if let Some(code) = db_err.code() {
if code == "2067" || code == "1555" || code == "19" {
return crate::error::Error::QueueAlreadyExists {
name: data.queue_name.clone(),
};
}
}
}
crate::error::Error::QueryFailed {
query: format!("INSERT_QUEUE ({})", data.queue_name),
source: Box::new(e),
context: format!("Failed to create queue '{}'", data.queue_name),
}
})?;
Self::map_row(row)
}
async fn get(&self, id: i64) -> Result<QueueRecord> {
let row = sqlx::query(SqliteDialect::QUEUE.get)
.bind(id)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("GET_QUEUE_BY_ID ({})", id),
source: Box::new(e),
context: format!("Failed to get queue {}", id),
})?;
Self::map_row(row)
}
async fn list(&self) -> Result<Vec<QueueRecord>> {
let rows = sqlx::query(SqliteDialect::QUEUE.list)
.fetch_all(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: "LIST_ALL_QUEUES".into(),
source: Box::new(e),
context: "Failed to list all queues".into(),
})?;
let mut queues = Vec::with_capacity(rows.len());
for row in rows {
queues.push(Self::map_row(row)?);
}
Ok(queues)
}
async fn count(&self) -> Result<i64> {
let query = "SELECT COUNT(*) FROM pgqrs_queues";
let count: i64 = sqlx::query_scalar(query)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: "COUNT_QUEUES".into(),
source: Box::new(e),
context: "Failed to count queues".into(),
})?;
Ok(count)
}
async fn delete(&self, id: i64) -> Result<u64> {
let result = sqlx::query(SqliteDialect::QUEUE.delete)
.bind(id)
.execute(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("DELETE_QUEUE_BY_ID ({})", id),
source: Box::new(e),
context: format!("Failed to delete queue {}", id),
})?;
Ok(result.rows_affected())
}
async fn get_by_name(&self, name: &str) -> Result<QueueRecord> {
let row = sqlx::query(SqliteDialect::QUEUE.get_by_name)
.bind(name)
.fetch_one(&self.pool)
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => crate::error::Error::QueueNotFound {
name: name.to_string(),
},
_ => crate::error::Error::QueryFailed {
query: format!("GET_QUEUE_BY_NAME ({})", name),
source: Box::new(e),
context: format!("Failed to get queue '{}'", name),
},
})?;
Self::map_row(row)
}
async fn exists(&self, name: &str) -> Result<bool> {
let exists: bool = sqlx::query_scalar(SqliteDialect::QUEUE.exists)
.bind(name)
.fetch_one(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("CHECK_QUEUE_EXISTS ({})", name),
source: Box::new(e),
context: format!("Failed to check if queue '{}' exists", name),
})?;
Ok(exists)
}
async fn delete_by_name(&self, name: &str) -> Result<u64> {
let result = sqlx::query(SqliteDialect::QUEUE.delete_by_name)
.bind(name)
.execute(&self.pool)
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: format!("DELETE_QUEUE_BY_NAME ({})", name),
source: Box::new(e),
context: format!("Failed to delete queue '{}'", name),
})?;
Ok(result.rows_affected())
}
}
#[cfg(test)]
mod tests {
    //! Tests for `SqliteQueueTable` run against an in-memory SQLite database.
    use super::*;
    use crate::store::QueueTable;
    use crate::types::NewQueueRecord;

    /// Opens an in-memory SQLite pool and creates the `pgqrs_queues` table the
    /// queries under test operate on.
    async fn create_test_pool() -> SqlitePool {
        let pool = SqlitePool::connect("sqlite::memory:")
            .await
            .expect("Failed to create pool");
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS pgqrs_queues (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                queue_name TEXT NOT NULL UNIQUE,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            "#,
        )
        .execute(&pool)
        .await
        .expect("Failed to create queues table");
        pool
    }

    #[tokio::test]
    async fn test_queue_insert_and_get() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        let queue = table
            .insert(NewQueueRecord {
                queue_name: "test_queue".to_string(),
            })
            .await
            .expect("Failed to insert queue");
        assert_eq!(queue.queue_name, "test_queue");
        assert!(queue.id > 0);
        let fetched = table.get(queue.id).await.expect("Failed to get queue");
        assert_eq!(fetched.queue_name, "test_queue");
        assert_eq!(fetched.id, queue.id);
    }

    /// A second insert with the same name must hit the UNIQUE constraint and be
    /// reported as `QueueAlreadyExists` carrying the offending name.
    #[tokio::test]
    async fn test_queue_insert_duplicate_name() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        table
            .insert(NewQueueRecord {
                queue_name: "duplicate_test".to_string(),
            })
            .await
            .expect("Failed to insert");
        let second = table
            .insert(NewQueueRecord {
                queue_name: "duplicate_test".to_string(),
            })
            .await;
        assert!(matches!(
            &second,
            Err(crate::error::Error::QueueAlreadyExists { name }) if name == "duplicate_test"
        ));
    }

    #[tokio::test]
    async fn test_queue_get_by_name() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        table
            .insert(NewQueueRecord {
                queue_name: "named_queue".to_string(),
            })
            .await
            .expect("Failed to insert queue");
        let queue = table
            .get_by_name("named_queue")
            .await
            .expect("Failed to get by name");
        assert_eq!(queue.queue_name, "named_queue");
    }

    /// Looking up a name that was never inserted must surface as
    /// `QueueNotFound` rather than a generic query failure.
    #[tokio::test]
    async fn test_queue_get_by_name_not_found() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        let missing = table.get_by_name("never_inserted_queue").await;
        assert!(matches!(
            &missing,
            Err(crate::error::Error::QueueNotFound { name }) if name == "never_inserted_queue"
        ));
    }

    #[tokio::test]
    async fn test_queue_list_and_count() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        table
            .insert(NewQueueRecord {
                queue_name: "queue1".to_string(),
            })
            .await
            .expect("Failed to insert");
        table
            .insert(NewQueueRecord {
                queue_name: "queue2".to_string(),
            })
            .await
            .expect("Failed to insert");
        let queues = table.list().await.expect("Failed to list");
        assert!(queues.len() >= 2);
        let count = table.count().await.expect("Failed to count");
        assert!(count >= 2);
    }

    #[tokio::test]
    async fn test_queue_exists() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        table
            .insert(NewQueueRecord {
                queue_name: "exists_test".to_string(),
            })
            .await
            .expect("Failed to insert");
        let exists = table
            .exists("exists_test")
            .await
            .expect("Failed to check exists");
        assert!(exists);
        let not_exists = table
            .exists("nonexistent")
            .await
            .expect("Failed to check exists");
        assert!(!not_exists);
    }

    #[tokio::test]
    async fn test_queue_delete() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        let queue = table
            .insert(NewQueueRecord {
                queue_name: "delete_test".to_string(),
            })
            .await
            .expect("Failed to insert");
        let deleted = table.delete(queue.id).await.expect("Failed to delete");
        assert_eq!(deleted, 1);
        let exists = table
            .exists("delete_test")
            .await
            .expect("Failed to check exists");
        assert!(!exists);
    }

    #[tokio::test]
    async fn test_queue_delete_by_name() {
        let pool = create_test_pool().await;
        let table = SqliteQueueTable::new(pool);
        table
            .insert(NewQueueRecord {
                queue_name: "delete_by_name_test".to_string(),
            })
            .await
            .expect("Failed to insert");
        let deleted = table
            .delete_by_name("delete_by_name_test")
            .await
            .expect("Failed to delete");
        assert_eq!(deleted, 1);
        // Mirror test_queue_delete: the row must actually be gone afterwards.
        let exists = table
            .exists("delete_by_name_test")
            .await
            .expect("Failed to check exists");
        assert!(!exists);
    }
}