use log::{debug, LevelFilter};
use rorm_declaration::config::DatabaseDriver;
use rorm_sql::delete::Delete;
use rorm_sql::insert::Insert;
use rorm_sql::join_table::JoinTableData;
use rorm_sql::ordering::OrderByEntry;
use rorm_sql::select::Select;
use rorm_sql::select_column::SelectColumnData;
use rorm_sql::update::Update;
use rorm_sql::value::Value;
use rorm_sql::{conditional, value, DBImpl};
use crate::error::Error;
use crate::executor::{AffectedRows, All, Executor, Nothing, One, QueryStrategy};
use crate::internal;
use crate::query_type::GetLimitClause;
use crate::row::Row;
use crate::transaction::Transaction;
pub type ColumnSelector<'a> = SelectColumnData<'a>;
pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DatabaseConfiguration {
pub driver: DatabaseDriver,
pub min_connections: u32,
pub max_connections: u32,
pub disable_logging: Option<bool>,
pub statement_log_level: Option<LevelFilter>,
pub slow_statement_log_level: Option<LevelFilter>,
}
impl DatabaseConfiguration {
pub fn new(driver: DatabaseDriver) -> Self {
DatabaseConfiguration {
driver,
min_connections: 1,
max_connections: 10,
disable_logging: None,
statement_log_level: Some(LevelFilter::Debug),
slow_statement_log_level: Some(LevelFilter::Warn),
}
}
}
#[derive(Clone)]
pub struct Database {
pub(crate) pool: internal::database::Impl,
pub(crate) db_impl: DBImpl,
}
impl Database {
pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
internal::database::connect(configuration).await
}
pub async fn raw_sql<'a>(
&self,
query_string: &'a str,
bind_params: Option<&[value::Value<'a>]>,
transaction: Option<&mut Transaction>,
) -> Result<Vec<Row>, Error> {
internal::database::raw_sql(self, query_string, bind_params, transaction).await
}
pub async fn start_transaction(&self) -> Result<Transaction, Error> {
internal::database::start_transaction(self).await
}
}
#[allow(clippy::too_many_arguments)]
pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
executor: impl Executor<'db>,
model: &str,
columns: &[ColumnSelector<'_>],
joins: &[JoinTable<'_, 'post_query>],
conditions: Option<&conditional::Condition<'post_query>>,
order_by_clause: &[OrderByEntry<'_>],
limit: Option<Q::LimitOrOffset>,
) -> Q::Result<'result> {
let columns: Vec<_> = columns
.iter()
.map(|c| {
executor.dialect().select_column(
c.table_name,
c.column_name,
c.select_alias,
c.aggregation,
)
})
.collect();
let joins: Vec<_> = joins
.iter()
.map(|j| {
executor
.dialect()
.join_table(j.join_type, j.table_name, j.join_alias, j.join_condition)
})
.collect();
let mut q = executor
.dialect()
.select(&columns, model, &joins, order_by_clause);
if let Some(condition) = conditions {
q = q.where_clause(condition);
}
if let Some(limit) = Q::get_limit_clause(limit) {
q = q.limit_clause(limit);
}
let (query_string, bind_params) = q.build();
debug!("SQL: {}", query_string);
executor.execute::<Q>(query_string, bind_params)
}
pub async fn insert_returning(
executor: impl Executor<'_>,
model: &str,
columns: &[&str],
values: &[Value<'_>],
returning: &[&str],
) -> Result<Row, Error> {
generic_insert::<One>(executor, model, columns, values, Some(returning)).await
}
pub async fn insert(
executor: impl Executor<'_>,
model: &str,
columns: &[&str],
values: &[Value<'_>],
) -> Result<(), Error> {
generic_insert::<Nothing>(executor, model, columns, values, None).await
}
pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
executor: impl Executor<'db>,
model: &str,
columns: &[&str],
values: &[Value<'post_query>],
returning: Option<&[&str]>,
) -> Q::Result<'result> {
let values = &[values];
let q = executor.dialect().insert(model, columns, values, returning);
let (query_string, bind_params): (_, Vec<Value<'post_query>>) = q.build();
debug!("SQL: {}", query_string);
executor.execute::<Q>(query_string, bind_params)
}
pub async fn insert_bulk(
executor: impl Executor<'_>,
model: &str,
columns: &[&str],
rows: &[&[Value<'_>]],
) -> Result<(), Error> {
let mut guard = executor.ensure_transaction().await?;
let tr: &mut Transaction = guard.get_transaction();
for chunk in rows.chunks(25) {
let mut insert = tr.dialect().insert(model, columns, chunk, None);
insert = insert.rollback_transaction();
let (insert_query, insert_params) = insert.build();
debug!("SQL: {}", insert_query);
tr.execute::<Nothing>(insert_query, insert_params).await?;
}
guard.commit().await?;
Ok(())
}
pub async fn insert_bulk_returning(
executor: impl Executor<'_>,
model: &str,
columns: &[&str],
rows: &[&[Value<'_>]],
returning: &[&str],
) -> Result<Vec<Row>, Error> {
let mut guard = executor.ensure_transaction().await?;
let tr: &mut Transaction = guard.get_transaction();
let mut inserted = Vec::with_capacity(rows.len());
for chunk in rows.chunks(25) {
let mut insert = tr.dialect().insert(model, columns, chunk, Some(returning));
insert = insert.rollback_transaction();
let (insert_query, insert_params) = insert.build();
debug!("SQL: {}", insert_query);
inserted.extend(tr.execute::<All>(insert_query, insert_params).await?);
}
guard.commit().await?;
Ok(inserted)
}
pub async fn delete<'post_build>(
executor: impl Executor<'_>,
model: &str,
condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
let mut q = executor.dialect().delete(model);
if condition.is_some() {
q = q.where_clause(condition.unwrap());
}
let (query_string, bind_params) = q.build();
debug!("SQL: {}", query_string);
executor
.execute::<AffectedRows>(query_string, bind_params)
.await
}
pub async fn update<'post_build>(
executor: impl Executor<'_>,
model: &str,
updates: &[(&str, Value<'post_build>)],
condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
let mut stmt = executor.dialect().update(model);
for (column, value) in updates {
stmt = stmt.add_update(column, *value);
}
if let Some(cond) = condition {
stmt = stmt.where_clause(cond);
}
let (query_string, bind_params) = stmt.build()?;
debug!("SQL: {}", query_string);
executor
.execute::<AffectedRows>(query_string, bind_params)
.await
}
#[cfg(test)]
mod test {
use futures::future::BoxFuture;
use crate::{database, Database, Error, Row};
#[allow(unused)]
fn should_compile(db: &'static Database) {
let fut = database::insert_bulk_returning(db, "", &[], &[], &[]);
let fut: BoxFuture<'_, Result<Vec<Row>, Error>> = Box::pin(fut);
drop(fut);
}
}