pgqrs 0.15.3

A high-performance PostgreSQL-backed job queue for Rust applications
Documentation
use crate::error::Result;
use crate::stats::{QueueMetrics, SystemStats, WorkerHealthStats};
use crate::store::dialect::SqlDialect;
use crate::store::turso::dialect::TursoDialect;
use crate::store::turso::{format_turso_timestamp, parse_turso_timestamp};
use async_trait::async_trait;
use chrono::Utc;
use std::sync::Arc;
use turso::Database;

#[derive(Debug, Clone)]
pub struct TursoDbState {
    db: Arc<Database>,
}

impl TursoDbState {
    pub fn new(db: Arc<Database>) -> Self {
        Self { db }
    }

    fn map_queue_metrics_row(row: &turso::Row) -> Result<QueueMetrics> {
        Ok(QueueMetrics {
            name: row.get(0)?,
            total_messages: row.get(1)?,
            pending_messages: row.get(2)?,
            locked_messages: row.get(3)?,
            archived_messages: row.get(4)?,
            oldest_pending_message: row
                .get::<Option<String>>(5)?
                .map(|s| parse_turso_timestamp(&s))
                .transpose()?,
            newest_message: row
                .get::<Option<String>>(6)?
                .map(|s| parse_turso_timestamp(&s))
                .transpose()?,
        })
    }

    fn map_system_stats_row(row: &turso::Row) -> Result<SystemStats> {
        Ok(SystemStats {
            total_queues: row.get(0)?,
            total_workers: row.get(1)?,
            active_workers: row.get(2)?,
            total_messages: row.get(3)?,
            pending_messages: row.get(4)?,
            locked_messages: row.get(5)?,
            archived_messages: row.get(6)?,
            schema_version: row.get(7)?,
        })
    }

    fn map_worker_health_row(row: &turso::Row) -> Result<WorkerHealthStats> {
        Ok(WorkerHealthStats {
            queue_name: row.get(0)?,
            total_workers: row.get(1)?,
            ready_workers: row.get(2)?,
            polling_workers: row.get(3)?,
            interrupted_workers: row.get(4)?,
            suspended_workers: row.get(5)?,
            stopped_workers: row.get(6)?,
            stale_workers: row.get(7)?,
        })
    }
}

#[async_trait]
impl crate::store::DbStateTable for TursoDbState {
    async fn verify(&self) -> Result<()> {
        let required_tables = [
            ("pgqrs_queues", "Queue repository table"),
            ("pgqrs_workers", "Worker repository table"),
            ("pgqrs_messages", "Unified messages table"),
        ];

        for (table_name, description) in &required_tables {
            let table_exists: bool =
                crate::store::turso::query_scalar(TursoDialect::DB_STATE.check_table_exists)
                    .bind(*table_name)
                    .fetch_one(&self.db)
                    .await?;

            if !table_exists {
                return Err(crate::error::Error::SchemaValidation {
                    message: format!("{} ('{}') does not exist", description, table_name),
                });
            }
        }

        let orphaned_messages: i64 =
            crate::store::turso::query_scalar(TursoDialect::DB_STATE.check_orphaned_messages)
                .fetch_one(&self.db)
                .await?;
        if orphaned_messages > 0 {
            return Err(crate::error::Error::SchemaValidation {
                message: format!(
                    "Found {} messages with invalid queue_id references",
                    orphaned_messages
                ),
            });
        }

        let orphaned_message_workers: i64 = crate::store::turso::query_scalar(
            TursoDialect::DB_STATE.check_orphaned_message_workers,
        )
        .fetch_one(&self.db)
        .await?;
        if orphaned_message_workers > 0 {
            return Err(crate::error::Error::SchemaValidation {
                message: format!(
                    "Found {} messages with invalid worker refs",
                    orphaned_message_workers
                ),
            });
        }

        Ok(())
    }

