athena_rs 1.1.0

Database gateway API
Documentation
//! Supabase Postgres pooler integration.
//!
//! Executes SQL via Supabase's pooled Postgres endpoints.
//!
//! ## Modules
//!
//! - `client`: Health-aware Supabase client wrapper with circuit breaker patterns
//!
//! ## Legacy Functions
//!
//! The module also provides legacy direct client creation functions for backward compatibility.
pub mod client;

use colored::*;
use dotenv::dotenv;
use serde_json::{Map, Value, json};
use std::env::var;
use std::error::Error;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use supabase_rs::query::QueryBuilder;
use tokio_postgres::NoTls;
use tokio_postgres::Row;
use tokio_postgres::types::Type;
use tracing::{debug, error, info, instrument, warn};

fn env_var(name: &str) -> Result<String, String> {
    dotenv().ok();
    var(name).map_err(|err| format!("missing environment variable '{}': {}", name, err))
}

fn supabase_client_from_env(url_key: &str, key_key: &str) -> Result<SupabaseClient, String> {
    let url: String = env_var(url_key)?;
    let key: String = env_var(key_key)?;
    SupabaseClient::new(url, key).map_err(|e| format!("Failed to create SupabaseClient: {:?}", e))
}

pub async fn supabase() -> Result<SupabaseClient, String> {
    supabase_client_from_env("SUPABASE_URL", "SUPABASE_KEY")
}

pub async fn supabase_xbp() -> Result<SupabaseClient, String> {
    supabase_client_from_env("SUPABASE_XBP_URL", "SUPABASE_XBP_KEY")
}

pub async fn athena_dexter() -> Result<SupabaseClient, String> {
    supabase_client_from_env("ATHENA_DEXTER_URL", "ATHENA_DEXTER_KEY")
}

pub async fn atabex() -> Result<SupabaseClient, String> {
    supabase_client_from_env("ATABEX_URL", "ATABEX_KEY")
}

pub async fn athena() -> Result<SupabaseClient, String> {
    dotenv().ok();

    let athena_url: String = "https://athena-db.com".to_string();
    let athena_key: String = env_var("ATHENA_KEY_12")?;

    SupabaseClient::new(athena_url, athena_key)
        .map_err(|e| format!("Failed to create AthenaClient: {:#?}", e))
}

#[instrument(skip(query), fields(query_length = query.len(), db_name = %athena_db_name))]
/// Execute a SQL query via Supabase's pooled Postgres and return JSON results.
pub async fn execute_query_supabase(
    query: String,
    athena_db_name: String,
) -> Result<Value, Box<dyn Error>> {
    let connection_string: String = env_var("SUPABASE_POSTGRES_URI")
        .map_err(|e| e.to_string())?
        .replace("{db}", &athena_db_name);

    debug!("Full connection string prepared (credentials masked)");

    let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await?;

    // Spawn the connection task
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            error!("Supabase connection error: {}", e);
        } else {
            debug!("Supabase connection task completed successfully");
        }
    });

    let start_time: Instant = Instant::now();

    let rows: Vec<Row> = client.query(query.as_str(), &[]).await?;
    let _query_duration: std::time::Duration = start_time.elapsed();

    let mut results: Vec<Value> = Vec::new();
    let mut row_count: i32 = 0;

    for row in rows {
        row_count += 1;
        debug!("Processing row {}", row_count.to_string().yellow());

        let mut row_data: Map<String, Value> = Map::new();
        let columns: &[tokio_postgres::Column] = row.columns();

        debug!("Row has {} columns", columns.len().to_string().cyan());

        for (i, column) in columns.iter().enumerate() {
            let column_name: &str = column.name();
            let column_type = column.type_();

            debug!(
                "Processing column '{}' (type: {:?}) at index {}",
                column_name.cyan(),
                column_type,
                i.to_string().yellow()
            );

            // FIXME: Floris; move this to a different dedicated cast function
            let value: Value = match column_type {
                &Type::INT4 => {
                    debug!("Casting column '{}' as INT4", column_name);
                    json!(row.get::<_, Option<i32>>(i))
                }
                &Type::INT8 => {
                    debug!("Casting column '{}' as INT8", column_name);
                    json!(row.get::<_, Option<i64>>(i))
                }
                &Type::TEXT | &Type::VARCHAR => {
                    debug!("Casting column '{}' as TEXT/VARCHAR", column_name);
                    json!(row.get::<_, Option<String>>(i))
                }
                &Type::BOOL => {
                    debug!("Casting column '{}' as BOOL", column_name);
                    json!(row.get::<_, Option<bool>>(i))
                }
                &Type::FLOAT4 => {
                    debug!("Casting column '{}' as FLOAT4", column_name);
                    json!(row.get::<_, Option<f32>>(i))
                }
                &Type::FLOAT8 => {
                    debug!("Casting column '{}' as FLOAT8", column_name);
                    json!(row.get::<_, Option<f64>>(i))
                }
                &Type::JSONB | &Type::JSON => {
                    debug!("Casting column '{}' as JSON/JSONB", column_name);
                    json!(row.get::<_, Option<Value>>(i))
                }
                _ => {
                    warn!(
                        "Unknown type {:?} for column '{}', falling back to string cast",
                        column_type, column_name
                    );
                    json!(row.get::<_, Option<String>>(i))
                }
            };

            row_data.insert(column_name.to_string(), value);
        }

        results.push(Value::Object(row_data));

        if row_count % 100 == 0 {
            debug!("Processed {} rows so far", row_count.to_string().green());
        }
    }

    let duration: u128 = start_time.elapsed().as_millis();

    let json_result: Value = json!({
        "data": results,
        "db_name": athena_db_name,
        "duration": duration,
        "message": "Successfully executed query",
        "status": "success"
    });

    Ok(json_result)
}

