athena_rs 3.26.2

Hyper performant polyglot Database driver
Documentation
use serde_json::Value;

use crate::drivers::postgresql::sqlx_driver::{PostgresInsertError, insert_row};
use crate::drivers::supabase::{client_router, client_router_health_aware};
use supabase_rs::SupabaseClient;

#[derive(Debug)]
pub enum SupabaseInsertError {
    UnknownClient { client_name: String },
    ClientInit { message: String },
    InsertFailed { message: String },
    BackendUnavailable { message: String },
}

#[derive(Debug)]
pub(crate) enum InsertDriverError {
    Postgres(PostgresInsertError),
    Supabase(SupabaseInsertError),
    StageTimeout {
        backend: &'static str,
        stage: &'static str,
        timeout_ms: u64,
    },
}

pub(crate) async fn postgres_insert_with_timeout(
    pool: &sqlx::postgres::PgPool,
    table_name: &str,
    insert_body: &Value,
    timeout_ms: u64,
) -> Result<Value, InsertDriverError> {
    let timeout = tokio::time::Duration::from_millis(timeout_ms);
    match tokio::time::timeout(timeout, insert_row(pool, table_name, insert_body)).await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(err)) => Err(InsertDriverError::Postgres(err)),
        Err(_) => Err(InsertDriverError::StageTimeout {
            backend: "postgres",
            stage: "db_insert",
            timeout_ms,
        }),
    }
}

pub(crate) async fn supabase_insert_with_timeout(
    table_name: String,
    insert_body: Value,
    client_name: &str,
    timeout_ms: u64,
) -> Result<(Value, String), InsertDriverError> {
    let timeout = tokio::time::Duration::from_millis(timeout_ms);
    match tokio::time::timeout(
        timeout,
        insert_via_supabase(table_name, insert_body, client_name),
    )
    .await
    {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(err)) => Err(InsertDriverError::Supabase(err)),
        Err(_) => Err(InsertDriverError::StageTimeout {
            backend: "supabase",
            stage: "db_insert",
            timeout_ms,
        }),
    }
}

/// Routes inserts through the configured Supabase client implementation.
pub async fn insert_via_supabase(
    table_name: String,
    data: Value,
    client_name: &str,
) -> Result<(Value, String), SupabaseInsertError> {
    if let Ok(health_client) = client_router_health_aware(client_name) {
        match health_client.insert(&table_name, data.clone()).await {
            Ok(_) => {
                return Ok((
                    data,
                    format!("Successfully inserted data into {:?}", table_name),
                ));
            }
            Err(err) => {
                if err
                    .downcast_ref::<crate::drivers::scylla::health::HostOffline>()
                    .is_some()
                {
                    return Err(SupabaseInsertError::BackendUnavailable {
                        message: format!(
                            "Backend {} temporarily unavailable (circuit breaker)",
                            client_name
                        ),
                    });
                }
                return Err(SupabaseInsertError::InsertFailed {
                    message: err.to_string(),
                });
            }
        }
    }

    let client: SupabaseClient = client_router(client_name).await.map_err(|err| {
        if err.starts_with("Unknown client name") {
            SupabaseInsertError::UnknownClient {
                client_name: client_name.to_string(),
            }
        } else {
            SupabaseInsertError::ClientInit { message: err }
        }
    })?;

    let insert_result: Result<String, String> = client.insert(&table_name, data.clone()).await;
    match insert_result {
        Ok(_) => Ok((
            data,
            format!("Successfully inserted data into {:?}", table_name),
        )),
        Err(err) => Err(SupabaseInsertError::InsertFailed { message: err }),
    }
}