raisfast 0.2.20

The last backend you'll ever need. Rust-powered headless CMS with built-in blog, ecommerce, wallet, payment and 4 plugin engines.
use serde::{Deserialize, Serialize};
use sqlx::FromRow;

use crate::db::{DbDriver, Driver};
use crate::errors::app_error::AppResult;
use crate::types::snowflake_id::SnowflakeId;
use crate::utils::tz::Timestamp;

define_enum!(
    OutboxStatus {
        Pending = "pending",
        Processing = "processing",
        Completed = "completed",
        Failed = "failed",
        Dead = "dead",
    }
);

#[derive(Debug, FromRow, Serialize, Deserialize, Clone)]
pub struct WalletOutbox {
    pub id: SnowflakeId,
    pub user_id: SnowflakeId,
    pub currency: String,
    pub amount: i64,
    pub entry_type: String,
    pub tx_type: String,
    pub transaction_no: String,
    pub reference_type: Option<String>,
    pub reference_id: Option<String>,
    pub metadata: Option<String>,
    pub tenant_id: Option<String>,
    pub status: OutboxStatus,
    pub attempts: i64,
    pub max_attempts: i64,
    pub last_error: Option<String>,
    pub created_at: Timestamp,
    pub updated_at: Timestamp,
}

pub async fn tx_insert(
    tx: &mut crate::db::pool::DbConnection,
    cmd: &crate::commands::CreateWalletOutboxCmd,
    tenant_id: Option<&str>,
) -> AppResult<()> {
    let id = crate::utils::id::new_id();
    let now = crate::utils::tz::now_utc();
    raisfast_derive::crud_insert!(
        &mut *tx,
        "wallet_outbox",
        [
            "id" => id,
            "user_id" => cmd.user_id,
            "currency" => &cmd.currency,
            "amount" => cmd.amount,
            "entry_type" => &cmd.entry_type,
            "tx_type" => cmd.tx_type,
            "transaction_no" => &cmd.transaction_no,
            "reference_type" => cmd.reference_type,
            "reference_id" => &cmd.reference_id,
            "metadata" => &cmd.metadata,
            "status" => OutboxStatus::Pending,
            "created_at" => &now,
            "updated_at" => &now
        ],
        tenant: tenant_id
    )?;
    Ok(())
}

pub async fn fetch_pending(pool: &crate::db::Pool, limit: i64) -> AppResult<Vec<WalletOutbox>> {
    raisfast_derive::check_schema!(
        "wallet_outbox",
        "status",
        "attempts",
        "max_attempts",
        "created_at"
    );
    let sql = format!(
        "SELECT * FROM wallet_outbox WHERE status IN ('pending', 'failed') AND attempts < max_attempts ORDER BY created_at ASC LIMIT {}",
        Driver::ph(1)
    );
    sqlx::query_as::<_, WalletOutbox>(&sql)
        .bind(limit)
        .fetch_all(pool)
        .await
        .map_err(Into::into)
}

pub async fn mark_processing(pool: &crate::db::Pool, id: SnowflakeId) -> AppResult<()> {
    raisfast_derive::check_schema!("wallet_outbox", "status", "updated_at", "id");
    let sql = format!(
        "UPDATE wallet_outbox SET status = 'processing', updated_at = {} WHERE id = {} AND status IN ('pending', 'failed')",
        crate::db::Driver::now_fn(),
        Driver::ph(1)
    );
    sqlx::query(&sql).bind(id).execute(pool).await?;
    Ok(())
}

pub async fn mark_completed(pool: &crate::db::Pool, id: SnowflakeId) -> AppResult<()> {
    raisfast_derive::crud_update!(pool, "wallet_outbox",
        raw: ["status" => "'completed'", "updated_at" => crate::db::Driver::now_fn()],
        where: ("id", id)
    )?;
    Ok(())
}

pub async fn mark_failed(pool: &crate::db::Pool, id: SnowflakeId, error: &str) -> AppResult<()> {
    raisfast_derive::check_schema!(
        "wallet_outbox",
        "status",
        "attempts",
        "max_attempts",
        "last_error",
        "updated_at",
        "id"
    );
    let sql = format!(
        "UPDATE wallet_outbox SET status = CASE WHEN attempts + 1 >= max_attempts THEN 'dead' ELSE 'failed' END, attempts = attempts + 1, last_error = {}, updated_at = {} WHERE id = {}",
        Driver::ph(1),
        crate::db::Driver::now_fn(),
        Driver::ph(2)
    );
    sqlx::query(&sql).bind(error).bind(id).execute(pool).await?;
    Ok(())
}