athena_rs 2.13.0

Database gateway API
Documentation
//! Unified Athena client surface used by downstream crates.
pub mod backend;
pub mod backends;
pub mod builder;
pub mod config;
pub mod error;
pub mod query_builder;
pub mod translator;

use crate::drivers::scylla::client::ScyllaConnectionInfo;
use crate::drivers::supabase::client::SupabaseConnectionInfo;
use backend::{
    BackendError, BackendResult, BackendType, DatabaseBackend, HealthStatus, QueryLanguage,
    QueryResult, TranslatedQuery,
};
use backends::{
    gateway::GatewayBackend, postgres::PostgresBackend, scylla::ScyllaBackend,
    supabase::SupabaseBackend,
};
use builder::AthenaClientBuilder;
use config::ClientConfig;
use query_builder::{
    DeleteBuilder, DeleteQuery, InsertBuilder, InsertQuery, SelectBuilder, SelectQuery,
    UpdateBuilder, UpdateQuery,
};
use translator::{CqlTranslator, PostgrestTranslator, QueryTranslator, SqlTranslator};

/// Client handle that pairs a database backend with the configuration it was built from.
pub struct AthenaClient {
    /// Backend that executes queries and health checks on behalf of this client.
    backend: Box<dyn DatabaseBackend>,
    /// Configuration used to construct the client; exposed through [`AthenaClient::config`].
    config: ClientConfig,
}

impl AthenaClient {
    /// Returns a new [`AthenaClientBuilder`] created with `AthenaClientBuilder::new()`.
    pub fn builder() -> AthenaClientBuilder {
        AthenaClientBuilder::new()
    }

    /// Turns `builder` into a configuration and constructs the client from it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `build_config` or by backend construction.
    pub async fn build(builder: AthenaClientBuilder) -> BackendResult<Self> {
        Self::from_config(builder.build_config()?).await
    }

    /// Creates a client for `url`, `key`, and `client` using [`BackendType::Native`].
    ///
    /// Shorthand for [`AthenaClient::new_with_backend`] with the native backend type.
    pub async fn new(
        url: impl Into<String>,
        key: impl Into<String>,
        client: impl Into<String>,
    ) -> BackendResult<Self> {
        let default_backend = BackendType::Native;
        Self::new_with_backend(url, key, client, default_backend).await
    }

    /// Creates a client for `url`, `key`, and `client` with an explicit backend type.
    ///
    /// The values are applied to a fresh builder in the order backend, URL, key,
    /// client name, and the builder is then handed to [`AthenaClient::build`].
    pub async fn new_with_backend(
        url: impl Into<String>,
        key: impl Into<String>,
        client: impl Into<String>,
        backend: BackendType,
    ) -> BackendResult<Self> {
        Self::build(
            Self::builder()
                .backend(backend)
                .url(url)
                .key(key)
                .client(client),
        )
        .await
    }

    /// Creates a client whose backend type is given by name instead of by enum value.
    ///
    /// `backend_name` is resolved with `parse_backend_name`, which ignores ASCII case
    /// and maps unrecognized names to [`BackendType::Native`].
    pub async fn new_with_backend_name(
        url: impl Into<String>,
        key: impl Into<String>,
        client: impl Into<String>,
        backend_name: &str,
    ) -> BackendResult<Self> {
        Self::new_with_backend(url, key, client, parse_backend_name(backend_name)).await
    }

    /// Creates a client from only a URL and a key.
    ///
    /// No backend type or client name is set on the builder here, so whatever the
    /// builder uses for unset values applies.
    pub async fn new_direct(url: impl Into<String>, key: impl Into<String>) -> BackendResult<Self> {
        Self::build(Self::builder().url(url).key(key)).await
    }

