pub mod client;
use crate::drivers::scylla::health::HostOffline;
use client::{HealthAwareSupabaseClient, SupabaseConnectionInfo};
use colored::*;
use dotenv::dotenv;
use serde_json::{Map, Value, json};
use std::env;
use std::env::var;
use std::error::Error;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use supabase_rs::query::QueryBuilder;
use tokio_postgres::NoTls;
use tokio_postgres::Row;
use tokio_postgres::types::Type;
use tracing::{debug, error, info, instrument, warn};
fn env_var(name: &str) -> Result<String, String> {
dotenv().ok();
var(name).map_err(|err| format!("missing environment variable '{}': {}", name, err))
}
fn cast_row_value(row: &Row, column_index: usize, column_name: &str, column_type: &Type) -> Value {
match column_type {
&Type::INT2 => {
debug!("Casting column '{}' as INT2", column_name);
json!(row.get::<_, Option<i16>>(column_index))
}
&Type::INT4 => {
debug!("Casting column '{}' as INT4", column_name);
json!(row.get::<_, Option<i32>>(column_index))
}
&Type::INT8 => {
debug!("Casting column '{}' as INT8", column_name);
json!(row.get::<_, Option<i64>>(column_index))
}
&Type::TEXT | &Type::VARCHAR | &Type::BPCHAR | &Type::NAME => {
debug!("Casting column '{}' as TEXT-like", column_name);
json!(row.get::<_, Option<String>>(column_index))
}
&Type::BOOL => {
debug!("Casting column '{}' as BOOL", column_name);
json!(row.get::<_, Option<bool>>(column_index))
}
&Type::FLOAT4 => {
debug!("Casting column '{}' as FLOAT4", column_name);
json!(row.get::<_, Option<f32>>(column_index))
}
&Type::FLOAT8 => {
debug!("Casting column '{}' as FLOAT8", column_name);
json!(row.get::<_, Option<f64>>(column_index))
}
&Type::JSONB | &Type::JSON => {
debug!("Casting column '{}' as JSON/JSONB", column_name);
json!(row.get::<_, Option<Value>>(column_index))
}
&Type::TIMESTAMP | &Type::TIMESTAMPTZ | &Type::DATE | &Type::TIME | &Type::TIMETZ => {
debug!("Casting column '{}' as temporal string", column_name);
json!(row.get::<_, Option<String>>(column_index))
}
_ => {
warn!(
"Unknown type {:?} for column '{}', falling back to string cast",
column_type, column_name
);
json!(row.get::<_, Option<String>>(column_index))
}
}
}
fn supabase_client_from_env(url_key: &str, key_key: &str) -> Result<SupabaseClient, String> {
let url: String = env_var(url_key)?;
let key: String = env_var(key_key)?;
SupabaseClient::new(url, key).map_err(|e| format!("Failed to create SupabaseClient: {:?}", e))
}
pub async fn supabase() -> Result<SupabaseClient, String> {
supabase_client_from_env("SUPABASE_URL", "SUPABASE_KEY")
}
pub async fn supabase_xbp() -> Result<SupabaseClient, String> {
supabase_client_from_env("SUPABASE_XBP_URL", "SUPABASE_XBP_KEY")
}
pub async fn athena_dexter() -> Result<SupabaseClient, String> {
supabase_client_from_env("ATHENA_DEXTER_URL", "ATHENA_DEXTER_KEY")
}
pub async fn atabex() -> Result<SupabaseClient, String> {
supabase_client_from_env("ATABEX_URL", "ATABEX_KEY")
}
pub async fn athena() -> Result<SupabaseClient, String> {
dotenv().ok();
let athena_url: String = "https://athena-db.com".to_string();
let athena_key: String = env_var("ATHENA_KEY_12")?;
SupabaseClient::new(athena_url, athena_key)
.map_err(|e| format!("Failed to create AthenaClient: {:#?}", e))
}
#[instrument(skip(query), fields(query_length = query.len(), db_name = %athena_db_name))]
pub async fn execute_query_supabase(
query: String,
athena_db_name: String,
) -> Result<Value, Box<dyn Error>> {
let connection_string: String = env_var("SUPABASE_POSTGRES_URI")
.map_err(|e| e.to_string())?
.replace("{db}", &athena_db_name);
debug!("Full connection string prepared (credentials masked)");
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Supabase connection error: {}", e);
} else {
debug!("Supabase connection task completed successfully");
}
});
let start_time: Instant = Instant::now();
let rows: Vec<Row> = client.query(query.as_str(), &[]).await?;
let _query_duration: std::time::Duration = start_time.elapsed();
let mut results: Vec<Value> = Vec::new();
let mut row_count: i32 = 0;
for row in rows {
row_count += 1;
debug!("Processing row {}", row_count.to_string().yellow());
let mut row_data: Map<String, Value> = Map::new();
let columns: &[tokio_postgres::Column] = row.columns();
debug!("Row has {} columns", columns.len().to_string().cyan());
for (i, column) in columns.iter().enumerate() {
let column_name: &str = column.name();
let column_type = column.type_();
debug!(
"Processing column '{}' (type: {:?}) at index {}",
column_name.cyan(),
column_type,
i.to_string().yellow()
);
let value: Value = cast_row_value(&row, i, column_name, column_type);
row_data.insert(column_name.to_string(), value);
}
results.push(Value::Object(row_data));
if row_count % 100 == 0 {
debug!("Processed {} rows so far", row_count.to_string().green());
}
}
let duration: u128 = start_time.elapsed().as_millis();
let json_result: Value = json!({
"data": results,
"db_name": athena_db_name,
"duration": duration,
"message": "Successfully executed query",
"status": "success"
});
Ok(json_result)
}
pub fn client_router_health_aware(client_name: &str) -> Result<HealthAwareSupabaseClient, String> {
dotenv().ok();
let info: SupabaseConnectionInfo = match client_name {
"supabase" => SupabaseConnectionInfo::from_env("SUPABASE_URL", "SUPABASE_KEY"),
"xbp" => SupabaseConnectionInfo::from_env("SUPABASE_XBP_URL", "SUPABASE_XBP_KEY"),
"athena" => {
let url =
env::var("ATHENA_DB_URL").unwrap_or_else(|_| "https://athena-db.com".to_string());
let key =
env::var("ATHENA_KEY_12").map_err(|e| format!("Missing ATHENA_KEY_12: {}", e))?;
Ok(SupabaseConnectionInfo::new(url, key))
}
"atabex" => SupabaseConnectionInfo::from_env("ATABEX_URL", "ATABEX_KEY"),
"athena_dexter" => {
SupabaseConnectionInfo::from_env("ATHENA_DEXTER_URL", "ATHENA_DEXTER_KEY")
}
_ => return Err(format!("Unknown client name: {}", client_name)),
}
.map_err(|e| format!("Missing env for {}: {}", client_name, e))?;
HealthAwareSupabaseClient::new(info)
.map_err(|e| format!("Failed to create health-aware client: {}", e))
}
pub async fn client_router(client_name: &str) -> Result<SupabaseClient, String> {
match client_name {
"supabase" => supabase().await,
"xbp" => supabase_xbp().await,
"athena" => athena().await,
"atabex" => atabex().await,
"athena_dexter" => athena_dexter().await,
_ => Err(format!("Unknown client name: {}", client_name)),
}
}
pub struct PaginationOptions {
pub limit: i64,
pub current_page: i64,
pub page_size: i64,
pub offset: i64,
}
pub type OrderByOption = Option<(String, bool)>;
pub async fn fetch_multiple_conditions(
table_name: String,
columns: Vec<&str>,
conditions: Option<Vec<Value>>,
pagination: PaginationOptions,
order_by: OrderByOption,
client_name: &str,
) -> Result<Vec<Value>, String> {
if let Ok(health_client) = client_router_health_aware(client_name) {
return fetch_multiple_conditions_health_aware(
health_client,
table_name,
columns,
conditions,
pagination,
order_by,
client_name,
)
.await;
}
let client: SupabaseClient = client_router(client_name).await?;
fetch_multiple_conditions_with_client(
client, table_name, columns, conditions, pagination, order_by,
)
.await
}
async fn fetch_multiple_conditions_health_aware(
health_client: HealthAwareSupabaseClient,
table_name: String,
columns: Vec<&str>,
conditions: Option<Vec<Value>>,
pagination: PaginationOptions,
order_by: OrderByOption,
client_name: &str,
) -> Result<Vec<Value>, String> {
let fetch_start: Instant = Instant::now();
let (order_col, order_asc): (&str, bool) = order_by
.as_ref()
.filter(|(col, _)| !col.is_empty())
.map(|(col, asc)| (col.as_str(), *asc))
.unwrap_or(("created_at", false));
let mut builder = health_client.select(&table_name).columns(columns);
if let Some(conds) = &conditions {
for condition in conds {
if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str)
&& let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str)
{
builder = builder.eq(eq_column, eq_value);
}
}
}
let calculated_offset: i64 =
(pagination.current_page - 1) * pagination.page_size + pagination.offset;
let builder = builder
.order(order_col, order_asc)
.limit(pagination.limit.try_into().unwrap_or(i64::MAX as usize))
.offset(calculated_offset.try_into().unwrap_or(0));
match builder.execute().await {
Ok(result) => Ok(result),
Err(err) => {
if err.downcast_ref::<HostOffline>().is_some() {
let remaining = err
.downcast_ref::<HostOffline>()
.and_then(|o| o.until().checked_duration_since(std::time::Instant::now()))
.unwrap_or(std::time::Duration::ZERO);
error!(
backend = %client_name,
table = %table_name,
until_secs = remaining.as_secs(),
"Supabase host offline (circuit breaker)"
);
return Err(format!(
"HostOffline:{}:{}",
client_name,
remaining.as_secs().max(1)
));
}
error!(
backend = %client_name,
table = %table_name,
duration_ms = %fetch_start.elapsed().as_millis(),
error = %err,
"Supabase fetch_multiple_conditions (health-aware) failed"
);
Err(format!(
"Failed to fetch data in {:?}: {:?}",
table_name, err
))
}
}
}
pub async fn fetch_multiple_conditions_with_client(
client: SupabaseClient,
table_name: String,
columns: Vec<&str>,
conditions: Option<Vec<Value>>,
pagination: PaginationOptions,
order_by: OrderByOption,
) -> Result<Vec<Value>, String> {
let fetch_start: Instant = Instant::now();
let mut query: QueryBuilder = client.select(&table_name).columns(columns);
if let Some(conditions) = conditions {
for condition in conditions {
if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str)
&& let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str)
{
query = query.eq(eq_column, eq_value);
info!(
backend = "custom_supabase",
column = %eq_column,
value = %eq_value,
"applied condition to custom Supabase query"
);
}
}
}
let (order_col, order_asc): (&str, bool) = order_by
.as_ref()
.filter(|(col, _)| !col.is_empty())
.map(|(col, asc)| (col.as_str(), *asc))
.unwrap_or(("created_at", false));
query = query.order(order_col, order_asc);
let calculated_offset: i64 =
(pagination.current_page - 1) * pagination.page_size + pagination.offset;
let data: Result<Vec<Value>, String> = query
.limit(pagination.limit.try_into().unwrap())
.offset(calculated_offset.try_into().unwrap())
.execute()
.await;
match data {
Ok(result) => Ok(result),
Err(err) => {
error!(
backend = "custom_supabase",
table = %table_name,
duration_ms = %fetch_start.elapsed().as_millis(),
error = %err,
"custom Supabase fetch_multiple_conditions failed"
);
Err(format!(
"Failed to fetch data in {:?}: {:?}",
table_name, err
))
}
}
}