use actix_web::HttpRequest;
use serde_json::Value;
use tracing::{error, info};
use crate::AppState;
#[cfg(feature = "deadpool_experimental")]
use crate::api::gateway::deadpool_timeout::deadpool_checkout_timeout;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_crud::fetch_rows_with_columns_deadpool;
#[cfg(feature = "deadpool_experimental")]
use crate::drivers::postgresql::deadpool_raw_sql::deadpool_fallback_reason_label;
use crate::drivers::postgresql::sqlx_driver::fetch_rows_with_columns;
use crate::drivers::supabase::{
PaginationOptions, fetch_multiple_conditions, fetch_multiple_conditions_with_client,
};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::query_builder::Condition;
use supabase_rs::SupabaseClient;
use super::types::SortOptions;
/// Executes the `/gateway/fetch` data retrieval, dispatching to one of
/// three backends based on `client_name`:
///
/// 1. **PostgreSQL** — when `pg_registry` has a pool for `client_name`.
///    With the `deadpool_experimental` feature compiled in and
///    `deadpool_requested` set, a deadpool-backed fetch is tried first; on
///    failure a fallback metric is recorded and the sqlx driver is used.
/// 2. **Caller-supplied Supabase** — when `client_name` is
///    `"custom_supabase"`; credentials come from the `x-supabase-url` and
///    `x-supabase-key` request headers.
/// 3. **Registered Supabase** — otherwise, via `fetch_multiple_conditions`
///    keyed by `client_name`.
///
/// Returns the fetched rows as JSON values. On failure returns a `String`:
/// sqlx errors are converted to a JSON-serialized payload by
/// `process_sqlx_error_with_context`; all other failures yield a plain
/// message.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_gateway_fetch_data(
    app_state: &AppState,
    req: &HttpRequest,
    request_id: &str,
    client_name: &str,
    table_name: &str,
    columns_refs: Vec<&str>,
    pg_conditions: &[Condition],
    conditions_json: Vec<Value>,
    limit: i64,
    current_page: i64,
    page_size: i64,
    offset: i64,
    calculated_offset: i64,
    sort_options: Option<&SortOptions>,
    deadpool_requested: bool,
) -> Result<Vec<Value>, String> {
    // These parameters are only read by feature-gated code; consume them
    // here so the build stays warning-free when the feature is off.
    #[cfg(not(feature = "deadpool_experimental"))]
    {
        let _ = request_id;
        let _ = deadpool_requested;
    }
    // Sentinel result: stays `Err` until a backend produces data. The sqlx
    // path below runs only while this is still an error, which is how the
    // deadpool -> sqlx fallback is sequenced.
    let mut data_result: Result<Vec<Value>, String> = Err("uninitialized".to_string());
    if let Some(pool) = app_state.pg_registry.get_pool(client_name) {
        #[cfg(feature = "deadpool_experimental")]
        if deadpool_requested {
            if let Some(deadpool_pool) = app_state.deadpool_registry.get_pool(client_name) {
                // `Option<&SortOptions>` is `Copy`, so mapping directly
                // (no `.as_ref()`) leaves `sort_options` usable below.
                let order_by = sort_options.map(|s| (s.column.as_str(), s.ascending));
                match fetch_rows_with_columns_deadpool(
                    &deadpool_pool,
                    table_name,
                    &columns_refs,
                    pg_conditions,
                    limit,
                    calculated_offset,
                    order_by,
                    deadpool_checkout_timeout(),
                )
                .await
                {
                    Ok(rows) => {
                        app_state
                            .metrics_state
                            .record_gateway_postgres_backend("/gateway/fetch", "deadpool");
                        info!(
                            backend = "postgres",
                            pg_backend = "deadpool",
                            row_count = rows.len(),
                            "fetch_rows_with_columns finished"
                        );
                        data_result = Ok(rows);
                    }
                    Err(err) => {
                        // Record the fallback but leave `data_result` as
                        // `Err` so the sqlx path below takes over.
                        app_state.metrics_state.record_deadpool_fallback(
                            "/gateway/fetch",
                            deadpool_fallback_reason_label(err.reason),
                        );
                        tracing::warn!(
                            request_id = %request_id,
                            reason = ?err.reason,
                            "Deadpool fetch failed; falling back to sqlx"
                        );
                    }
                }
            }
        }
        // sqlx path: primary when deadpool is unavailable/not requested,
        // fallback when the deadpool attempt above failed.
        if data_result.is_err() {
            let order_by = sort_options.map(|s| (s.column.as_str(), s.ascending));
            let fetch_result: Result<Vec<Value>, anyhow::Error> = fetch_rows_with_columns(
                &pool,
                table_name,
                &columns_refs,
                pg_conditions,
                limit,
                calculated_offset,
                order_by,
                app_state.gateway_allow_schema_names_prefixed_as_table_name,
            )
            .await;
            data_result = match fetch_result {
                Ok(rows) => {
                    app_state
                        .metrics_state
                        .record_gateway_postgres_backend("/gateway/fetch", "sqlx");
                    info!(
                        backend = "postgres",
                        pg_backend = "sqlx",
                        row_count = rows.len(),
                        "fetch_rows_with_columns finished"
                    );
                    Ok(rows)
                }
                Err(err) => {
                    // sqlx errors get structured processing (error code +
                    // trace id, serialized to JSON); anything else is
                    // stringified as-is.
                    if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
                        let processed = process_sqlx_error_with_context(sqlx_err, Some(table_name));
                        error!(
                            backend = "postgres",
                            error_code = %processed.error_code,
                            trace_id = %processed.trace_id,
                            "fetch_rows_with_columns failed"
                        );
                        Err(processed.to_json().to_string())
                    } else {
                        error!(
                            backend = "postgres",
                            error = %err,
                            "fetch_rows_with_columns failed"
                        );
                        Err(err.to_string())
                    }
                }
            };
        }
    } else if client_name == "custom_supabase" {
        // Credentials are supplied per-request via headers; non-UTF-8
        // header values are treated as absent.
        let supabase_url: Option<String> = req
            .headers()
            .get("x-supabase-url")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        let supabase_key: Option<String> = req
            .headers()
            .get("x-supabase-key")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        match (supabase_url, supabase_key) {
            (Some(url), Some(key)) => match SupabaseClient::new(url, key) {
                Ok(client) => {
                    // Supabase helpers take an owned column name.
                    let order_by = sort_options.map(|s| (s.column.clone(), s.ascending));
                    data_result = fetch_multiple_conditions_with_client(
                        client,
                        table_name.to_string(),
                        columns_refs,
                        Some(conditions_json),
                        PaginationOptions {
                            limit,
                            current_page,
                            page_size,
                            offset,
                        },
                        order_by,
                    )
                    .await;
                    match &data_result {
                        Ok(rows) => info!(
                            backend = "custom_supabase",
                            row_count = rows.len(),
                            "custom Supabase fetch finished"
                        ),
                        Err(e) => info!(
                            backend = "custom_supabase",
                            error = %e,
                            "custom Supabase fetch failed"
                        ),
                    }
                }
                Err(e) => {
                    error!(
                        backend = "custom_supabase",
                        error = %e,
                        "failed to create Supabase client with provided headers"
                    );
                    data_result = Err(format!("Failed to construct Supabase client: {:?}", e));
                }
            },
            _ => {
                error!(
                    backend = "custom_supabase",
                    "missing supabase credentials in headers"
                );
                data_result = Err("Missing x-supabase-url or x-supabase-key headers".to_string());
            }
        }
    } else {
        // Registered Supabase client looked up by name inside the helper.
        let order_by = sort_options.map(|s| (s.column.clone(), s.ascending));
        data_result = fetch_multiple_conditions(
            table_name.to_string(),
            columns_refs,
            Some(conditions_json),
            PaginationOptions {
                limit,
                current_page,
                page_size,
                offset,
            },
            order_by,
            client_name,
        )
        .await;
        match &data_result {
            Ok(rows) => info!(
                backend = "supabase",
                row_count = rows.len(),
                "Supabase fetch_multiple_conditions finished"
            ),
            Err(e) => info!(
                backend = "supabase",
                error = %e,
                "fetch_multiple_conditions failed"
            ),
        }
    }
    data_result
}