pgqrs 0.15.3

A high-performance PostgreSQL-backed job queue for Rust applications
Documentation
use crate::error::Result;
use crate::store::dialect::SqlDialect;
use crate::store::turso::dialect::TursoDialect;
use crate::store::turso::parse_turso_timestamp;
use crate::types::{NewRunRecord, RunRecord, WorkflowStatus};
use async_trait::async_trait;
use serde_json::Value;
use std::str::FromStr;
use std::sync::Arc;
use turso::Database;

#[derive(Debug, Clone)]
pub struct TursoRunRecordTable {
    db: Arc<Database>,
}

impl TursoRunRecordTable {
    pub fn new(db: Arc<Database>) -> Self {
        Self { db }
    }

    fn map_row(row: &turso::Row) -> Result<RunRecord> {
        let id: i64 = row.get(0)?;
        let workflow_id: i64 = row.get(1)?;
        let message_id: i64 = row.get(2)?;

        let status_str: String = row.get(3)?;
        let status = WorkflowStatus::from_str(&status_str)
            .map_err(|e| crate::error::Error::Internal { message: e })?;

        let input_str: Option<String> = row.get(4)?;
        let input: Option<Value> = match input_str {
            Some(s) => Some(serde_json::from_str(&s)?),
            None => None,
        };

        let output_str: Option<String> = row.get(5)?;
        let output: Option<Value> = match output_str {
            Some(s) => Some(serde_json::from_str(&s)?),
            None => None,
        };

        let error_str: Option<String> = row.get(6)?;
        let error: Option<Value> = match error_str {
            Some(s) => Some(serde_json::from_str(&s)?),
            None => None,
        };

        let created_at = parse_turso_timestamp(&row.get::<String>(7)?)?;
        let updated_at = parse_turso_timestamp(&row.get::<String>(8)?)?;

        Ok(RunRecord {
            id,
            workflow_id,
            message_id,
            status,
            input,
            output,
            error,
            created_at,
            updated_at,
        })
    }
}

#[async_trait]
impl crate::store::RunRecordTable for TursoRunRecordTable {
    async fn insert(&self, data: NewRunRecord) -> Result<RunRecord> {
        let input_str = data.input.map(|v| v.to_string());

        let row = crate::store::turso::query(TursoDialect::RUN.insert)
            .bind(data.workflow_id)
            .bind(data.message_id)
            .bind(input_str)
            .fetch_one_once(&self.db)
            .await?;

        Self::map_row(&row)
    }

    async fn get(&self, id: i64) -> Result<RunRecord> {
        let row = crate::store::turso::query(TursoDialect::RUN.get)
            .bind(id)
            .fetch_one(&self.db)
            .await?;

        Self::map_row(&row)
    }

    async fn list(&self) -> Result<Vec<RunRecord>> {
        let rows = crate::store::turso::query(TursoDialect::RUN.list)
            .fetch_all(&self.db)
            .await?;

        let mut runs = Vec::with_capacity(rows.len());
        for row in rows {
            runs.push(Self::map_row(&row)?);
        }

        Ok(runs)
    }

    async fn count(&self) -> Result<i64> {
        let count: i64 = crate::store::turso::query_scalar(TursoDialect::RUN.count)
            .fetch_one(&self.db)
            .await?;
        Ok(count)
    }

    async fn delete(&self, id: i64) -> Result<u64> {
        let count = crate::store::turso::query(TursoDialect::RUN.delete)
            .bind(id)
            .execute_once(&self.db)
            .await?;
        Ok(count)
    }

    async fn start_run(&self, id: i64) -> Result<RunRecord> {
        let row = crate::store::turso::query(TursoDialect::RUN.start)
            .bind(id)
            .fetch_optional(&self.db)
            .await?;

        if let Some(row) = row {
            return Self::map_row(&row);
        }

        let status_str: Option<String> =
            crate::store::turso::query_scalar(TursoDialect::RUN.get_status)
                .bind(id)
                .fetch_optional(&self.db)
                .await?;

        if let Some(s) = status_str {
            if let Ok(status) = WorkflowStatus::from_str(&s) {
                if matches!(status, WorkflowStatus::Error | WorkflowStatus::Success) {
                    return Err(crate::error::Error::ValidationFailed {
                        reason: format!("Run {} is in terminal {} state", id, status),
                    });
                }
            }
        }

        self.get(id).await
    }

    async fn complete_run(&self, id: i64, output: serde_json::Value) -> Result<RunRecord> {
        let output_str = output.to_string();
        let _rows = crate::store::turso::query(TursoDialect::RUN.complete)
            .bind(id)
            .bind(output_str)
            .execute_once(&self.db)
            .await?;
        self.get(id).await
    }

    async fn pause_run(
        &self,
        id: i64,
        message: String,
        resume_after: std::time::Duration,
    ) -> Result<RunRecord> {
        let error = serde_json::json!({
            "message": message,
            "resume_after": resume_after.as_secs()
        });
        let error_str = error.to_string();
        let _rows = crate::store::turso::query(TursoDialect::RUN.pause)
            .bind(id)
            .bind(error_str)
            .execute_once(&self.db)
            .await?;
        self.get(id).await
    }

    async fn fail_run(&self, id: i64, error: serde_json::Value) -> Result<RunRecord> {
        let error_str = error.to_string();
        let _rows = crate::store::turso::query(TursoDialect::RUN.fail)
            .bind(id)
            .bind(error_str)
            .execute_once(&self.db)
            .await?;
        self.get(id).await
    }

    async fn get_by_message_id(&self, message_id: i64) -> Result<RunRecord> {
        let row = crate::store::turso::query(TursoDialect::RUN.get_by_message_id)
            .bind(message_id)
            .fetch_one(&self.db)
            .await?;

        Self::map_row(&row)
    }
}