raisfast 0.2.19

The last backend you'll ever need. Rust-powered headless CMS with built-in blog, ecommerce, wallet, payment and 4 plugin engines.
use std::sync::Arc;

use crate::commands::CreateWalletOutboxCmd;
use crate::config::app::AppConfig;
use crate::db::Pool;
use crate::errors::app_error::AppResult;
use crate::models::payment_channel;
use crate::models::payment_order::{self, PaymentStatus};
use crate::models::wallet_transaction::{WalletReferenceType, WalletTxType};
use crate::worker::{Job, JobHandler};

pub struct RetryPaymentCallbackHandler {
    pool: Pool,
    config: Arc<AppConfig>,
}

impl RetryPaymentCallbackHandler {
    #[must_use]
    pub fn new(pool: Pool, config: Arc<AppConfig>) -> Self {
        Self { pool, config }
    }
}

#[async_trait::async_trait]
impl JobHandler for RetryPaymentCallbackHandler {
    async fn handle(&self, job: &Job) -> AppResult<()> {
        let order_id = match job {
            Job::RetryPaymentCallback { payment_order_id } => *payment_order_id,
            _ => return Ok(()),
        };

        let order = payment_order::find_by_id(&self.pool, order_id, None)
            .await?
            .ok_or_else(|| {
                crate::errors::app_error::AppError::Internal(anyhow::anyhow!(
                    "payment order {order_id} not found"
                ))
            })?;

        if order.status != PaymentStatus::Pending {
            tracing::info!(
                "[retry_payment_callback] order {} status is {:?}, skipping",
                order.id.to_string(),
                order.status
            );
            return Ok(());
        }

        let Some(ref provider_order_id) = order.provider_order_id else {
            tracing::warn!(
                "[retry_payment_callback] order {} has no provider_order_id, expiring",
                order.id.to_string()
            );
            crate::in_transaction!(&self.pool, tx, {
                crate::models::payment_order::tx_update_status_cas(
                    &mut tx,
                    order.id,
                    PaymentStatus::Expired,
                    Some("expired_at"),
                    PaymentStatus::Pending,
                )
                .await?;
                Ok(())
            })?;
            return Ok(());
        };

        let key = get_encrypt_key(&self.config)?;
        let provider = match crate::payment::providers::get_provider(&order.provider, &key) {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!(
                    "[retry_payment_callback] provider '{}' not available for order {}: {e}",
                    order.provider,
                    order.id.to_string()
                );
                return Ok(());
            }
        };

        let channel = payment_channel::find_by_id(&self.pool, order.channel_id, None)
            .await?
            .ok_or_else(|| {
                crate::errors::app_error::AppError::Internal(anyhow::anyhow!(
                    "payment channel {} not found",
                    order.channel_id
                ))
            })?;

        let status = provider.query(&channel, provider_order_id).await?;

        match status.status {
            PaymentStatus::Paid => {
                tracing::info!(
                    "[retry_payment_callback] order {} confirmed paid via provider query",
                    order.id.to_string()
                );
                crate::in_transaction!(&self.pool, tx, {
                    let rows = crate::models::payment_order::tx_update_status_cas(
                        &mut tx,
                        order.id,
                        PaymentStatus::Paid,
                        Some("paid_at"),
                        PaymentStatus::Pending,
                    )
                    .await?;
                    if rows == 0 {
                        tracing::info!(
                            "[retry_payment_callback] order {} CAS failed, skipping",
                            order.id.to_string()
                        );
                        return Ok(());
                    }

                    let outbox_cmd = CreateWalletOutboxCmd {
                        user_id: order.user_id,
                        currency: order.currency.clone(),
                        amount: order.amount,
                        entry_type: "credit".into(),
                        tx_type: WalletTxType::Recharge,
                        transaction_no: format!("PAY-{}", order.id),
                        reference_type: Some(WalletReferenceType::Payment),
                        reference_id: Some(order.id.to_string()),
                        metadata: None,
                    };
                    crate::models::wallet_outbox::tx_insert(
                        &mut tx,
                        &outbox_cmd,
                        order.tenant_id.as_deref(),
                    )
                    .await?;

                    Ok(())
                })?;
            }
            PaymentStatus::Cancelled => {
                crate::in_transaction!(&self.pool, tx, {
                    let rows = crate::models::payment_order::tx_update_status_cas(
                        &mut tx,
                        order.id,
                        PaymentStatus::Cancelled,
                        Some("cancelled_at"),
                        PaymentStatus::Pending,
                    )
                    .await?;
                    if rows == 0 {
                        tracing::info!(
                            "[retry_payment_callback] order {} CAS failed for cancel",
                            order.id.to_string()
                        );
                    }
                    Ok(())
                })?;
            }
            PaymentStatus::Expired => {
                crate::in_transaction!(&self.pool, tx, {
                    let rows = crate::models::payment_order::tx_update_status_cas(
                        &mut tx,
                        order.id,
                        PaymentStatus::Expired,
                        Some("expired_at"),
                        PaymentStatus::Pending,
                    )
                    .await?;
                    if rows == 0 {
                        tracing::info!(
                            "[retry_payment_callback] order {} CAS failed for expire",
                            order.id.to_string()
                        );
                    }
                    Ok(())
                })?;
            }
            PaymentStatus::Pending => {
                tracing::info!(
                    "[retry_payment_callback] order {} still pending at provider, will retry later",
                    order.id.to_string()
                );
            }
            _ => {
                tracing::info!(
                    "[retry_payment_callback] order {} provider status {:?}, no action",
                    order.id.to_string(),
                    status.status
                );
            }
        }

        Ok(())
    }
}

fn get_encrypt_key(config: &AppConfig) -> AppResult<[u8; 32]> {
    let key_str = config.app_key.as_deref().ok_or_else(|| {
        crate::errors::app_error::AppError::Internal(anyhow::anyhow!("APP_KEY not configured"))
    })?;
    let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, key_str)
        .map_err(|e| {
            crate::errors::app_error::AppError::Internal(anyhow::anyhow!(
                "APP_KEY base64 decode: {e}"
            ))
        })?;
    if decoded.len() != 32 {
        return Err(crate::errors::app_error::AppError::Internal(
            anyhow::anyhow!("APP_KEY must be 32 bytes, got {}", decoded.len()),
        ));
    }
    let mut arr = [0u8; 32];
    arr.copy_from_slice(&decoded);
    Ok(arr)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::snowflake_id::SnowflakeId;

    #[tokio::test]
    async fn ignores_wrong_job_type() {
        let pool = Pool::connect("sqlite::memory:").await.unwrap();
        sqlx::query(crate::db::schema::SCHEMA_SQL)
            .execute(&pool)
            .await
            .unwrap();
        let config = Arc::new(AppConfig::test_defaults());
        let handler = RetryPaymentCallbackHandler::new(pool, config);
        let job = Job::GenerateSitemap;
        assert!(handler.handle(&job).await.is_ok());
    }

    #[tokio::test]
    async fn handles_retry_job() {
        let pool = Pool::connect("sqlite::memory:").await.unwrap();
        sqlx::query(crate::db::schema::SCHEMA_SQL)
            .execute(&pool)
            .await
            .unwrap();
        let config = Arc::new(AppConfig::test_defaults());
        let handler = RetryPaymentCallbackHandler::new(pool, config);
        let job = Job::RetryPaymentCallback {
            payment_order_id: SnowflakeId(42),
        };
        let result = handler.handle(&job).await;
        assert!(result.is_err());
    }
}