    async fn purge_queue(&self, queue_id: i64) -> Result<()> {
        let conn = self
            .db
            .connect()
            .map_err(|e| crate::error::Error::Internal {
                message: e.to_string(),
            })?;
        conn.execute("BEGIN", ()).await?;
        if let Err(e) = crate::store::turso::query(TursoDialect::DB_STATE.purge_queue_messages)
            .bind(queue_id)
            .execute_once_on_connection(&conn)
            .await
        {
            let _ = conn.execute("ROLLBACK", ()).await;
            return Err(e);
        }
        if let Err(e) = crate::store::turso::query(TursoDialect::DB_STATE.purge_queue_workers)
            .bind(queue_id)
            .execute_once_on_connection(&conn)
            .await
        {
            let _ = conn.execute("ROLLBACK", ()).await;
            return Err(e);
        }
        conn.execute("COMMIT", ()).await?;
        Ok(())
    }

    async fn queue_metrics(&self, queue_id: i64) -> Result<QueueMetrics> {
        let row = crate::store::turso::query(TursoDialect::DB_STATE.queue_metrics)
            .bind(queue_id)
            .fetch_one(&self.db)
            .await?;
        Self::map_queue_metrics_row(&row)
    }

    async fn all_queues_metrics(&self) -> Result<Vec<QueueMetrics>> {
        let rows = crate::store::turso::query(TursoDialect::DB_STATE.all_queues_metrics)
            .fetch_all(&self.db)
            .await?;
        let mut metrics = Vec::with_capacity(rows.len());
        for row in rows {
            metrics.push(Self::map_queue_metrics_row(&row)?);
        }
        Ok(metrics)
    }

    async fn system_stats(&self) -> Result<SystemStats> {
        let row = crate::store::turso::query(TursoDialect::DB_STATE.system_stats)
            .fetch_one(&self.db)
            .await?;
        Self::map_system_stats_row(&row)
    }

    async fn worker_health_stats(
        &self,
        heartbeat_timeout: chrono::Duration,
        group_by_queue: bool,
    ) -> Result<Vec<WorkerHealthStats>> {
        let threshold = Utc::now() - heartbeat_timeout;
        let query = if group_by_queue {
            TursoDialect::DB_STATE.worker_health_by_queue
        } else {
            TursoDialect::DB_STATE.worker_health_global
        };
        let rows = crate::store::turso::query(query)
            .bind(format_turso_timestamp(&threshold))
            .fetch_all(&self.db)
            .await?;
        let mut stats = Vec::with_capacity(rows.len());
        for row in rows {
            stats.push(Self::map_worker_health_row(&row)?);
        }
        Ok(stats)
    }

    async fn purge_old_workers(&self, older_than: chrono::Duration) -> Result<u64> {
        let threshold = Utc::now() - older_than;
        let rows = crate::store::turso::query(TursoDialect::DB_STATE.purge_old_workers)
            .bind(format_turso_timestamp(&threshold))
            .fetch_all(&self.db)
            .await?;
        if rows.is_empty() {
            return Ok(0);
        }
        let mut ids = Vec::with_capacity(rows.len());
        for row in rows {
            ids.push(row.get::<i64>(0)?);
        }
        let conn = self
            .db
            .connect()
            .map_err(|e| crate::error::Error::Internal {
                message: e.to_string(),
            })?;
        conn.execute("BEGIN", ())
            .await
            .map_err(|e| crate::error::Error::QueryFailed {
                query: "BEGIN".into(),
                source: Box::new(e),
                context: "Turso purge old workers".into(),
            })?;
        for id in &ids {
            if let Err(e) = conn
                .execute(
                    "DELETE FROM pgqrs_workers WHERE id = ?",
                    vec![turso::Value::Integer(*id)],
                )
                .await
            {
                let _ = conn.execute("ROLLBACK", ()).await;
                return Err(crate::error::Error::QueryFailed {
                    query: "DELETE_OLD_WORKER".into(),
                    source: Box::new(e),
                    context: "Turso purge old workers".into(),
                });
            }
        }
        conn.execute("COMMIT", ())
            .await
            .map_err(|e| crate::error::Error::QueryFailed {
                query: "COMMIT".into(),
                source: Box::new(e),
                context: "Turso purge old workers".into(),
            })?;
        Ok(ids.len() as u64)
    }
}