athena_rs 3.18.0

Hyper-performant polyglot database driver
Documentation
//! Backend execution for gateway fetch: Postgres (deadpool optional + sqlx), custom Supabase, registry Supabase.

use actix_web::HttpRequest;
use serde_json::Value;
use sqlx::{Pool, Postgres};
use tracing::{error, info};

use crate::AppState;
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
use crate::api::rate_limit::check_outbound_supabase_optional;
use crate::drivers::postgresql::column_resolver::get_available_columns;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::fetch_rows_with_columns_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::fetch_rows_with_columns;
use crate::drivers::supabase::{
    PaginationOptions, fetch_multiple_conditions, fetch_multiple_conditions_with_client,
};
use crate::error::sqlx_parser::process_sqlx_error_with_context_and_columns;
use crate::parser::query_builder::Condition;
use supabase_rs::SupabaseClient;

use super::types::SortOptions;

/// Runs the configured backend and returns raw JSON rows or an error string (often JSON for sqlx).
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_gateway_fetch_data(
    app_state: &AppState,
    req: &HttpRequest,
    request_id: &str,
    client_name: &str,
    direct_pg_pool: Option<Pool<Postgres>>,
    table_name: &str,
    columns_refs: Vec<&str>,
    pg_conditions: &[Condition],
    conditions_json: Vec<Value>,
    limit: i64,
    current_page: i64,
    page_size: i64,
    offset: i64,
    calculated_offset: i64,
    sort_options: Option<&SortOptions>,
    deadpool_requested: bool,
) -> Result<Vec<Value>, String> {
    #[cfg(not(feature = "deadpool_experimental"))]
    {
        let _ = request_id;
        let _ = deadpool_requested;
    }

    let mut data_result: Result<Vec<Value>, String> = Err("uninitialized".to_string());

    let resolved_pg_pool: Option<Pool<Postgres>> =
        direct_pg_pool.or_else(|| app_state.pg_registry.get_pool(client_name));

    if let Some(pool) = resolved_pg_pool {
        #[cfg(feature = "deadpool_experimental")]
        if deadpool_requested {
            if let Some(deadpool_pool) = app_state.deadpool_registry.get_pool(client_name) {
                let order_by: Option<(&str, bool)> = sort_options
                    .as_ref()
                    .map(|s| (s.column.as_str(), s.ascending));
                match fetch_rows_with_columns_deadpool(
                    &deadpool_pool,
                    table_name,
                    &columns_refs,
                    pg_conditions,
                    limit,
                    calculated_offset,
                    order_by,
                    deadpool_checkout_timeout(),
                )
                .await
                {
                    Ok(rows) => {
                        app_state
                            .metrics_state
                            .record_gateway_athena_backend("/gateway/fetch", "deadpool");
                        info!(
                            backend = "athena",
                            driver = "deadpool",
                            row_count = rows.len(),
                            "fetch_rows_with_columns finished"
                        );
                        data_result = Ok(rows);
                    }
                    Err(err) => {
                        app_state.metrics_state.record_deadpool_fallback(
                            "/gateway/fetch",
                            deadpool_fallback_reason_label(err.reason),
                        );
                        tracing::warn!(
                            request_id = %request_id,
                            reason = ?err.reason,
                            "Deadpool fetch failed; falling back to sqlx"
                        );
                    }
                }
            }
        }

        if data_result.is_err() {
            let order_by: Option<(&str, bool)> = sort_options
                .as_ref()
                .map(|s| (s.column.as_str(), s.ascending));
            let fetch_result: Result<Vec<Value>, anyhow::Error> = fetch_rows_with_columns(
                &pool,
                table_name,
                &columns_refs,
                pg_conditions,
                limit,
                calculated_offset,
                order_by,
                app_state.gateway_allow_schema_names_prefixed_as_table_name,
            )
            .await;

            data_result = match fetch_result {
                Ok(rows) => {
                    app_state
                        .metrics_state
                        .record_gateway_athena_backend("/gateway/fetch", "sqlx");
                    info!(
                        backend = "athena",
                        driver = "sqlx",
                        row_count = rows.len(),
                        "fetch_rows_with_columns finished"
                    );
                    Ok(rows)
                }
                Err(err) => {
                    if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
                        let available_columns = get_available_columns(
                            &pool,
                            table_name,
                            app_state.gateway_allow_schema_names_prefixed_as_table_name,
                        )
                        .await
                        .ok();
                        let processed = process_sqlx_error_with_context_and_columns(
                            sqlx_err,
                            Some(table_name),
                            available_columns.as_deref(),
                        );
                        error!(
                            backend = "athena",
                            error_code = %processed.error_code,
                            trace_id = %processed.trace_id,
                            "fetch_rows_with_columns failed"
                        );
                        Err(processed.to_json().to_string())
                    } else {
                        error!(
                            backend = "athena",
                            error = %err,
                            "fetch_rows_with_columns failed"
                        );
                        Err(err.to_string())
                    }
                }
            };
        }
    } else if client_name == "custom_supabase" {
        if !check_outbound_supabase_optional(&app_state.outbound_rate_limit_supabase) {
            tracing::debug!(
                backend = "custom_supabase",
                "outbound supabase rate limited"
            );
            data_result = Err("OutboundSupabaseThrottled".to_string());
        } else {
            let supabase_url: Option<String> = req
                .headers()
                .get("x-supabase-url")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
            let supabase_key: Option<String> = req
                .headers()
                .get("x-supabase-key")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());

            match (supabase_url, supabase_key) {
                (Some(url), Some(key)) => match SupabaseClient::new(url, key) {
                    Ok(client) => {
                        let order_by: Option<(String, bool)> = sort_options
                            .as_ref()
                            .map(|s| (s.column.clone(), s.ascending));
                        data_result = fetch_multiple_conditions_with_client(
                            client,
                            table_name.to_string(),
                            columns_refs,
                            Some(conditions_json),
                            PaginationOptions {
                                limit,
                                current_page,
                                page_size,
                                offset,
                            },
                            order_by,
                        )
                        .await;

                        match &data_result {
                            Ok(rows) => info!(
                                backend = "custom_supabase",
                                row_count = rows.len(),
                                "custom Supabase fetch finished"
                            ),
                            Err(e) => info!(
                                backend = "custom_supabase",
                                error = %e,
                                "custom Supabase fetch failed"
                            ),
                        }
                    }
                    Err(e) => {
                        error!(
                            backend = "custom_supabase",
                            error = %e,
                            "failed to create Supabase client with provided headers"
                        );
                        data_result = Err(format!("Failed to construct Supabase client: {:?}", e));
                    }
                },
                _ => {
                    error!(
                        backend = "custom_supabase",
                        "missing supabase credentials in headers"
                    );
                    data_result =
                        Err("Missing x-supabase-url or x-supabase-key headers".to_string());
                }
            }
        }
    } else {
        if !check_outbound_supabase_optional(&app_state.outbound_rate_limit_supabase) {
            tracing::debug!(backend = "supabase", "outbound supabase rate limited");
            data_result = Err("OutboundSupabaseThrottled".to_string());
        } else {
            let order_by: Option<(String, bool)> = sort_options
                .as_ref()
                .map(|s| (s.column.clone(), s.ascending));
            data_result = fetch_multiple_conditions(
                table_name.to_string(),
                columns_refs,
                Some(conditions_json),
                PaginationOptions {
                    limit,
                    current_page,
                    page_size,
                    offset,
                },
                order_by,
                client_name,
            )
            .await;

            match &data_result {
                Ok(rows) => info!(
                    backend = "supabase",
                    row_count = rows.len(),
                    "Supabase fetch_multiple_conditions finished"
                ),
                Err(e) => info!(
                    backend = "supabase",
                    error = %e,
                    "fetch_multiple_conditions failed"
                ),
            }
        }
    }

    data_result
}