// pgqrs 0.15.0
//
// A high-performance PostgreSQL-backed job queue for Rust applications
// Documentation
use crate::error::Result;
use crate::store::turso::{format_turso_timestamp, parse_turso_timestamp};
use crate::types::{NewWorkflowRecord, WorkflowRecord};
use async_trait::async_trait;
use chrono::Utc;
use std::sync::Arc;
use turso::Database;

/// Accessor for the `pgqrs_workflows` table, issuing its queries through a
/// shared Turso [`Database`] handle.
///
/// Cloning copies only the [`Arc`], so every clone refers to the same
/// underlying `Database`.
#[derive(Clone, Debug)]
pub struct TursoWorkflowTable {
    /// Shared database handle passed to every query this table runs.
    db: Arc<Database>,
}

impl TursoWorkflowTable {
    pub fn new(db: Arc<Database>) -> Self {
        Self { db }
    }

    fn map_row(row: &turso::Row) -> Result<WorkflowRecord> {
        let id: i64 = row.get(0)?;
        let name: String = row.get(1)?;
        let queue_id: i64 = row.get(2)?;
        let created_at = parse_turso_timestamp(&row.get::<String>(3)?)?;

        Ok(WorkflowRecord {
            id,
            name,
            queue_id,
            created_at,
        })
    }

    pub async fn complete_workflow(&self, id: i64, output: serde_json::Value) -> Result<()> {
        let output_str = output.to_string();
        let now = Utc::now();
        let now_str = format_turso_timestamp(&now);

        crate::store::turso::query(
            r#"
            UPDATE pgqrs_workflows
            SET output = ?, updated_at = ?
            WHERE id = ?
            "#,
        )
        .bind(output_str)
        .bind(now_str)
        .bind(id)
        .execute_once(&self.db)
        .await?;

        Ok(())
    }

    pub async fn fail_workflow(&self, id: i64, error: serde_json::Value) -> Result<()> {
        let error_str = error.to_string();
        let now = Utc::now();
        let now_str = format_turso_timestamp(&now);

        crate::store::turso::query(
            r#"
            UPDATE pgqrs_workflows
            SET error = ?, updated_at = ?
            WHERE id = ?
            "#,
        )
        .bind(error_str)
        .bind(now_str)
        .bind(id)
        .execute_once(&self.db)
        .await?;

        Ok(())
    }
}

#[async_trait]
impl crate::store::WorkflowTable for TursoWorkflowTable {
    /// Fetches the workflow whose `name` column equals `name`.
    ///
    /// # Errors
    /// Propagates any error from the query or from mapping the row.
    /// NOTE(review): how `fetch_one` reports "no matching row" is defined by
    /// the query helper, not here — confirm against `crate::store::turso`.
    async fn get_by_name(&self, name: &str) -> Result<WorkflowRecord> {
        // Uses the `?` positional placeholder, matching every other query in
        // this impl (previously the Postgres-style `$1`).
        let found = crate::store::turso::query(
            r#"
            SELECT id, name, queue_id, created_at
            FROM pgqrs_workflows
            WHERE name = ?
            "#,
        )
        .bind(name)
        .fetch_one(&self.db)
        .await?;

        Self::map_row(&found)
    }

    /// Inserts a workflow with the given name and queue id, stamping
    /// `created_at` with the current UTC time, and returns the stored record
    /// as reported by the statement's `RETURNING` clause.
    ///
    /// # Errors
    /// Propagates any error from the insert or from mapping the returned row.
    async fn insert(&self, data: NewWorkflowRecord) -> Result<WorkflowRecord> {
        let created_at = format_turso_timestamp(&Utc::now());

        let inserted = crate::store::turso::query(
            r#"
            INSERT INTO pgqrs_workflows (name, queue_id, created_at)
            VALUES (?, ?, ?)
            RETURNING id, name, queue_id, created_at
            "#,
        )
        .bind(data.name.as_str())
        .bind(data.queue_id)
        .bind(created_at)
        .fetch_one_once(&self.db)
        .await?;

        Self::map_row(&inserted)
    }

    /// Fetches the workflow with primary key `id`.
    ///
    /// # Errors
    /// Propagates any error from the query or from mapping the row.
    async fn get(&self, id: i64) -> Result<WorkflowRecord> {
        let found = crate::store::turso::query(
            r#"
            SELECT id, name, queue_id, created_at
            FROM pgqrs_workflows
            WHERE id = ?
            "#,
        )
        .bind(id)
        .fetch_one(&self.db)
        .await?;

        Self::map_row(&found)
    }

    /// Returns every workflow, ordered by `created_at` descending.
    ///
    /// # Errors
    /// Propagates any query error; mapping stops at the first row that fails
    /// to convert and returns that error.
    async fn list(&self) -> Result<Vec<WorkflowRecord>> {
        let rows = crate::store::turso::query(
            r#"
            SELECT id, name, queue_id, created_at
            FROM pgqrs_workflows
            ORDER BY created_at DESC
            "#,
        )
        .fetch_all(&self.db)
        .await?;

        // Collecting into `Result<Vec<_>>` short-circuits on the first
        // mapping error, exactly like a push loop using `?`.
        rows.into_iter().map(|row| Self::map_row(&row)).collect()
    }

    /// Returns the number of rows in `pgqrs_workflows`.
    ///
    /// # Errors
    /// Propagates any error from the scalar query.
    async fn count(&self) -> Result<i64> {
        let total: i64 = crate::store::turso::query_scalar("SELECT COUNT(*) FROM pgqrs_workflows")
            .fetch_one(&self.db)
            .await?;
        Ok(total)
    }

    /// Deletes the workflow with primary key `id` and returns the value
    /// produced by `execute_once` (presumably the affected-row count —
    /// confirm against the query helper).
    ///
    /// # Errors
    /// Propagates any error from executing the delete.
    async fn delete(&self, id: i64) -> Result<u64> {
        let removed = crate::store::turso::query("DELETE FROM pgqrs_workflows WHERE id = ?")
            .bind(id)
            .execute_once(&self.db)
            .await?;
        Ok(removed)
    }
}