pub mod client;
use colored::*;
use dotenv::dotenv;
use serde_json::{Map, Value, json};
use std::env::var;
use std::error::Error;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use supabase_rs::query::QueryBuilder;
use tokio_postgres::NoTls;
use tokio_postgres::Row;
use tokio_postgres::types::Type;
use tracing::{debug, error, info, instrument, warn};
/// Looks up the environment variable `name`.
///
/// `dotenv()` is invoked before the lookup and its outcome is discarded, so a
/// failure there never surfaces on its own; only the lookup decides the result.
///
/// # Errors
///
/// Returns a message that names the variable and embeds the underlying lookup
/// error when the variable cannot be read.
fn env_var(name: &str) -> Result<String, String> {
    // The result is intentionally dropped; see the doc comment above.
    let _ = dotenv();
    match var(name) {
        Ok(value) => Ok(value),
        Err(err) => Err(format!(
            "missing environment variable '{}': {}",
            name, err
        )),
    }
}
/// Builds a [`SupabaseClient`] from two environment variables.
///
/// `url_key` names the variable whose value is passed to `SupabaseClient::new`
/// as the URL, and `key_key` names the variable whose value is passed as the key.
/// The URL variable is read first, so a missing URL is reported before a
/// missing key.
///
/// # Errors
///
/// Returns the `env_var` message when either variable is unavailable, or a
/// "Failed to create SupabaseClient" message when the constructor rejects the
/// values.
fn supabase_client_from_env(url_key: &str, key_key: &str) -> Result<SupabaseClient, String> {
    let endpoint = env_var(url_key)?;
    let api_key = env_var(key_key)?;
    match SupabaseClient::new(endpoint, api_key) {
        Ok(client) => Ok(client),
        Err(e) => Err(format!("Failed to create SupabaseClient: {:?}", e)),
    }
}
/// Creates the client configured by the `SUPABASE_URL` and `SUPABASE_KEY`
/// environment variables.
///
/// # Errors
///
/// Propagates the error string from `supabase_client_from_env`.
pub async fn supabase() -> Result<SupabaseClient, String> {
    const URL_VAR: &str = "SUPABASE_URL";
    const KEY_VAR: &str = "SUPABASE_KEY";
    supabase_client_from_env(URL_VAR, KEY_VAR)
}
/// Creates the client configured by the `SUPABASE_XBP_URL` and
/// `SUPABASE_XBP_KEY` environment variables.
///
/// # Errors
///
/// Propagates the error string from `supabase_client_from_env`.
pub async fn supabase_xbp() -> Result<SupabaseClient, String> {
    const URL_VAR: &str = "SUPABASE_XBP_URL";
    const KEY_VAR: &str = "SUPABASE_XBP_KEY";
    supabase_client_from_env(URL_VAR, KEY_VAR)
}
/// Creates the client configured by the `ATHENA_DEXTER_URL` and
/// `ATHENA_DEXTER_KEY` environment variables.
///
/// # Errors
///
/// Propagates the error string from `supabase_client_from_env`.
pub async fn athena_dexter() -> Result<SupabaseClient, String> {
    const URL_VAR: &str = "ATHENA_DEXTER_URL";
    const KEY_VAR: &str = "ATHENA_DEXTER_KEY";
    supabase_client_from_env(URL_VAR, KEY_VAR)
}
/// Creates the client configured by the `ATABEX_URL` and `ATABEX_KEY`
/// environment variables.
///
/// # Errors
///
/// Propagates the error string from `supabase_client_from_env`.
pub async fn atabex() -> Result<SupabaseClient, String> {
    const URL_VAR: &str = "ATABEX_URL";
    const KEY_VAR: &str = "ATABEX_KEY";
    supabase_client_from_env(URL_VAR, KEY_VAR)
}
/// Creates a client that targets the fixed endpoint `https://athena-db.com`,
/// authenticated with the value of the `ATHENA_KEY_12` environment variable.
///
/// # Errors
///
/// Returns the `env_var` message when `ATHENA_KEY_12` is unavailable, or a
/// "Failed to create AthenaClient" message when the constructor fails.
pub async fn athena() -> Result<SupabaseClient, String> {
    dotenv().ok();
    let api_key = env_var("ATHENA_KEY_12")?;
    let endpoint = String::from("https://athena-db.com");
    match SupabaseClient::new(endpoint, api_key) {
        Ok(client) => Ok(client),
        Err(e) => Err(format!("Failed to create AthenaClient: {:#?}", e)),
    }
}
#[instrument(skip(query), fields(query_length = query.len(), db_name = %athena_db_name))]
pub async fn execute_query_supabase(
query: String,
athena_db_name: String,
) -> Result<Value, Box<dyn Error>> {
let connection_string: String = env_var("SUPABASE_POSTGRES_URI")
.map_err(|e| e.to_string())?
.replace("{db}", &athena_db_name);
debug!("Full connection string prepared (credentials masked)");
let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Supabase connection error: {}", e);
} else {
debug!("Supabase connection task completed successfully");
}
});
let start_time: Instant = Instant::now();
let rows: Vec<Row> = client.query(query.as_str(), &[]).await?;
let _query_duration: std::time::Duration = start_time.elapsed();
let mut results: Vec<Value> = Vec::new();
let mut row_count: i32 = 0;
for row in rows {
row_count += 1;
debug!("Processing row {}", row_count.to_string().yellow());
let mut row_data: Map<String, Value> = Map::new();
let columns: &[tokio_postgres::Column] = row.columns();
debug!("Row has {} columns", columns.len().to_string().cyan());
for (i, column) in columns.iter().enumerate() {
let column_name: &str = column.name();
let column_type = column.type_();
debug!(
"Processing column '{}' (type: {:?}) at index {}",
column_name.cyan(),
column_type,
i.to_string().yellow()
);
let value: Value = match column_type {
&Type::INT4 => {
debug!("Casting column '{}' as INT4", column_name);
json!(row.get::<_, Option<i32>>(i))
}
&Type::INT8 => {
debug!("Casting column '{}' as INT8", column_name);
json!(row.get::<_, Option<i64>>(i))
}
&Type::TEXT | &Type::VARCHAR => {
debug!("Casting column '{}' as TEXT/VARCHAR", column_name);
json!(row.get::<_, Option<String>>(i))
}
&Type::BOOL => {
debug!("Casting column '{}' as BOOL", column_name);
json!(row.get::<_, Option<bool>>(i))
}
&Type::FLOAT4 => {
debug!("Casting column '{}' as FLOAT4", column_name);
json!(row.get::<_, Option<f32>>(i))
}
&Type::FLOAT8 => {
debug!("Casting column '{}' as FLOAT8", column_name);
json!(row.get::<_, Option<f64>>(i))
}
&Type::JSONB | &Type::JSON => {
debug!("Casting column '{}' as JSON/JSONB", column_name);
json!(row.get::<_, Option<Value>>(i))
}
_ => {
warn!(
"Unknown type {:?} for column '{}', falling back to string cast",
column_type, column_name
);
json!(row.get::<_, Option<String>>(i))
}
};
row_data.insert(column_name.to_string(), value);
}
results.push(Value::Object(row_data));
if row_count % 100 == 0 {
debug!("Processed {} rows so far", row_count.to_string().green());
}
}
let duration: u128 = start_time.elapsed().as_millis();
let json_result: Value = json!({
"data": results,
"db_name": athena_db_name,
"duration": duration,
"message": "Successfully executed query",
"status": "success"
});
Ok(json_result)
}
/// Resolves a symbolic client name to a freshly constructed [`SupabaseClient`].
///
/// Recognised names are `"supabase"`, `"xbp"`, `"athena"`, `"atabex"` and
/// `"athena_dexter"`; each one delegates to the constructor function it is
/// paired with in the `match` below.
///
/// # Errors
///
/// Returns the constructor's error string when client creation fails, or an
/// "Unknown client name" message for any other `client_name`.
pub async fn client_router(client_name: &str) -> Result<SupabaseClient, String> {
    let client = match client_name {
        "supabase" => supabase().await?,
        "xbp" => supabase_xbp().await?,
        "athena" => athena().await?,
        "atabex" => atabex().await?,
        "athena_dexter" => athena_dexter().await?,
        other => return Err(format!("Unknown client name: {}", other)),
    };
    Ok(client)
}
#[allow(unused_variables)]
pub async fn fetch_multiple_conditions(
table_name: String,
columns: Vec<&str>,
conditions: Option<Vec<Value>>,
limit: i64,
client_name: &str,
current_page: i64,
page_size: i64,
offset: i64,
total_pages: i64,
) -> Result<Vec<Value>, String> {
let client: SupabaseClient = client_router(client_name).await?;
let fetch_start: Instant = Instant::now();
let mut query: QueryBuilder = client.select(&table_name).columns(columns);
if let Some(conditions) = conditions {
for condition in conditions {
if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
if let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str) {
query = query.eq(eq_column, eq_value);
}
}
}
}
let calculated_offset: i64 = (current_page - 1) * page_size + offset;
let data: Result<Vec<Value>, String> = query
.order("created_at", false)
.limit(limit.try_into().unwrap())
.offset(calculated_offset.try_into().unwrap())
.execute()
.await;
match data {
Ok(result) => Ok(result),
Err(err) => {
error!(
backend = "supabase",
table = %table_name,
duration_ms = %fetch_start.elapsed().as_millis(),
error = %err,
"Supabase fetch_multiple_conditions failed"
);
Err(format!(
"Failed to fetch data in {:?}: {:?}",
table_name, err
))
}
}
}
#[allow(unused_variables)]
pub async fn fetch_multiple_conditions_with_client(
client: SupabaseClient,
table_name: String,
columns: Vec<&str>,
conditions: Option<Vec<Value>>,
limit: i64,
current_page: i64,
page_size: i64,
offset: i64,
total_pages: i64,
) -> Result<Vec<Value>, String> {
let fetch_start: Instant = Instant::now();
let mut query: QueryBuilder = client.select(&table_name).columns(columns);
if let Some(conditions) = conditions {
for condition in conditions {
if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
if let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str) {
query = query.eq(eq_column, eq_value);
info!(
backend = "custom_supabase",
column = %eq_column,
value = %eq_value,
"applied condition to custom Supabase query"
);
}
}
}
}
let calculated_offset: i64 = (current_page - 1) * page_size + offset;
let data: Result<Vec<Value>, String> = query
.order("created_at", false)
.limit(limit.try_into().unwrap())
.offset(calculated_offset.try_into().unwrap())
.execute()
.await;
match data {
Ok(result) => Ok(result),
Err(err) => {
error!(
backend = "custom_supabase",
table = %table_name,
duration_ms = %fetch_start.elapsed().as_millis(),
error = %err,
"custom Supabase fetch_multiple_conditions failed"
);
Err(format!(
"Failed to fetch data in {:?}: {:?}",
table_name, err
))
}
}
}