pub mod backend;
pub mod backends;
pub mod builder;
pub mod config;
pub mod error;
pub mod gateway_api;
pub mod query_builder;
pub mod translator;
use crate::drivers::scylla::client::ScyllaConnectionInfo;
use crate::drivers::supabase::client::SupabaseConnectionInfo;
use backend::{
BackendError, BackendResult, BackendType, DatabaseBackend, HealthStatus, QueryResult,
};
use backends::{
gateway::GatewayBackend, postgres::PostgresBackend, scylla::ScyllaBackend,
supabase::SupabaseBackend,
};
use builder::AthenaClientBuilder;
use config::ClientConfig;
pub use gateway_api::{
Gateway, GatewayDeleteRequest, GatewayDriverRequest, GatewayFetchRequest, GatewayInsertRequest,
GatewayOperation, GatewayPath, GatewayQueryResult, GatewayRequest, GatewayRequestFactory,
GatewayRequestPayload, GatewayRoutes, GatewayRpcFilter, GatewayRpcFilterOperator,
GatewayRpcRequest, GatewaySqlRequest, GatewayUpdateRequest, GatewayUpdateScope,
build_gateway_endpoint, request,
};
use query_builder::{DeleteBuilder, InsertBuilder, SelectBuilder, UpdateBuilder};
use serde_json::Value;
use translator::{CqlTranslator, PostgrestTranslator, QueryTranslator, SqlTranslator};
pub use backend::{QueryLanguage, TranslatedQuery};
pub use query_builder::Condition;
pub use query_builder::ConditionOperator;
pub use query_builder::OrderDirection;
pub use query_builder::RpcBuilder;
/// Module-level alias for [`Gateway::FETCH_PATH`].
pub const GATEWAY_FETCH_PATH: &str = Gateway::FETCH_PATH;
/// Module-level alias for [`Gateway::INSERT_PATH`].
pub const GATEWAY_INSERT_PATH: &str = Gateway::INSERT_PATH;
/// Module-level alias for [`Gateway::UPDATE_PATH`].
pub const GATEWAY_UPDATE_PATH: &str = Gateway::UPDATE_PATH;
/// Module-level alias for [`Gateway::DELETE_PATH`].
pub const GATEWAY_DELETE_PATH: &str = Gateway::DELETE_PATH;
/// Module-level alias for [`Gateway::QUERY_PATH`].
pub const GATEWAY_QUERY_PATH: &str = Gateway::QUERY_PATH;
/// Module-level alias for [`Gateway::SQL_PATH`].
pub const GATEWAY_SQL_PATH: &str = Gateway::SQL_PATH;
/// Module-level alias for [`Gateway::RPC_PATH`].
pub const GATEWAY_RPC_PATH: &str = Gateway::RPC_PATH;
/// Module-level alias for [`Gateway::LEGACY_SQL_PATH`].
pub const LEGACY_SQL_PATH: &str = Gateway::LEGACY_SQL_PATH;
/// Client handle that pairs a boxed [`DatabaseBackend`] with the
/// [`ClientConfig`] it was constructed from.
///
/// Instances are created through the async constructors in the `impl`
/// block (`new`, `new_with_backend`, `new_with_backend_name`, `new_direct`,
/// `build`).
pub struct AthenaClient {
// Backend every translated query is handed to via `execute_query`.
backend: Box<dyn DatabaseBackend>,
// Configuration the backend was built from; exposed read-only via `config()`.
config: ClientConfig,
}
impl AthenaClient {
/// Returns a fresh [`AthenaClientBuilder`] created with
/// `AthenaClientBuilder::new()`.
pub fn builder() -> AthenaClientBuilder {
AthenaClientBuilder::new()
}
/// Builds a client from a configured [`AthenaClientBuilder`].
///
/// # Errors
///
/// Propagates the error from `builder.build_config()` as well as any error
/// produced while constructing the backend from the resulting config.
pub async fn build(builder: AthenaClientBuilder) -> BackendResult<Self> {
    Self::from_config(builder.build_config()?).await
}
/// Creates a client for `url`, `key` and `client` using
/// [`BackendType::Native`].
///
/// Shorthand for [`AthenaClient::new_with_backend`] with the backend fixed
/// to `BackendType::Native`.
///
/// # Errors
///
/// Returns whatever error `new_with_backend` returns.
pub async fn new(
    url: impl Into<String>,
    key: impl Into<String>,
    client: impl Into<String>,
) -> BackendResult<Self> {
    let default_backend = BackendType::Native;
    Self::new_with_backend(url, key, client, default_backend).await
}
/// Creates a client for `url`, `key` and `client` with an explicit
/// [`BackendType`].
///
/// The values are fed into a fresh builder (backend, url, key, client — in
/// that order) which is then passed to [`AthenaClient::build`].
///
/// # Errors
///
/// Returns whatever error `build` returns.
pub async fn new_with_backend(
    url: impl Into<String>,
    key: impl Into<String>,
    client: impl Into<String>,
    backend: BackendType,
) -> BackendResult<Self> {
    Self::build(
        Self::builder()
            .backend(backend)
            .url(url)
            .key(key)
            .client(client),
    )
    .await
}
/// Creates a client whose backend is selected by a textual name.
///
/// `backend_name` is resolved with `parse_backend_name`; names it does not
/// recognise resolve to [`BackendType::Native`] rather than producing an
/// error. The resolved backend is forwarded to
/// [`AthenaClient::new_with_backend`].
///
/// # Errors
///
/// Returns whatever error `new_with_backend` returns.
pub async fn new_with_backend_name(
    url: impl Into<String>,
    key: impl Into<String>,
    client: impl Into<String>,
    backend_name: &str,
) -> BackendResult<Self> {
    Self::new_with_backend(url, key, client, parse_backend_name(backend_name)).await
}
/// Creates a client from only `url` and `key`.
///
/// Neither a client name nor a backend type is set on the builder, so both
/// keep whatever the builder defaults to.
///
/// # Errors
///
/// Returns whatever error [`AthenaClient::build`] returns.
pub async fn new_direct(url: impl Into<String>, key: impl Into<String>) -> BackendResult<Self> {
    Self::build(Self::builder().url(url).key(key)).await
}
/// Returns the [`GatewayRoutes`] derived from the configured connection URL
/// via `GatewayRoutes::for_base_url`.
pub fn gateway_routes(&self) -> GatewayRoutes {
    let base_url = &self.config.connection.url;
    GatewayRoutes::for_base_url(base_url)
}
/// Returns the endpoint string that `build_gateway_endpoint` produces for
/// the configured connection URL and `path`.
pub fn gateway_endpoint(&self, path: &str) -> String {
    let base_url = &self.config.connection.url;
    build_gateway_endpoint(base_url, path)
}
/// Starts a [`SelectBuilder`] for `table` that is bound to this client.
pub fn select(&self, table: &str) -> SelectBuilder<'_> {
SelectBuilder::new(self, table)
}
pub fn fetch(&self, table: &str) -> SelectBuilder<'_> {
self.select(table)
}
/// Starts an [`InsertBuilder`] for `table` that is bound to this client.
pub fn insert(&self, table: &str) -> InsertBuilder<'_> {
InsertBuilder::new(self, table)
}
/// Starts an [`UpdateBuilder`] for `table` with no row id preset.
pub fn update(&self, table: &str) -> UpdateBuilder<'_> {
    let no_row_id = None;
    UpdateBuilder::new(self, table, no_row_id)
}
/// Starts an [`UpdateBuilder`] for `table` with `row_id` preset.
pub fn update_by_id(&self, table: &str, row_id: impl Into<String>) -> UpdateBuilder<'_> {
    let row_id = row_id.into();
    UpdateBuilder::new(self, table, Some(row_id))
}
/// Starts a [`DeleteBuilder`] for `table` with no row id preset.
pub fn delete(&self, table: &str) -> DeleteBuilder<'_> {
    let no_row_id = None;
    DeleteBuilder::new(self, table, no_row_id)
}
/// Starts a [`DeleteBuilder`] for `table` with `row_id` preset.
pub fn delete_by_id(&self, table: &str, row_id: impl Into<String>) -> DeleteBuilder<'_> {
    let row_id = row_id.into();
    DeleteBuilder::new(self, table, Some(row_id))
}
pub fn rpc(&self, function: &str, args: Value) -> query_builder::RpcBuilder<'_> {
query_builder::RpcBuilder::new(self, function, args)
}
/// Executes a typed gateway request by replaying it through the matching
/// query builder, the raw SQL path, or the RPC path.
///
/// The argument is converted into a [`GatewayRequest`], its payload is
/// extracted with `into_payload()`, and each payload kind is dispatched:
///
/// * `Fetch` – [`AthenaClient::select`]; columns are applied only when the
///   list is non-empty, `raw_select`, `limit` and `offset` only when present;
///   conditions and every `order_by` entry are always applied.
/// * `Insert` – [`AthenaClient::insert`] with the request payload.
/// * `Update` / `Delete` – the `*_by_id` builder when a row id is given,
///   otherwise the plain builder; conditions are always applied and
///   `unsafe_unfiltered()` is called only when `allow_unfiltered` is set.
/// * `Sql` – [`AthenaClient::sql`] with the request's query string.
/// * `Rpc` – [`AthenaClient::rpc_request`].
///
/// # Errors
///
/// Returns the error of whichever builder/executor the payload was routed to.
pub async fn gateway_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let envelope: GatewayRequest = request.into();
    match envelope.into_payload() {
        GatewayRequestPayload::Fetch(fetch) => {
            let select = self.select(&fetch.table);
            // An empty column list means "leave the builder's default selection".
            let select = if fetch.columns.is_empty() {
                select
            } else {
                select.columns(fetch.columns)
            };
            let select = match fetch.raw_select {
                Some(raw_select) => select.raw_select(raw_select),
                None => select,
            };
            let select = select.where_conditions(fetch.conditions);
            let select = fetch
                .order_by
                .into_iter()
                .fold(select, |acc, (column, direction)| {
                    acc.order_by(&column, direction)
                });
            let select = match fetch.limit {
                Some(limit) => select.limit(limit),
                None => select,
            };
            let select = match fetch.offset {
                Some(offset) => select.offset(offset),
                None => select,
            };
            select.execute().await
        }
        GatewayRequestPayload::Insert(insert) => {
            let builder = self.insert(&insert.table).payload(insert.payload);
            builder.execute().await
        }
        GatewayRequestPayload::Update(update) => {
            let scope = update.scope;
            let builder = match scope.row_id {
                Some(row_id) => self.update_by_id(&update.table, row_id),
                None => self.update(&update.table),
            };
            let builder = builder.where_conditions(scope.conditions);
            let builder = if scope.allow_unfiltered {
                builder.unsafe_unfiltered()
            } else {
                builder
            };
            builder.payload(update.payload).execute().await
        }
        GatewayRequestPayload::Delete(delete) => {
            let builder = match delete.row_id {
                Some(row_id) => self.delete_by_id(&delete.table, row_id),
                None => self.delete(&delete.table),
            };
            let builder = builder.where_conditions(delete.conditions);
            let builder = if delete.allow_unfiltered {
                builder.unsafe_unfiltered()
            } else {
                builder
            };
            builder.execute().await
        }
        GatewayRequestPayload::Sql(raw) => self.sql(&raw.query).await,
        GatewayRequestPayload::Rpc(call) => self.rpc_request(call).await,
    }
}
/// Executes `request` through [`AthenaClient::gateway_request`].
///
/// This is a naming convenience only: the payload kind is not checked, so
/// the request is dispatched according to whatever payload it carries.
pub async fn fetch_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let request: GatewayRequest = request.into();
    self.gateway_request(request).await
}
/// Executes `request` through [`AthenaClient::gateway_request`].
///
/// This is a naming convenience only: the payload kind is not checked, so
/// the request is dispatched according to whatever payload it carries.
pub async fn insert_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let request: GatewayRequest = request.into();
    self.gateway_request(request).await
}
/// Executes `request` through [`AthenaClient::gateway_request`].
///
/// This is a naming convenience only: the payload kind is not checked, so
/// the request is dispatched according to whatever payload it carries.
pub async fn update_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let request: GatewayRequest = request.into();
    self.gateway_request(request).await
}
/// Executes `request` through [`AthenaClient::gateway_request`].
///
/// This is a naming convenience only: the payload kind is not checked, so
/// the request is dispatched according to whatever payload it carries.
pub async fn delete_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let request: GatewayRequest = request.into();
    self.gateway_request(request).await
}
pub async fn rpc_request(
&self,
request: GatewayRpcRequest,
) -> BackendResult<GatewayQueryResult> {
self.execute_rpc_request(request).await
}
/// Sends a raw SQL string to the backend.
///
/// The text is wrapped with `TranslatedQuery::sql`, passing an empty `Vec`
/// and `None` for its remaining two arguments, and handed to the backend's
/// `execute_query` without going through any translator.
///
/// # Errors
///
/// Returns the backend's error from `execute_query`.
pub async fn execute_sql(&self, sql: &str) -> BackendResult<QueryResult> {
    self.backend
        .execute_query(TranslatedQuery::sql(sql, Vec::new(), None))
        .await
}
pub async fn sql(&self, sql: &str) -> BackendResult<QueryResult> {
self.execute_sql(sql).await
}
/// Executes `request` through [`AthenaClient::gateway_request`].
///
/// This is a naming convenience only: the payload kind is not checked, so
/// the request is dispatched according to whatever payload it carries.
pub async fn sql_request<R>(&self, request: R) -> BackendResult<GatewayQueryResult>
where
    R: Into<GatewayRequest>,
{
    let request: GatewayRequest = request.into();
    self.gateway_request(request).await
}
/// Sends a raw CQL string to the backend.
///
/// The text is wrapped with `TranslatedQuery::cql`, passing an empty `Vec`
/// and `None` for its remaining two arguments, and handed to the backend's
/// `execute_query`. No check is made here that the backend is Scylla.
///
/// # Errors
///
/// Returns the backend's error from `execute_query`.
pub async fn execute_cql(&self, cql: &str) -> BackendResult<QueryResult> {
    self.backend
        .execute_query(TranslatedQuery::cql(cql, Vec::new(), None))
        .await
}
/// Returns the [`HealthStatus`] reported by the backend's `health_check`.
///
/// # Errors
///
/// Returns the backend's error unchanged.
pub async fn health_check(&self) -> BackendResult<HealthStatus> {
    let backend = &self.backend;
    backend.health_check().await
}
pub fn config(&self) -> &ClientConfig {
&self.config
}
/// Returns `true` only when a client name is configured *and* the backend
/// reports SQL support via `supports_sql()`.
fn uses_gateway_crud_routing(&self) -> bool {
    if self.config.client_name.is_none() {
        return false;
    }
    self.backend.supports_sql()
}
/// Translates a [`GatewayRequest`] into the query form the backend expects.
///
/// When gateway CRUD routing is active the request is always translated
/// with [`PostgrestTranslator`]. Otherwise the translator is chosen from
/// the backend's reported [`BackendType`]:
///
/// * `Supabase`, `Postgrest` → [`PostgrestTranslator`]
/// * `Scylla` → [`CqlTranslator`]
/// * `PostgreSQL`, `Native`, `Neon` → [`SqlTranslator`]
///
/// # Errors
///
/// Returns the selected translator's error.
fn translate_request(&self, request: &GatewayRequest) -> BackendResult<TranslatedQuery> {
    if !self.uses_gateway_crud_routing() {
        return match self.backend.backend_type() {
            BackendType::Scylla => CqlTranslator.translate_request(request),
            BackendType::PostgreSQL | BackendType::Native | BackendType::Neon => {
                SqlTranslator.translate_request(request)
            }
            BackendType::Supabase | BackendType::Postgrest => {
                PostgrestTranslator.translate_request(request)
            }
        };
    }
    PostgrestTranslator.translate_request(request)
}
/// Converts a finished [`SelectBuilder`] into a [`GatewayRequest`],
/// translates it for the active backend and executes it.
///
/// # Errors
///
/// Returns a translation error before anything is sent, or the backend's
/// error from `execute_query`.
pub(crate) async fn execute_select(
    &self,
    builder: SelectBuilder<'_>,
) -> BackendResult<QueryResult> {
    let gateway_request: GatewayRequest = builder.into_request().into();
    self.backend
        .execute_query(self.translate_request(&gateway_request)?)
        .await
}
/// Converts a finished [`InsertBuilder`] into a [`GatewayRequest`],
/// translates it for the active backend and executes it.
///
/// # Errors
///
/// Returns a translation error before anything is sent, or the backend's
/// error from `execute_query`.
pub(crate) async fn execute_insert(
    &self,
    builder: InsertBuilder<'_>,
) -> BackendResult<QueryResult> {
    let gateway_request: GatewayRequest = builder.into_request().into();
    self.backend
        .execute_query(self.translate_request(&gateway_request)?)
        .await
}
/// Executes a finished [`UpdateBuilder`], refusing table-wide updates
/// unless they were explicitly allowed.
///
/// The update is rejected when it has neither a row id nor any condition
/// and `allow_unfiltered` is not set on its scope. Otherwise the request is
/// translated for the active backend and executed.
///
/// # Errors
///
/// * `BackendError::Generic` when the update is unfiltered and not allowed.
/// * A translation error, or the backend's error from `execute_query`.
pub(crate) async fn execute_update(
    &self,
    builder: UpdateBuilder<'_>,
) -> BackendResult<QueryResult> {
    let update: GatewayUpdateRequest = builder.into_request();

    let scope = &update.scope;
    let is_scoped = query_has_scope(scope.row_id.as_deref(), &scope.conditions);
    if !(is_scoped || scope.allow_unfiltered) {
        return Err(BackendError::Generic(String::from(
            "Refusing unfiltered update; add `where_*`/`row_id(...)` or call `unsafe_unfiltered()`",
        )));
    }

    let gateway_request: GatewayRequest = update.into();
    self.backend
        .execute_query(self.translate_request(&gateway_request)?)
        .await
}
/// Executes a finished [`DeleteBuilder`], refusing table-wide deletes
/// unless they were explicitly allowed.
///
/// The delete is rejected when it has neither a row id nor any condition
/// and `allow_unfiltered` is not set. Otherwise the request is translated
/// for the active backend and executed.
///
/// # Errors
///
/// * `BackendError::Generic` when the delete is unfiltered and not allowed.
/// * A translation error, or the backend's error from `execute_query`.
pub(crate) async fn execute_delete(
    &self,
    builder: DeleteBuilder<'_>,
) -> BackendResult<QueryResult> {
    let delete: GatewayDeleteRequest = builder.into_request();

    let is_scoped = query_has_scope(delete.row_id.as_deref(), &delete.conditions);
    if !(is_scoped || delete.allow_unfiltered) {
        return Err(BackendError::Generic(String::from(
            "Refusing unfiltered delete; add `where_*`/`row_id(...)` or call `unsafe_unfiltered()`",
        )));
    }

    let gateway_request: GatewayRequest = delete.into();
    self.backend
        .execute_query(self.translate_request(&gateway_request)?)
        .await
}
/// Converts a finished [`RpcBuilder`] into its request and executes it.
///
/// # Errors
///
/// Returns the error from the internal RPC executor, which rejects the call
/// when the active backend is not a `GatewayBackend`.
pub(crate) async fn execute_rpc(
    &self,
    builder: query_builder::RpcBuilder<'_>,
) -> BackendResult<QueryResult> {
    let request = builder.into_request();
    self.execute_rpc_request(request).await
}
/// Runs an RPC request, which requires the active backend to be a
/// [`GatewayBackend`].
///
/// The boxed backend is downcast through `as_any()`; on success the request
/// is forwarded to `GatewayBackend::execute_rpc_request`.
///
/// # Errors
///
/// * `BackendError::Generic` when the backend is not a `GatewayBackend`.
/// * The gateway backend's own error otherwise.
async fn execute_rpc_request(&self, request: GatewayRpcRequest) -> BackendResult<QueryResult> {
    match self.backend.as_any().downcast_ref::<GatewayBackend>() {
        Some(gateway) => gateway.execute_rpc_request(&request).await,
        None => Err(BackendError::Generic(String::from(
            "RPC requests are only supported in Athena gateway client mode",
        ))),
    }
}
/// Constructs the client and its backend from a resolved [`ClientConfig`].
///
/// * If `config.client_name` is set, a [`GatewayBackend`] is created from
///   the URL, key, client name and backend type. The key is mandatory here.
/// * Otherwise the backend is chosen from `config.backend_type`:
///   `Supabase` → [`SupabaseBackend`] (key mandatory), `Scylla` →
///   [`ScyllaBackend`], and `PostgreSQL` / `Postgrest` / `Native` / `Neon` →
///   [`PostgresBackend`] built from the URL as a connection string.
///
/// # Errors
///
/// * `BackendError::Generic` when a required key is missing.
/// * Errors from `SupabaseBackend::new` or
///   `PostgresBackend::from_connection_string`.
async fn from_config(config: ClientConfig) -> BackendResult<Self> {
    // A configured client name selects client-routed gateway mode.
    if let Some(client_name) = config.client_name.clone() {
        let key = match config.connection.key.clone() {
            Some(key) => key,
            None => {
                return Err(BackendError::Generic(
                    "Athena key is required when using client-routed gateway mode".to_string(),
                ));
            }
        };
        let gateway = GatewayBackend::new(
            config.connection.url.clone(),
            key,
            client_name,
            config.backend_type,
        );
        return Ok(Self {
            backend: Box::new(gateway),
            config,
        });
    }

    // Direct mode: talk to the selected backend without the gateway.
    let connection = &config.connection;
    let backend: Box<dyn DatabaseBackend> = match config.backend_type {
        BackendType::Supabase => {
            let key = match connection.key.clone() {
                Some(key) => key,
                None => {
                    return Err(BackendError::Generic(
                        "Supabase key is required".to_string(),
                    ));
                }
            };
            let info = SupabaseConnectionInfo::new(connection.url.clone(), key);
            Box::new(SupabaseBackend::new(info)?)
        }
        BackendType::Scylla => {
            // NOTE(review): `username` is filled from `connection.database` and
            // `password` from `connection.key`; missing values become empty
            // strings. Confirm this field mapping is intentional.
            let info = ScyllaConnectionInfo {
                host: connection.url.clone(),
                username: connection.database.clone().unwrap_or_default(),
                password: connection.key.clone().unwrap_or_default(),
            };
            Box::new(ScyllaBackend::new(info))
        }
        BackendType::PostgreSQL
        | BackendType::Postgrest
        | BackendType::Native
        | BackendType::Neon => {
            let postgres: PostgresBackend =
                PostgresBackend::from_connection_string(&connection.url).await?;
            Box::new(postgres)
        }
    };

    Ok(Self { backend, config })
}
}
/// Maps a user-supplied backend name onto a [`BackendType`].
///
/// Matching is ASCII case-insensitive and ignores leading/trailing
/// whitespace, so a value such as `" Neon\n"` (typical for names read from
/// environment variables or config files) resolves to the intended backend
/// instead of silently falling through to the default.
///
/// Recognised names: `supabase`, `postgrest`, `scylla`, `neon`, and
/// `postgresql` / `postgres`. Anything else — including the empty string —
/// yields [`BackendType::Native`].
fn parse_backend_name(backend_name: &str) -> BackendType {
    let name = backend_name.trim();
    // `eq_ignore_ascii_case` compares in place, so no lowercased copy is allocated.
    let is = |alias: &str| name.eq_ignore_ascii_case(alias);

    if is("supabase") {
        BackendType::Supabase
    } else if is("postgrest") {
        BackendType::Postgrest
    } else if is("scylla") {
        BackendType::Scylla
    } else if is("neon") {
        BackendType::Neon
    } else if is("postgresql") || is("postgres") {
        BackendType::PostgreSQL
    } else {
        // Unknown names deliberately fall back to the native backend.
        BackendType::Native
    }
}
/// Reports whether a mutation is narrowed to specific rows.
///
/// A query counts as scoped when a row id is present or at least one
/// condition is supplied. Only presence is checked: the row id's contents
/// are not inspected, so `Some("")` is treated as scoped.
fn query_has_scope(row_id: Option<&str>, conditions: &[Condition]) -> bool {
    let targets_row = row_id.is_some();
    let has_filters = !conditions.is_empty();
    targets_row || has_filters
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Gateway base URL shared by the client-construction tests.
    const GATEWAY_URL: &str = "http://localhost:4052";

    /// Builds the gateway-routed client several tests start from.
    async fn reporting_client() -> AthenaClient {
        AthenaClient::new(GATEWAY_URL, "secret", "reporting")
            .await
            .expect("client should build without network")
    }

    #[test]
    fn parse_backend_name_maps_supported_aliases() {
        let cases = vec![
            ("supabase", BackendType::Supabase),
            ("postgrest", BackendType::Postgrest),
            ("scylla", BackendType::Scylla),
            ("neon", BackendType::Neon),
            ("postgresql", BackendType::PostgreSQL),
            ("postgres", BackendType::PostgreSQL),
            ("unknown", BackendType::Native),
        ];
        for (name, expected) in cases {
            assert_eq!(parse_backend_name(name), expected);
        }
    }

    #[tokio::test]
    async fn new_defaults_to_native_and_sets_client_routing() {
        let client = reporting_client().await;
        let config = client.config();
        assert_eq!(config.backend_type, BackendType::Native);
        assert_eq!(config.client_name.as_deref(), Some("reporting"));
        assert_eq!(config.connection.url, GATEWAY_URL);
    }

    #[tokio::test]
    async fn new_with_backend_name_resolves_backend_case_insensitive() {
        let client =
            AthenaClient::new_with_backend_name(GATEWAY_URL, "secret", "reporting", "NeOn")
                .await
                .expect("client should build without network");
        assert_eq!(client.config().backend_type, BackendType::Neon);
    }

    #[tokio::test]
    async fn gateway_routed_clients_use_postgrest_translation_for_crud() {
        let client = reporting_client().await;
        let fetch_users = GatewayRequest::fetch("users");
        let translated = client
            .translate_request(&fetch_users)
            .expect("translation should succeed");
        assert_eq!(translated.language, QueryLanguage::Postgrest);
    }

    #[tokio::test]
    async fn gateway_routes_expose_expected_endpoints() {
        let routes = reporting_client().await.gateway_routes();
        assert_eq!(routes.fetch, "http://localhost:4052/gateway/fetch");
        assert_eq!(routes.insert, "http://localhost:4052/gateway/insert");
        assert_eq!(routes.update, "http://localhost:4052/gateway/update");
        assert_eq!(routes.delete, "http://localhost:4052/gateway/delete");
        assert_eq!(routes.query, "http://localhost:4052/gateway/query");
        assert_eq!(routes.sql, "http://localhost:4052/gateway/sql");
    }

    #[test]
    fn query_has_scope_requires_row_id_or_conditions() {
        // Neither a row id nor a condition: unscoped.
        assert!(!query_has_scope(None, &[]));

        // A row id alone is enough.
        assert!(query_has_scope(Some("row_1"), &[]));

        // A single condition alone is enough.
        let active_only = Condition::new(
            "status",
            ConditionOperator::Eq,
            vec![serde_json::json!("active")],
        );
        assert!(query_has_scope(None, &[active_only]));
    }
}