raisfast 0.2.19

The last backend you'll ever need. Rust-powered headless CMS with built-in blog, ecommerce, wallet, payment and 4 plugin engines.
use std::sync::Arc;

use crate::config::app::AppConfig;
use crate::db::Pool;
use crate::db::{DbDriver, Driver};
use crate::errors::app_error::AppResult;
use crate::models::payment_channel;
use crate::models::payment_order::PaymentStatus;
use crate::services::audit::AuditService;
use crate::worker::{Job, JobHandler};

/// Job handler for `Job::ReconcilePayments`: compares payment orders stored
/// locally as `paid` with the status reported by their payment provider.
pub struct ReconcilePaymentsHandler {
    /// Database pool used to load orders and payment channels and to write audit entries.
    pool: Pool,
    /// Application configuration; its `app_key` supplies the key handed to payment providers.
    config: Arc<AppConfig>,
}

impl ReconcilePaymentsHandler {
    #[must_use]
    pub fn new(pool: Pool, config: Arc<AppConfig>) -> Self {
        Self { pool, config }
    }
}

#[async_trait::async_trait]
impl JobHandler for ReconcilePaymentsHandler {
    /// Reconciles the orders that were paid during the previous UTC day.
    ///
    /// Jobs other than `Job::ReconcilePayments` are ignored and reported as
    /// success. For a reconcile job, at most one batch of paid orders whose
    /// `paid_at` falls in `[yesterday 00:00Z, today 00:00Z)` is loaded and each
    /// order is checked via `reconcile_order`. Per-order failures are logged and
    /// counted but never abort the run.
    ///
    /// # Errors
    ///
    /// Returns an error only when the initial query for paid orders fails.
    async fn handle(&self, job: &Job) -> AppResult<()> {
        /// Maximum number of orders examined in one run.
        const BATCH_LIMIT: usize = 500;

        let Job::ReconcilePayments = job else {
            return Ok(());
        };

        // Sample the clock once so both window bounds derive from the same
        // instant; two separate reads could straddle midnight and widen the
        // window to two days.
        let now = chrono::Utc::now();
        let yesterday_start = (now - chrono::Duration::days(1))
            .format("%Y-%m-%dT00:00:00Z")
            .to_string();
        let yesterday_end = now.format("%Y-%m-%dT00:00:00Z").to_string();

        let sql = format!(
            "SELECT * FROM payment_orders WHERE status = 'paid' AND paid_at >= {} AND paid_at < {} LIMIT {}",
            Driver::ph(1),
            Driver::ph(2),
            BATCH_LIMIT
        );
        let orders: Vec<crate::models::payment_order::PaymentOrder> = sqlx::query_as(&sql)
            .bind(&yesterday_start)
            .bind(&yesterday_end)
            .fetch_all(&self.pool)
            .await?;

        // A full batch means the query was probably truncated; surface that
        // instead of silently leaving the remaining orders unchecked.
        if orders.len() >= BATCH_LIMIT {
            tracing::warn!(
                "[reconcile_payments] batch limit of {} reached; additional paid orders in the window were not checked",
                BATCH_LIMIT
            );
        }

        let mut reconciled = 0u64;
        let mut mismatches = 0u64;
        let mut errors = 0u64;

        for order in &orders {
            match self.reconcile_order(order).await {
                Ok(true) => reconciled += 1,
                Ok(false) => mismatches += 1,
                Err(e) => {
                    // Keep going: one failing order must not block the rest.
                    errors += 1;
                    tracing::warn!(
                        "[reconcile_payments] error reconciling order {}: {e}",
                        order.id
                    );
                }
            }
        }

        tracing::info!(
            "[reconcile_payments] checked {} orders, {reconciled} ok, {mismatches} mismatches, {errors} errors",
            orders.len()
        );
        Ok(())
    }
}