/// ## Client router
///
/// This function routes to the correct client based on the provided client name.
///
/// ### Parameters
///
/// - `client_name`: A `&str` representing the name of the client to route to.
///
/// ### Returns
///
/// - `Result<SupabaseClient, String>`: An instance of the SupabaseClient configured with the provided URL and key, or an error.
///
/// ### Errors
///
/// - Returns an error message if the client name is not found or environment variables are missing.
///
/// ### Example
///
/// ```rust,no_run
/// # use athena_rs::drivers::supabase::client_router;
/// # async fn doc_example() {
/// let client = client_router("athena").await;
/// # let _ = client;
/// # }
/// ```
pub async fn client_router(client_name: &str) -> Result<SupabaseClient, String> {
    match client_name {
        "supabase" => supabase().await,
        "xbp" => supabase_xbp().await,
        "athena" => athena().await,
        "atabex" => atabex().await,
        "athena_dexter" => athena_dexter().await,
        _ => Err(format!("Unknown client name: {}", client_name)),
    }
}

/// FIXME: unused variables
#[allow(unused_variables)]
pub async fn fetch_multiple_conditions(
    table_name: String,
    columns: Vec<&str>,
    conditions: Option<Vec<Value>>,
    limit: i64,
    client_name: &str,
    current_page: i64,
    page_size: i64,
    offset: i64,
    total_pages: i64,
) -> Result<Vec<Value>, String> {
    let client: SupabaseClient = client_router(client_name).await?;
    let fetch_start: Instant = Instant::now();

    let mut query: QueryBuilder = client.select(&table_name).columns(columns);

    if let Some(conditions) = conditions {
        for condition in conditions {
            if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
                if let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str) {
                    query = query.eq(eq_column, eq_value);
                }
            }
        }
    }

    let calculated_offset: i64 = (current_page - 1) * page_size + offset;
    let data: Result<Vec<Value>, String> = query
        .order("created_at", false)
        .limit(limit.try_into().unwrap())
        .offset(calculated_offset.try_into().unwrap())
        .execute()
        .await;

    match data {
        Ok(result) => Ok(result),
        Err(err) => {
            error!(
                backend = "supabase",
                table = %table_name,
                duration_ms = %fetch_start.elapsed().as_millis(),
                error = %err,
                "Supabase fetch_multiple_conditions failed"
            );
            Err(format!(
                "Failed to fetch data in {:?}: {:?}",
                table_name, err
            ))
        }
    }
}

/// Same as `fetch_multiple_conditions` but uses a provided SupabaseClient instance.
#[allow(unused_variables)]
pub async fn fetch_multiple_conditions_with_client(
    client: SupabaseClient,
    table_name: String,
    columns: Vec<&str>,
    conditions: Option<Vec<Value>>,
    limit: i64,
    current_page: i64,
    page_size: i64,
    offset: i64,
    total_pages: i64,
) -> Result<Vec<Value>, String> {
    let fetch_start: Instant = Instant::now();
    let mut query: QueryBuilder = client.select(&table_name).columns(columns);

    if let Some(conditions) = conditions {
        for condition in conditions {
            if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
                if let Some(eq_value) = condition.get("eq_value").and_then(Value::as_str) {
                    query = query.eq(eq_column, eq_value);
                    info!(
                        backend = "custom_supabase",
                        column = %eq_column,
                        value = %eq_value,
                        "applied condition to custom Supabase query"
                    );
                }
            }
        }
    }

    let calculated_offset: i64 = (current_page - 1) * page_size + offset;

    let data: Result<Vec<Value>, String> = query
        .order("created_at", false)
        .limit(limit.try_into().unwrap())
        .offset(calculated_offset.try_into().unwrap())
        .execute()
        .await;

    match data {
        Ok(result) => Ok(result),
        Err(err) => {
            error!(
                backend = "custom_supabase",
                table = %table_name,
                duration_ms = %fetch_start.elapsed().as_millis(),
                error = %err,
                "custom Supabase fetch_multiple_conditions failed"
            );
            Err(format!(
                "Failed to fetch data in {:?}: {:?}",
                table_name, err
            ))
        }
    }
}