    /// Creates a [`SelectBuilder`] for `table` that borrows this client.
    pub fn select(&self, table: &str) -> SelectBuilder<'_> {
        SelectBuilder::new(self, table)
    }

    /// Creates an [`InsertBuilder`] for `table` that borrows this client.
    pub fn insert(&self, table: &str) -> InsertBuilder<'_> {
        InsertBuilder::new(self, table)
    }

    /// Creates an [`UpdateBuilder`] for `table` that borrows this client.
    ///
    /// The optional `row_id` is forwarded to the builder unchanged.
    pub fn update(&self, table: &str, row_id: Option<String>) -> UpdateBuilder<'_> {
        UpdateBuilder::new(self, table, row_id)
    }

    /// Creates a [`DeleteBuilder`] for `table` that borrows this client.
    ///
    /// The optional `row_id` is forwarded to the builder unchanged.
    pub fn delete(&self, table: &str, row_id: Option<String>) -> DeleteBuilder<'_> {
        DeleteBuilder::new(self, table, row_id)
    }

    /// Executes a raw SQL string on the backend.
    ///
    /// The string is wrapped as-is in a [`TranslatedQuery`] tagged [`QueryLanguage::Sql`]
    /// (with an empty vector and `None` for the remaining constructor arguments); no
    /// translation step runs here.
    pub async fn execute_sql(&self, sql: &str) -> BackendResult<QueryResult> {
        self.backend
            .execute_query(TranslatedQuery::new(sql, QueryLanguage::Sql, Vec::new(), None))
            .await
    }

    /// Executes a raw CQL string on the backend.
    ///
    /// The string is wrapped as-is in a [`TranslatedQuery`] tagged [`QueryLanguage::Cql`]
    /// (with an empty vector and `None` for the remaining constructor arguments); no
    /// translation step runs here.
    pub async fn execute_cql(&self, cql: &str) -> BackendResult<QueryResult> {
        self.backend
            .execute_query(TranslatedQuery::new(cql, QueryLanguage::Cql, Vec::new(), None))
            .await
    }

    /// Asks the backend for its health status and returns the result unchanged.
    pub async fn health_check(&self) -> BackendResult<HealthStatus> {
        self.backend.health_check().await
    }

    /// Returns a reference to the configuration stored in this client.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }

    /// Runs the closure whose translator matches the backend's reported type.
    ///
    /// Exactly one of the three closures is invoked:
    /// - `sql_fn` for `PostgreSQL`, `Native`, and `Neon`,
    /// - `cql_fn` for `Scylla`,
    /// - `postgrest_fn` for `Supabase` and `Postgrest`.
    ///
    /// The closure's result is returned unchanged.
    fn translate_query<FSql, FPostgrest, FCql>(
        &self,
        sql_fn: FSql,
        postgrest_fn: FPostgrest,
        cql_fn: FCql,
    ) -> BackendResult<TranslatedQuery>
    where
        FSql: FnOnce(&SqlTranslator) -> BackendResult<TranslatedQuery>,
        FPostgrest: FnOnce(&PostgrestTranslator) -> BackendResult<TranslatedQuery>,
        FCql: FnOnce(&CqlTranslator) -> BackendResult<TranslatedQuery>,
    {
        let backend_kind = self.backend.backend_type();
        match backend_kind {
            BackendType::PostgreSQL | BackendType::Native | BackendType::Neon => {
                sql_fn(&SqlTranslator)
            }
            BackendType::Scylla => cql_fn(&CqlTranslator),
            BackendType::Supabase | BackendType::Postgrest => postgrest_fn(&PostgrestTranslator),
        }
    }

    /// Translates and executes the query described by a [`SelectBuilder`].
    ///
    /// The builder is consumed into its query parts, translated with the translator
    /// chosen for the backend type, and the translated query is run on the backend.
    /// Translation errors are returned before anything is executed.
    pub(crate) async fn execute_select(
        &self,
        builder: SelectBuilder<'_>,
    ) -> BackendResult<QueryResult> {
        let parts = builder.into_parts();
        // Each closure targets a different translator type, hence the three copies.
        let prepared = self.translate_query(
            |sql| translate_select(sql, &parts),
            |postgrest| translate_select(postgrest, &parts),
            |cql| translate_select(cql, &parts),
        )?;
        self.backend.execute_query(prepared).await
    }

    /// Translates and executes the query described by an [`InsertBuilder`].
    ///
    /// The builder is consumed into its query parts, translated with the translator
    /// chosen for the backend type, and the translated query is run on the backend.
    /// Translation errors are returned before anything is executed.
    pub(crate) async fn execute_insert(
        &self,
        builder: InsertBuilder<'_>,
    ) -> BackendResult<QueryResult> {
        let parts = builder.into_parts();
        // Each closure targets a different translator type, hence the three copies.
        let prepared = self.translate_query(
            |sql| translate_insert(sql, &parts),
            |postgrest| translate_insert(postgrest, &parts),
            |cql| translate_insert(cql, &parts),
        )?;
        self.backend.execute_query(prepared).await
    }

    /// Translates and executes the query described by an [`UpdateBuilder`].
    ///
    /// The builder is consumed into its query parts, translated with the translator
    /// chosen for the backend type, and the translated query is run on the backend.
    /// Translation errors are returned before anything is executed.
    pub(crate) async fn execute_update(
        &self,
        builder: UpdateBuilder<'_>,
    ) -> BackendResult<QueryResult> {
        let parts = builder.into_parts();
        // Each closure targets a different translator type, hence the three copies.
        let prepared = self.translate_query(
            |sql| translate_update(sql, &parts),
            |postgrest| translate_update(postgrest, &parts),
            |cql| translate_update(cql, &parts),
        )?;
        self.backend.execute_query(prepared).await
    }

    /// Translates and executes the query described by a [`DeleteBuilder`].
    ///
    /// The builder is consumed into its query parts, translated with the translator
    /// chosen for the backend type, and the translated query is run on the backend.
    /// Translation errors are returned before anything is executed.
    pub(crate) async fn execute_delete(
        &self,
        builder: DeleteBuilder<'_>,
    ) -> BackendResult<QueryResult> {
        let parts = builder.into_parts();
        // Each closure targets a different translator type, hence the three copies.
        let prepared = self.translate_query(
            |sql| translate_delete(sql, &parts),
            |postgrest| translate_delete(postgrest, &parts),
            |cql| translate_delete(cql, &parts),
        )?;
        self.backend.execute_query(prepared).await
    }

    async fn from_config(config: ClientConfig) -> BackendResult<Self> {
        if let Some(client_name) = config.client_name.clone() {
            let key = config.connection.key.clone().ok_or_else(|| {
                BackendError::Generic(
                    "Athena key is required when using client-routed gateway mode".to_string(),
                )
            })?;
            let backend = GatewayBackend::new(
                config.connection.url.clone(),
                key,
                client_name,
                config.backend_type,
            );
            return Ok(Self {
                backend: Box::new(backend),
                config,
            });
        }

        let backend: Box<dyn DatabaseBackend> = match config.backend_type {
            BackendType::Supabase => {
                let key =
                    config.connection.key.clone().ok_or_else(|| {
                        BackendError::Generic("Supabase key is required".to_string())
                    })?;
                let info = SupabaseConnectionInfo::new(config.connection.url.clone(), key);
                Box::new(SupabaseBackend::new(info)?)
            }
            BackendType::Scylla => {
                let info = ScyllaConnectionInfo {
                    host: config.connection.url.clone(),
                    username: config.connection.database.clone().unwrap_or_default(),
                    password: config.connection.key.clone().unwrap_or_default(),
                };
                Box::new(ScyllaBackend::new(info))
            }
            BackendType::PostgreSQL
            | BackendType::Postgrest
            | BackendType::Native
            | BackendType::Neon => {
                let backend =
                    PostgresBackend::from_connection_string(&config.connection.url).await?;
                Box::new(backend)
            }
        };

        Ok(Self { backend, config })
    }
}

