athena_rs 2.9.1

Database gateway API
Documentation
//! Backend execution for gateway fetch: Postgres (deadpool optional + sqlx), custom Supabase, registry Supabase.

use actix_web::HttpRequest;
use serde_json::Value;
use tracing::{error, info};

use crate::AppState;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::fetch_rows_with_columns_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::fetch_rows_with_columns;
use crate::drivers::supabase::{
    PaginationOptions, fetch_multiple_conditions, fetch_multiple_conditions_with_client,
};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::query_builder::Condition;
use supabase_rs::SupabaseClient;

use super::types::SortOptions;

/// Runs the configured backend and returns raw JSON rows or an error string (often JSON for sqlx).
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_gateway_fetch_data(
    app_state: &AppState,
    req: &HttpRequest,
    request_id: &str,
    client_name: &str,
    table_name: &str,
    columns_refs: Vec<&str>,
    pg_conditions: &[Condition],
    conditions_json: Vec<Value>,
    limit: i64,
    current_page: i64,
    page_size: i64,
    offset: i64,
    calculated_offset: i64,
    sort_options: Option<&SortOptions>,
    deadpool_requested: bool,
) -> Result<Vec<Value>, String> {
    #[cfg(not(feature = "deadpool_experimental"))]
    {
        let _ = request_id;
        let _ = deadpool_requested;
    }

    let mut data_result: Result<Vec<Value>, String> = Err("uninitialized".to_string());

    if let Some(pool) = app_state.pg_registry.get_pool(client_name) {
        #[cfg(feature = "deadpool_experimental")]
        if deadpool_requested {
            if let Some(deadpool_pool) = app_state.deadpool_registry.get_pool(client_name) {
                let checkout_timeout_ms: u64 = std::env::var("ATHENA_DEADPOOL_CHECKOUT_TIMEOUT_MS")
                    .ok()
                    .and_then(|v| v.parse().ok())
                    .unwrap_or(800);
                let order_by = sort_options
                    .as_ref()
                    .map(|s| (s.column.as_str(), s.ascending));
                match fetch_rows_with_columns_deadpool(
                    &deadpool_pool,
                    table_name,
                    &columns_refs,
                    pg_conditions,
                    limit,
                    calculated_offset,
                    order_by,
                    std::time::Duration::from_millis(checkout_timeout_ms),
                )
                .await
                {
                    Ok(rows) => {
                        app_state
                            .metrics_state
                            .record_gateway_postgres_backend("/gateway/fetch", "deadpool");
                        info!(
                            backend = "postgres",
                            pg_backend = "deadpool",
                            row_count = rows.len(),
                            "fetch_rows_with_columns finished"
                        );
                        data_result = Ok(rows);
                    }
                    Err(err) => {
                        app_state.metrics_state.record_deadpool_fallback(
                            "/gateway/fetch",
                            deadpool_fallback_reason_label(err.reason),
                        );
                        tracing::warn!(
                            request_id = %request_id,
                            reason = ?err.reason,
                            "Deadpool fetch failed; falling back to sqlx"
                        );
                    }
                }
            }
        }

        if data_result.is_err() {
            let order_by = sort_options
                .as_ref()
                .map(|s| (s.column.as_str(), s.ascending));
            let fetch_result: Result<Vec<Value>, anyhow::Error> = fetch_rows_with_columns(
                &pool,
                table_name,
                &columns_refs,
                pg_conditions,
                limit,
                calculated_offset,
                order_by,
                app_state.gateway_allow_schema_names_prefixed_as_table_name,
            )
            .await;

            data_result = match fetch_result {
                Ok(rows) => {
                    app_state
                        .metrics_state
                        .record_gateway_postgres_backend("/gateway/fetch", "sqlx");
                    info!(
                        backend = "postgres",
                        pg_backend = "sqlx",
                        row_count = rows.len(),
                        "fetch_rows_with_columns finished"
                    );
                    Ok(rows)
                }
                Err(err) => {
                    if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
                        let processed = process_sqlx_error_with_context(sqlx_err, Some(table_name));
                        error!(
                            backend = "postgres",
                            error_code = %processed.error_code,
                            trace_id = %processed.trace_id,
                            "fetch_rows_with_columns failed"
                        );
                        Err(processed.to_json().to_string())
                    } else {
                        error!(
                            backend = "postgres",
                            error = %err,
                            "fetch_rows_with_columns failed"
                        );
                        Err(err.to_string())
                    }
                }
            };
        }
    } else if client_name == "custom_supabase" {
        let supabase_url: Option<String> = req
            .headers()
            .get("x-supabase-url")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        let supabase_key: Option<String> = req
            .headers()
            .get("x-supabase-key")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        match (supabase_url, supabase_key) {
            (Some(url), Some(key)) => match SupabaseClient::new(url, key) {
                Ok(client) => {
                    let order_by = sort_options
                        .as_ref()
                        .map(|s| (s.column.clone(), s.ascending));
                    data_result = fetch_multiple_conditions_with_client(
                        client,
                        table_name.to_string(),
                        columns_refs,
                        Some(conditions_json),
                        PaginationOptions {
                            limit,
                            current_page,
                            page_size,
                            offset,
                        },
                        order_by,
                    )
                    .await;

                    match &data_result {
                        Ok(rows) => info!(
                            backend = "custom_supabase",
                            row_count = rows.len(),
                            "custom Supabase fetch finished"
                        ),
                        Err(e) => info!(
                            backend = "custom_supabase",
                            error = %e,
                            "custom Supabase fetch failed"
                        ),
                    }
                }
                Err(e) => {
                    error!(
                        backend = "custom_supabase",
                        error = %e,
                        "failed to create Supabase client with provided headers"
                    );
                    data_result = Err(format!("Failed to construct Supabase client: {:?}", e));
                }
            },
            _ => {
                error!(
                    backend = "custom_supabase",
                    "missing supabase credentials in headers"
                );
                data_result = Err("Missing x-supabase-url or x-supabase-key headers".to_string());
            }
        }
    } else {
        let order_by = sort_options
            .as_ref()
            .map(|s| (s.column.clone(), s.ascending));
        data_result = fetch_multiple_conditions(
            table_name.to_string(),
            columns_refs,
            Some(conditions_json),
            PaginationOptions {
                limit,
                current_page,
                page_size,
                offset,
            },
            order_by,
            client_name,
        )
        .await;

        match &data_result {
            Ok(rows) => info!(
                backend = "supabase",
                row_count = rows.len(),
                "Supabase fetch_multiple_conditions finished"
            ),
            Err(e) => info!(
                backend = "supabase",
                error = %e,
                "fetch_multiple_conditions failed"
            ),
        }
    }

    data_result
}