raisfast 0.2.19

The last backend you'll ever need. Rust-powered headless CMS with built-in blog, ecommerce, wallet, payment and 4 plugin engines.
use std::sync::Arc;

use crate::config::app::AppConfig;
use crate::db::Pool;
use crate::errors::app_error::AppResult;
use crate::models::wallet_outbox;
use crate::models::wallet_transaction::WalletTxType;
use crate::worker::{Job, JobHandler};

pub struct ProcessWalletOutboxHandler {
    pool: Pool,
    _config: Arc<AppConfig>,
}

impl ProcessWalletOutboxHandler {
    #[must_use]
    pub fn new(pool: Pool, config: Arc<AppConfig>) -> Self {
        Self {
            pool,
            _config: config,
        }
    }
}

#[async_trait::async_trait]
impl JobHandler for ProcessWalletOutboxHandler {
    async fn handle(&self, job: &Job) -> AppResult<()> {
        let Job::ProcessWalletOutbox = job else {
            return Ok(());
        };

        let entries = wallet_outbox::fetch_pending(&self.pool, 100).await?;

        if entries.is_empty() {
            return Ok(());
        }

        let mut processed = 0u64;
        let mut failed = 0u64;

        for entry in &entries {
            if let Err(e) = wallet_outbox::mark_processing(&self.pool, entry.id).await {
                tracing::warn!(
                    "[wallet_outbox] CAS failed for entry {}, skipping: {e}",
                    entry.id.to_string()
                );
                continue;
            }

            let result = match entry.entry_type.as_str() {
                "credit" => {
                    crate::services::wallet::credit_wallet(
                        &self.pool,
                        entry.user_id,
                        &entry.currency,
                        entry.amount,
                        parse_tx_type(&entry.tx_type),
                        &entry.transaction_no,
                        entry
                            .reference_type
                            .as_deref()
                            .and_then(parse_reference_type),
                        entry.reference_id.as_deref(),
                        entry.metadata.as_deref(),
                    )
                    .await
                }
                "debit" => {
                    crate::services::wallet::debit_wallet(
                        &self.pool,
                        entry.user_id,
                        &entry.currency,
                        entry.amount,
                        parse_tx_type(&entry.tx_type),
                        &entry.transaction_no,
                        entry
                            .reference_type
                            .as_deref()
                            .and_then(parse_reference_type),
                        entry.reference_id.as_deref(),
                        entry.metadata.as_deref(),
                    )
                    .await
                }
                other => Err(crate::errors::app_error::AppError::Internal(
                    anyhow::anyhow!("unknown entry_type: {other}"),
                )),
            };

            match result {
                Ok(_) => {
                    wallet_outbox::mark_completed(&self.pool, entry.id).await?;
                    processed += 1;
                }
                Err(e) => {
                    let err_str = format!("{e}");
                    tracing::warn!(
                        "[wallet_outbox] failed to process entry {}: {err_str}",
                        entry.id.to_string()
                    );
                    wallet_outbox::mark_failed(&self.pool, entry.id, &err_str).await?;
                    failed += 1;
                }
            }
        }

        if processed > 0 || failed > 0 {
            tracing::info!("[wallet_outbox] processed {processed}, failed {failed}");
        }

        Ok(())
    }
}

fn parse_tx_type(s: &str) -> WalletTxType {
    match s {
        "recharge" => WalletTxType::Recharge,
        "payment" => WalletTxType::Payment,
        "refund" => WalletTxType::Refund,
        "transfer_out" => WalletTxType::TransferOut,
        "transfer_in" => WalletTxType::TransferIn,
        _ => WalletTxType::Recharge,
    }
}

fn parse_reference_type(s: &str) -> Option<crate::models::wallet_transaction::WalletReferenceType> {
    use crate::models::wallet_transaction::WalletReferenceType;
    match s {
        "admin" => Some(WalletReferenceType::Admin),
        "checkin" => Some(WalletReferenceType::Checkin),
        "order_reward" => Some(WalletReferenceType::OrderReward),
        "api_usage" => Some(WalletReferenceType::ApiUsage),
        "points_mall" => Some(WalletReferenceType::PointsMall),
        "order" => Some(WalletReferenceType::Order),
        "expiry" => Some(WalletReferenceType::Expiry),
        "payment" => Some(WalletReferenceType::Payment),
        "payment_refund" => Some(WalletReferenceType::PaymentRefund),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn ignores_wrong_job_type() {
        let pool = Pool::connect("sqlite::memory:").await.unwrap();
        sqlx::query(crate::db::schema::SCHEMA_SQL)
            .execute(&pool)
            .await
            .unwrap();
        let config = Arc::new(AppConfig::test_defaults());
        let handler = ProcessWalletOutboxHandler::new(pool, config);
        let job = Job::GenerateSitemap;
        assert!(handler.handle(&job).await.is_ok());
    }

    #[tokio::test]
    async fn handles_empty_outbox() {
        let pool = Pool::connect("sqlite::memory:").await.unwrap();
        sqlx::query(crate::db::schema::SCHEMA_SQL)
            .execute(&pool)
            .await
            .unwrap();
        let config = Arc::new(AppConfig::test_defaults());
        let handler = ProcessWalletOutboxHandler::new(pool, config);
        let job = Job::ProcessWalletOutbox;
        assert!(handler.handle(&job).await.is_ok());
    }
}