/// Maps a backend name to its [`BackendType`], ignoring ASCII case.
///
/// Recognized names are `supabase`, `postgrest`, `scylla`, `neon`, and
/// `postgresql` / `postgres`. Any other input resolves to `BackendType::Native`.
fn parse_backend_name(backend_name: &str) -> BackendType {
    let is = |alias: &str| backend_name.eq_ignore_ascii_case(alias);

    if is("supabase") {
        BackendType::Supabase
    } else if is("postgrest") {
        BackendType::Postgrest
    } else if is("scylla") {
        BackendType::Scylla
    } else if is("neon") {
        BackendType::Neon
    } else if is("postgresql") || is("postgres") {
        BackendType::PostgreSQL
    } else {
        BackendType::Native
    }
}

/// Forwards `query` to the translator's `translate_select` and returns its result.
///
/// Being generic over [`QueryTranslator`] lets one call shape serve every translator type.
fn translate_select<T>(translator: &T, query: &SelectQuery) -> BackendResult<TranslatedQuery>
where
    T: QueryTranslator,
{
    T::translate_select(translator, query)
}

/// Forwards `query` to the translator's `translate_insert` and returns its result.
///
/// Being generic over [`QueryTranslator`] lets one call shape serve every translator type.
fn translate_insert<T>(translator: &T, query: &InsertQuery) -> BackendResult<TranslatedQuery>
where
    T: QueryTranslator,
{
    T::translate_insert(translator, query)
}

