use crate::client::AthenaClient;
use crate::client::backend::{BackendError, QueryResult};
use crate::client::gateway_api::{
GatewayDeleteRequest, GatewayFetchRequest, GatewayInsertRequest, GatewayRpcFilterOperator,
GatewayRpcRequest, GatewayUpdateRequest,
};
use serde_json::Value;
/// Sort direction attached to an ordering column.
///
/// Derives `PartialEq`, `Eq` and `Hash` in addition to `Debug`/`Clone`/`Copy`
/// so directions can be compared, asserted on in tests and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OrderDirection {
    /// Ascending order.
    Asc,
    /// Descending order.
    Desc,
}
/// A single filter entry: a column name, an operator and the operand values.
///
/// Instances are collected by the select/update/delete builders in this file
/// and handed to the gateway request types unchanged.
#[derive(Debug, Clone)]
pub struct Condition {
/// Name of the column the condition refers to.
pub column: String,
/// Operator to apply to `column`.
pub operator: ConditionOperator,
/// Operand values. The `where_eq`/`where_neq`/`where_gt`/`where_lt` helpers
/// in this file store exactly one value; `where_in` stores the caller's list
/// as given.
pub values: Vec<Value>,
}
/// Operator used by a [`Condition`].
///
/// Derives `PartialEq`, `Eq` and `Hash` in addition to `Debug`/`Clone`/`Copy`
/// so operators can be compared, asserted on in tests and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConditionOperator {
    /// Equal to the operand.
    Eq,
    /// Not equal to the operand.
    Neq,
    /// Greater than the operand.
    Gt,
    /// Less than the operand.
    Lt,
    /// Contained in the list of operands.
    In,
}
impl Condition {
pub fn new(column: impl Into<String>, operator: ConditionOperator, values: Vec<Value>) -> Self {
Self {
column: column.into(),
operator,
values,
}
}
}
/// Consuming builder that accumulates the parts of a fetch request and is
/// turned into a `GatewayFetchRequest` by `into_request`.
pub struct SelectBuilder<'a> {
/// Client the finished builder is handed to in `execute`.
client: &'a AthenaClient,
/// Name of the table to read from.
table: String,
/// Column names set through `columns()`; empty until then.
columns: Vec<String>,
/// Optional raw select string set through `raw_select()`.
/// NOTE(review): how this interacts with `columns` is decided by the
/// request consumer, which is not visible in this file.
raw_select: Option<String>,
/// Filter conditions, in the order they were added.
conditions: Vec<Condition>,
/// Ordering entries as (column, direction), in the order they were added.
order_by: Vec<(String, OrderDirection)>,
/// Optional limit value set through `limit()`.
limit: Option<usize>,
/// Optional offset value set through `offset()`.
offset: Option<usize>,
}
impl<'a> SelectBuilder<'a> {
pub(crate) fn new(client: &'a AthenaClient, table: impl Into<String>) -> Self {
Self {
client,
table: table.into(),
columns: Vec::new(),
raw_select: None,
conditions: Vec::new(),
order_by: Vec::new(),
limit: None,
offset: None,
}
}
pub fn columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.columns = columns.into_iter().map(|c| c.into()).collect();
self
}
pub fn raw_select(mut self, select: impl Into<String>) -> Self {
self.raw_select = Some(select.into());
self
}
fn add_condition(
mut self,
column: &str,
operator: ConditionOperator,
values: Vec<Value>,
) -> Self {
self.conditions
.push(Condition::new(column, operator, values));
self
}
pub fn where_condition(mut self, condition: Condition) -> Self {
self.conditions.push(condition);
self
}
pub fn where_conditions(mut self, conditions: impl IntoIterator<Item = Condition>) -> Self {
self.conditions.extend(conditions);
self
}
pub fn where_eq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Eq, vec![value.into()])
}
pub fn where_neq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Neq, vec![value.into()])
}
pub fn where_gt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Gt, vec![value.into()])
}
pub fn where_lt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Lt, vec![value.into()])
}
pub fn where_in(self, column: &str, values: Vec<Value>) -> Self {
self.add_condition(column, ConditionOperator::In, values)
}
pub fn order_by(mut self, column: &str, direction: OrderDirection) -> Self {
self.order_by.push((column.to_string(), direction));
self
}
pub fn limit(mut self, value: usize) -> Self {
self.limit = Some(value);
self
}
pub fn offset(mut self, value: usize) -> Self {
self.offset = Some(value);
self
}
pub async fn execute(self) -> Result<QueryResult, BackendError> {
self.client.execute_select(self).await
}
pub(crate) fn into_request(self) -> GatewayFetchRequest {
GatewayFetchRequest {
table: self.table,
columns: self.columns,
raw_select: self.raw_select,
conditions: self.conditions,
order_by: self.order_by,
limit: self.limit,
offset: self.offset,
}
}
}
/// Consuming builder for an insert; converted to a `GatewayInsertRequest`
/// by `into_request`.
pub struct InsertBuilder<'a> {
/// Client the finished builder is handed to in `execute`.
client: &'a AthenaClient,
/// Name of the table to insert into.
table: String,
/// JSON payload to insert; `Value::Null` until `payload()` is called.
payload: Value,
}
impl<'a> InsertBuilder<'a> {
pub(crate) fn new(client: &'a AthenaClient, table: impl Into<String>) -> Self {
Self {
client,
table: table.into(),
payload: Value::Null,
}
}
pub fn payload(mut self, payload: Value) -> Self {
self.payload = payload;
self
}
pub async fn execute(self) -> Result<QueryResult, BackendError> {
self.client.execute_insert(self).await
}
pub(crate) fn into_request(self) -> GatewayInsertRequest {
GatewayInsertRequest::new(self.table, self.payload)
}
}
/// Consuming builder for an update; converted to a `GatewayUpdateRequest`
/// by `into_request`.
pub struct UpdateBuilder<'a> {
/// Client the finished builder is handed to in `execute`.
client: &'a AthenaClient,
/// Name of the table to update.
table: String,
/// Optional row identifier, supplied to `new` or set through `row_id()`.
row_id: Option<String>,
/// Filter conditions, in the order they were added.
conditions: Vec<Condition>,
/// JSON payload with the new values; `Value::Null` until `payload()` is called.
payload: Value,
/// `false` by default; set to `true` by `unsafe_unfiltered()`.
/// NOTE(review): presumably lets the update run without any filter —
/// the enforcement lives outside this file, confirm there.
allow_unfiltered: bool,
}
impl<'a> UpdateBuilder<'a> {
pub(crate) fn new(
client: &'a AthenaClient,
table: impl Into<String>,
row_id: Option<String>,
) -> Self {
Self {
client,
table: table.into(),
row_id,
conditions: Vec::new(),
payload: Value::Null,
allow_unfiltered: false,
}
}
pub fn row_id(mut self, row_id: impl Into<String>) -> Self {
self.row_id = Some(row_id.into());
self
}
fn add_condition(
mut self,
column: &str,
operator: ConditionOperator,
values: Vec<Value>,
) -> Self {
self.conditions
.push(Condition::new(column, operator, values));
self
}
pub fn where_condition(mut self, condition: Condition) -> Self {
self.conditions.push(condition);
self
}
pub fn where_conditions(mut self, conditions: impl IntoIterator<Item = Condition>) -> Self {
self.conditions.extend(conditions);
self
}
pub fn where_eq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Eq, vec![value.into()])
}
pub fn where_neq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Neq, vec![value.into()])
}
pub fn where_gt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Gt, vec![value.into()])
}
pub fn where_lt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Lt, vec![value.into()])
}
pub fn where_in(self, column: &str, values: Vec<Value>) -> Self {
self.add_condition(column, ConditionOperator::In, values)
}
pub fn payload(mut self, payload: Value) -> Self {
self.payload = payload;
self
}
pub fn unsafe_unfiltered(mut self) -> Self {
self.allow_unfiltered = true;
self
}
pub async fn execute(self) -> Result<QueryResult, BackendError> {
self.client.execute_update(self).await
}
pub(crate) fn into_request(self) -> GatewayUpdateRequest {
let mut request = GatewayUpdateRequest::new(self.table, self.payload);
if let Some(row_id) = self.row_id {
request = request.row_id(row_id);
}
request = request.where_conditions(self.conditions);
if self.allow_unfiltered {
request = request.unsafe_unfiltered();
}
request
}
}
/// Consuming builder for a delete; converted to a `GatewayDeleteRequest`
/// by `into_request`.
pub struct DeleteBuilder<'a> {
/// Client the finished builder is handed to in `execute`.
client: &'a AthenaClient,
/// Name of the table to delete from.
table: String,
/// Optional row identifier, supplied to `new` or set through `row_id()`.
row_id: Option<String>,
/// Filter conditions, in the order they were added.
conditions: Vec<Condition>,
/// `false` by default; set to `true` by `unsafe_unfiltered()`.
/// NOTE(review): presumably lets the delete run without any filter —
/// the enforcement lives outside this file, confirm there.
allow_unfiltered: bool,
}
impl<'a> DeleteBuilder<'a> {
pub(crate) fn new(
client: &'a AthenaClient,
table: impl Into<String>,
row_id: Option<String>,
) -> Self {
Self {
client,
table: table.into(),
row_id,
conditions: Vec::new(),
allow_unfiltered: false,
}
}
pub fn row_id(mut self, row_id: impl Into<String>) -> Self {
self.row_id = Some(row_id.into());
self
}
fn add_condition(
mut self,
column: &str,
operator: ConditionOperator,
values: Vec<Value>,
) -> Self {
self.conditions
.push(Condition::new(column, operator, values));
self
}
pub fn where_condition(mut self, condition: Condition) -> Self {
self.conditions.push(condition);
self
}
pub fn where_conditions(mut self, conditions: impl IntoIterator<Item = Condition>) -> Self {
self.conditions.extend(conditions);
self
}
pub fn where_eq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Eq, vec![value.into()])
}
pub fn where_neq(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Neq, vec![value.into()])
}
pub fn where_gt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Gt, vec![value.into()])
}
pub fn where_lt(self, column: &str, value: impl Into<Value>) -> Self {
self.add_condition(column, ConditionOperator::Lt, vec![value.into()])
}
pub fn where_in(self, column: &str, values: Vec<Value>) -> Self {
self.add_condition(column, ConditionOperator::In, values)
}
pub fn unsafe_unfiltered(mut self) -> Self {
self.allow_unfiltered = true;
self
}
pub async fn execute(self) -> Result<QueryResult, BackendError> {
self.client.execute_delete(self).await
}
pub(crate) fn into_request(self) -> GatewayDeleteRequest {
GatewayDeleteRequest {
table: self.table,
row_id: self.row_id,
conditions: self.conditions,
allow_unfiltered: self.allow_unfiltered,
}
}
}
/// Consuming builder for an RPC call. It wraps a `GatewayRpcRequest`, and the
/// builder methods in this file forward to the request's own builder methods.
pub struct RpcBuilder<'a> {
/// Client the finished builder is handed to in `execute`.
client: &'a AthenaClient,
/// Request under construction; returned as-is by `into_request`.
request: GatewayRpcRequest,
}
impl<'a> RpcBuilder<'a> {
pub(crate) fn new(client: &'a AthenaClient, function: impl Into<String>, args: Value) -> Self {
Self {
client,
request: GatewayRpcRequest::new(function, args),
}
}
pub fn schema(mut self, schema: impl Into<String>) -> Self {
self.request = self.request.schema(schema);
self
}
pub fn select(mut self, select: impl Into<String>) -> Self {
self.request = self.request.select(select);
self
}
fn add_filter(
mut self,
column: impl Into<String>,
operator: GatewayRpcFilterOperator,
value: Value,
) -> Self {
self.request = self.request.filter(column, operator, value);
self
}
pub fn eq(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Eq, value.into())
}
pub fn neq(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Neq, value.into())
}
pub fn gt(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Gt, value.into())
}
pub fn gte(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Gte, value.into())
}
pub fn lt(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Lt, value.into())
}
pub fn lte(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Lte, value.into())
}
pub fn in_(self, column: impl Into<String>, values: Vec<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::In, Value::Array(values))
}
pub fn like(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Like, value.into())
}
pub fn ilike(self, column: impl Into<String>, value: impl Into<Value>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::ILike, value.into())
}
pub fn is_null(self, column: impl Into<String>) -> Self {
self.add_filter(column, GatewayRpcFilterOperator::Is, Value::Null)
}
pub fn order(mut self, column: impl Into<String>, direction: OrderDirection) -> Self {
self.request = self.request.order_by(column, direction);
self
}
pub fn limit(mut self, value: usize) -> Self {
self.request = self.request.limit(value);
self
}
pub fn offset(mut self, value: usize) -> Self {
self.request = self.request.offset(value);
self
}
pub fn count_exact(mut self) -> Self {
self.request = self.request.count_exact();
self
}
pub async fn execute(self) -> Result<QueryResult, BackendError> {
self.client.execute_rpc(self).await
}
pub(crate) fn into_request(self) -> GatewayRpcRequest {
self.request
}
}