impl ReconcilePaymentsHandler {
    /// Checks a single locally-paid order against its payment provider.
    ///
    /// Returns `Ok(true)` when the order is considered consistent: the provider
    /// reports it as paid with no differing amount, the order has no
    /// `provider_order_id`, or no provider could be resolved for it (the order
    /// is skipped in that case). Returns `Ok(false)` when the provider reports
    /// a non-paid status or a different amount; each such mismatch is logged at
    /// error level and written to the audit log.
    ///
    /// # Errors
    ///
    /// Fails when the encryption key cannot be derived from the configuration,
    /// when the payment channel lookup fails or finds nothing, or when the
    /// provider query itself fails.
    async fn reconcile_order(
        &self,
        order: &crate::models::payment_order::PaymentOrder,
    ) -> AppResult<bool> {
        // Without a provider-side id there is nothing to compare against.
        let Some(ref provider_order_id) = order.provider_order_id else {
            return Ok(true);
        };

        let key = get_encrypt_key(&self.config)?;
        let provider = match crate::payment::providers::get_provider(&order.provider, &key) {
            Ok(p) => p,
            Err(_) => {
                // Best effort: orders whose provider cannot be resolved are
                // skipped, but leave a trace so the skip is not invisible.
                tracing::debug!(
                    "[reconcile_payments] no provider available for order {}, skipping",
                    order.id
                );
                return Ok(true);
            }
        };

        let channel = payment_channel::find_by_id(&self.pool, order.channel_id, None)
            .await?
            .ok_or_else(|| {
                crate::errors::app_error::AppError::Internal(anyhow::anyhow!(
                    "payment channel {} not found",
                    order.channel_id
                ))
            })?;

        let status = provider.query(&channel, provider_order_id).await?;

        if status.status != PaymentStatus::Paid {
            let detail = format!(
                "local=paid provider={:?} order={} amount={}",
                status.status, order.id, order.amount
            );
            self.record_mismatch(order, "payment_reconcile_mismatch", detail)
                .await;
            return Ok(false);
        }

        // The provider may omit the amount; only a reported, differing amount
        // counts as a mismatch.
        if let Some(provider_amount) = status.amount
            && provider_amount != order.amount
        {
            let detail = format!(
                "amount_mismatch local={} provider={} order={}",
                order.amount, provider_amount, order.id
            );
            self.record_mismatch(order, "payment_reconcile_amount_mismatch", detail)
                .await;
            return Ok(false);
        }

        Ok(true)
    }

    /// Logs a reconciliation mismatch at error level and writes it to the audit log.
    ///
    /// A failure to write the audit entry is itself only logged, so that
    /// reporting problems never mask the mismatch result.
    async fn record_mismatch(
        &self,
        order: &crate::models::payment_order::PaymentOrder,
        action: &'static str,
        detail: String,
    ) {
        tracing::error!("[reconcile_payments] critical: {detail}");
        let audit = AuditService::new(self.pool.clone());
        if let Err(audit_err) = audit
            .log(
                "",
                None,
                None,
                action,
                "payment_order",
                Some(&order.id.to_string()),
                Some(&detail),
                None,
                None,
            )
            .await
        {
            tracing::error!("[reconcile_payments] audit log failed: {audit_err}");
        }
    }
}

fn get_encrypt_key(config: &AppConfig) -> AppResult<[u8; 32]> {
    let key_str = config.app_key.as_deref().ok_or_else(|| {
        crate::errors::app_error::AppError::Internal(anyhow::anyhow!("APP_KEY not configured"))
    })?;
    let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, key_str)
        .map_err(|e| {
            crate::errors::app_error::AppError::Internal(anyhow::anyhow!(
                "APP_KEY base64 decode: {e}"
            ))
        })?;
    if decoded.len() != 32 {
        return Err(crate::errors::app_error::AppError::Internal(
            anyhow::anyhow!("APP_KEY must be 32 bytes, got {}", decoded.len()),
        ));
    }
    let mut arr = [0u8; 32];
    arr.copy_from_slice(&decoded);
    Ok(arr)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a handler backed by a fresh in-memory SQLite database with the
    /// project schema applied and the test default configuration.
    async fn fresh_handler() -> ReconcilePaymentsHandler {
        let pool = Pool::connect("sqlite::memory:").await.unwrap();
        sqlx::query(crate::db::schema::SCHEMA_SQL)
            .execute(&pool)
            .await
            .unwrap();
        ReconcilePaymentsHandler::new(pool, Arc::new(AppConfig::test_defaults()))
    }

    /// A job of another kind is accepted and reported as success.
    #[tokio::test]
    async fn ignores_wrong_job_type() {
        let handler = fresh_handler().await;
        let outcome = handler.handle(&Job::GenerateSitemap).await;
        assert!(outcome.is_ok());
    }

    /// A reconcile job against an empty database completes successfully.
    #[tokio::test]
    async fn handles_reconcile_job() {
        let handler = fresh_handler().await;
        let outcome = handler.handle(&Job::ReconcilePayments).await;
        assert!(outcome.is_ok());
    }
}