/// Forwards `query` to the translator's `translate_update` and returns its result.
///
/// Being generic over [`QueryTranslator`] lets one call shape serve every translator type.
fn translate_update<T>(translator: &T, query: &UpdateQuery) -> BackendResult<TranslatedQuery>
where
    T: QueryTranslator,
{
    T::translate_update(translator, query)
}

/// Forwards `query` to the translator's `translate_delete` and returns its result.
///
/// Being generic over [`QueryTranslator`] lets one call shape serve every translator type.
fn translate_delete<T>(translator: &T, query: &DeleteQuery) -> BackendResult<TranslatedQuery>
where
    T: QueryTranslator,
{
    T::translate_delete(translator, query)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every supported alias maps to its backend type; unknown names fall back to `Native`.
    #[test]
    fn parse_backend_name_maps_supported_aliases() {
        let expectations = [
            ("supabase", BackendType::Supabase),
            ("postgrest", BackendType::Postgrest),
            ("scylla", BackendType::Scylla),
            ("neon", BackendType::Neon),
            ("postgresql", BackendType::PostgreSQL),
            ("postgres", BackendType::PostgreSQL),
            ("unknown", BackendType::Native),
        ];

        for (alias, expected) in expectations {
            assert_eq!(parse_backend_name(alias), expected, "alias: {}", alias);
        }
    }

    /// `AthenaClient::new` records the native backend type, the client name, and the URL.
    #[tokio::test]
    async fn new_defaults_to_native_and_sets_client_routing() {
        let built = AthenaClient::new("http://localhost:4052", "secret", "reporting").await;
        let client = built.expect("client should build without network");
        let config = client.config();

        assert_eq!(config.backend_type, BackendType::Native);
        assert_eq!(config.client_name.as_deref(), Some("reporting"));
        assert_eq!(config.connection.url, "http://localhost:4052");
    }

    /// Backend names are matched without regard to ASCII case.
    #[tokio::test]
    async fn new_with_backend_name_resolves_backend_case_insensitive() {
        let built = AthenaClient::new_with_backend_name(
            "http://localhost:4052",
            "secret",
            "reporting",
            "NeOn",
        )
        .await;
        let client = built.expect("client should build without network");

        assert_eq!(client.config().backend_type, BackendType::Neon);
    }
}