use futures::stream::{BoxStream, StreamExt};
use log::{debug, LevelFilter};
use rorm_declaration::config::DatabaseDriver;
use rorm_sql::delete::Delete;
use rorm_sql::insert::Insert;
use rorm_sql::join_table::JoinTableData;
use rorm_sql::limit_clause::LimitClause;
use rorm_sql::ordering::OrderByEntry;
use rorm_sql::select::Select;
use rorm_sql::select_column::SelectColumnData;
use rorm_sql::update::Update;
use rorm_sql::value::Value;
use rorm_sql::{conditional, value, DBImpl};
use crate::error::Error;
use crate::executor::{AffectedRows, All, Executor, Nothing, One, Optional, QueryStrategy, Stream};
use crate::internal;
use crate::query_type::GetLimitClause;
use crate::row::Row;
use crate::transaction::Transaction;
/// Alias for [`SelectColumnData`] from `rorm_sql`.
///
/// This is the element type of the `columns` argument taken by the query
/// methods on [`Database`].
pub type ColumnSelector<'a> = SelectColumnData<'a>;
/// Alias for [`JoinTableData`] from `rorm_sql`.
///
/// This is the element type of the `joins` argument taken by the query
/// methods on [`Database`]. The two lifetimes are passed through unchanged.
pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
/// Settings consumed by [`Database::connect`].
///
/// The type derives `serde::Serialize` and `serde::Deserialize`, so the field
/// names below are part of its serialized form.
///
/// [`DatabaseConfiguration::new`] fills every field except `driver` with a
/// default value.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DatabaseConfiguration {
/// Driver selection, a [`DatabaseDriver`] from `rorm_declaration`.
pub driver: DatabaseDriver,
/// Lower connection bound; `new` sets it to `1`.
/// NOTE(review): presumably the number of connections kept open — the consumer is `internal::database::connect`, confirm there.
pub min_connections: u32,
/// Upper connection bound; `new` sets it to `10`.
/// NOTE(review): presumably the maximum number of open connections — confirm in `internal::database::connect`.
pub max_connections: u32,
/// Optional switch for logging; `new` leaves it as `None`.
/// NOTE(review): how `None` is interpreted is decided by the consumer, not shown here.
pub disable_logging: Option<bool>,
/// Optional log level for statements; `new` sets it to `Some(LevelFilter::Debug)`.
pub statement_log_level: Option<LevelFilter>,
/// Optional log level for slow statements; `new` sets it to `Some(LevelFilter::Warn)`.
/// NOTE(review): the threshold that makes a statement "slow" is not defined in this file.
pub slow_statement_log_level: Option<LevelFilter>,
}
impl DatabaseConfiguration {
    /// Builds a configuration for `driver`, filling all other fields with defaults.
    ///
    /// Defaults:
    /// - `min_connections`: `1`
    /// - `max_connections`: `10`
    /// - `disable_logging`: `None`
    /// - `statement_log_level`: `Some(LevelFilter::Debug)`
    /// - `slow_statement_log_level`: `Some(LevelFilter::Warn)`
    pub fn new(driver: DatabaseDriver) -> Self {
        let statement_log_level = Some(LevelFilter::Debug);
        let slow_statement_log_level = Some(LevelFilter::Warn);

        Self {
            driver,
            min_connections: 1,
            max_connections: 10,
            disable_logging: None,
            statement_log_level,
            slow_statement_log_level,
        }
    }
}
/// Handle through which the query, insert, update and delete methods of this
/// module build and run their statements.
///
/// The type derives `Clone`, which clones both fields.
/// NOTE(review): whether a clone shares the underlying connections depends on
/// `internal::database::Impl` — confirm there.
#[derive(Clone)]
pub struct Database {
/// Backend handle used to execute statements.
/// NOTE(review): the field name suggests a connection pool — confirm in `internal::database`.
pub(crate) pool: internal::database::Impl,
/// SQL builder entry point; the `select`, `insert`, `update` and `delete`
/// statements of this module are all created through it.
pub(crate) db_impl: DBImpl,
}
impl Database {
pub fn get_sql_dialect(&self) -> DBImpl {
self.db_impl
}
/// Creates a [`Database`] from `configuration`.
///
/// Thin wrapper: the work is done by `internal::database::connect`, whose
/// result is returned unchanged.
///
/// # Errors
///
/// Returns whatever [`Error`] `internal::database::connect` reports.
pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
    let connecting = internal::database::connect(configuration);
    connecting.await
}
/// Runs a select statement and returns its rows as a boxed stream.
///
/// Deprecated convenience wrapper: all arguments are forwarded to
/// [`Database::query`] with the [`Stream`] strategy and the result is boxed.
#[deprecated(
    note = "Will be removed to reduce number of methods to maintain\nUse `query::<Stream>` instead!"
)]
#[allow(clippy::too_many_arguments)]
pub fn query_stream<'db, 'post_query, 'stream>(
    &'db self,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, 'post_query>],
    conditions: Option<&conditional::Condition<'post_query>>,
    order_by_clause: &[OrderByEntry<'_>],
    limit: Option<LimitClause>,
    transaction: Option<&'stream mut Transaction<'_>>,
) -> BoxStream<'stream, Result<Row, Error>>
where
    'post_query: 'stream,
    'db: 'stream,
{
    let rows = self.query::<Stream>(
        model,
        columns,
        joins,
        conditions,
        order_by_clause,
        limit,
        transaction,
    );
    rows.boxed()
}
/// Runs a select statement and returns a single row.
///
/// Deprecated convenience wrapper: all arguments are forwarded to
/// [`Database::query`] with the [`One`] strategy; `offset` is passed in the
/// position of `query`'s `limit` argument.
///
/// # Errors
///
/// Returns the [`Error`] produced by the underlying query.
#[deprecated(
    note = "Will be removed to reduce number of methods to maintain\nUse `query::<One>` instead!"
)]
#[allow(clippy::too_many_arguments)]
pub async fn query_one(
    &self,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, '_>],
    conditions: Option<&conditional::Condition<'_>>,
    order_by_clause: &[OrderByEntry<'_>],
    offset: Option<u64>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<Row, Error> {
    let pending = self.query::<One>(
        model,
        columns,
        joins,
        conditions,
        order_by_clause,
        offset,
        transaction,
    );
    pending.await
}
/// Runs a select statement and returns at most one row.
///
/// Deprecated convenience wrapper: all arguments are forwarded to
/// [`Database::query`] with the [`Optional`] strategy; `offset` is passed in
/// the position of `query`'s `limit` argument.
///
/// # Errors
///
/// Returns the [`Error`] produced by the underlying query.
#[deprecated(
    note = "Will be removed to reduce number of methods to maintain\nUse `query::<Optional>` instead!"
)]
#[allow(clippy::too_many_arguments)]
pub async fn query_optional(
    &self,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, '_>],
    conditions: Option<&conditional::Condition<'_>>,
    order_by_clause: &[OrderByEntry<'_>],
    offset: Option<u64>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<Option<Row>, Error> {
    let pending = self.query::<Optional>(
        model,
        columns,
        joins,
        conditions,
        order_by_clause,
        offset,
        transaction,
    );
    pending.await
}
/// Runs a select statement and collects all rows into a `Vec`.
///
/// Deprecated convenience wrapper: all arguments are forwarded to
/// [`Database::query`] with the [`All`] strategy.
///
/// # Errors
///
/// Returns the [`Error`] produced by the underlying query.
#[deprecated(
    note = "Will be removed to reduce number of methods to maintain\nUse `query::<All>` instead!"
)]
#[allow(clippy::too_many_arguments)]
pub async fn query_all(
    &self,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, '_>],
    conditions: Option<&conditional::Condition<'_>>,
    order_by_clause: &[OrderByEntry<'_>],
    limit: Option<LimitClause>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<Vec<Row>, Error> {
    let pending = self.query::<All>(
        model,
        columns,
        joins,
        conditions,
        order_by_clause,
        limit,
        transaction,
    );
    pending.await
}
/// Builds a select statement and executes it with the strategy `Q`.
///
/// Steps:
/// 1. every entry of `columns` and `joins` is converted through the SQL builder,
/// 2. a select on `model` is created with `order_by_clause`,
/// 3. `conditions` is attached as where clause when present,
/// 4. `limit` is translated by `Q::get_limit_clause` and attached when that
///    yields a clause,
/// 5. the built statement is logged at debug level and executed.
///
/// The statement runs on `transaction` when one is given, otherwise directly
/// on this database. The return type is whatever `Q` defines as its result.
// The argument list mirrors the parts of a select statement.
#[allow(clippy::too_many_arguments)]
pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
    &'db self,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, 'post_query>],
    conditions: Option<&conditional::Condition<'post_query>>,
    order_by_clause: &[OrderByEntry<'_>],
    limit: Option<Q::LimitOrOffset>,
    transaction: Option<&'db mut Transaction<'_>>,
) -> Q::Result<'result> {
    let mut select_columns = Vec::with_capacity(columns.len());
    for column in columns {
        select_columns.push(self.db_impl.select_column(
            column.table_name,
            column.column_name,
            column.select_alias,
            column.aggregation,
        ));
    }

    let mut join_tables = Vec::with_capacity(joins.len());
    for join in joins {
        join_tables.push(self.db_impl.join_table(
            join.join_type,
            join.table_name,
            join.join_alias,
            join.join_condition,
        ));
    }

    let statement = self
        .db_impl
        .select(&select_columns, model, &join_tables, order_by_clause);
    let statement = match conditions {
        Some(condition) => statement.where_clause(condition),
        None => statement,
    };
    let statement = match Q::get_limit_clause(limit) {
        Some(clause) => statement.limit_clause(clause),
        None => statement,
    };

    let (query_string, bind_params) = statement.build();
    debug!("SQL: {}", query_string);

    if let Some(tx) = transaction {
        tx.execute::<Q>(query_string, bind_params)
    } else {
        self.execute::<Q>(query_string, bind_params)
    }
}
pub async fn insert_returning(
&self,
model: &str,
columns: &[&str],
values: &[Value<'_>],
transaction: Option<&mut Transaction<'_>>,
returning: &[&str],
) -> Result<Row, Error> {
Self::generic_insert::<One>(self, model, columns, values, transaction, Some(returning))
.await
}
pub async fn insert(
&self,
model: &str,
columns: &[&str],
values: &[Value<'_>],
transaction: Option<&mut Transaction<'_>>,
) -> Result<(), Error> {
Self::generic_insert::<Nothing>(self, model, columns, values, transaction, None).await
}
/// Shared implementation of the single-row inserts, generic over the strategy `Q`.
///
/// `values` is wrapped into a one-element list of rows, an insert statement
/// for `model` is built from it (with `returning` passed through), logged at
/// debug level and executed on `transaction` when given, otherwise on `db`.
pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
    db: &'db Database,
    model: &str,
    columns: &[&str],
    values: &[Value<'post_query>],
    transaction: Option<&'db mut Transaction<'_>>,
    returning: Option<&[&str]>,
) -> Q::Result<'result> {
    // The builder takes a list of rows; a single insert is a list of length one.
    let single_row = [values];
    let statement = db.db_impl.insert(model, columns, &single_row, returning);

    let (query_string, bind_params): (_, Vec<Value<'post_query>>) = statement.build();
    debug!("SQL: {}", query_string);

    if let Some(tx) = transaction {
        tx.execute::<Q>(query_string, bind_params)
    } else {
        db.execute::<Q>(query_string, bind_params)
    }
}
/// Inserts several rows into `model`.
///
/// The rows are written in chunks of up to 25 rows, one insert statement per
/// chunk, each with `rollback_transaction()` enabled on the builder.
///
/// When `transaction` is `None`, a new transaction is started and committed
/// after every chunk succeeded. When a transaction is passed in, it is used
/// as is and not committed here.
///
/// # Errors
///
/// Returns the first [`Error`] raised while starting the transaction,
/// executing a chunk, or committing.
pub async fn insert_bulk(
    &self,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
    transaction: Option<&mut Transaction<'_>>,
) -> Result<(), Error> {
    /// Executes the chunked inserts on `tx`, stopping at the first failure.
    async fn insert_chunks(
        db: &Database,
        tx: &mut Transaction<'_>,
        model: &str,
        columns: &[&str],
        rows: &[&[Value<'_>]],
    ) -> Result<(), Error> {
        for batch in rows.chunks(25) {
            let statement = db
                .db_impl
                .insert(model, columns, batch, None)
                .rollback_transaction();
            let (insert_query, insert_params) = statement.build();
            debug!("SQL: {}", insert_query);
            tx.execute::<Nothing>(insert_query, insert_params).await?;
        }
        Ok(())
    }

    if let Some(tx) = transaction {
        insert_chunks(self, tx, model, columns, rows).await
    } else {
        let mut own_tx = self.start_transaction().await?;
        insert_chunks(self, &mut own_tx, model, columns, rows).await?;
        own_tx.commit().await
    }
}
/// Inserts several rows into `model` and returns the `returning` columns of
/// every inserted row.
///
/// The rows are written in chunks of up to 25 rows, one insert statement per
/// chunk, each with `rollback_transaction()` enabled on the builder. The
/// returned rows of all chunks are concatenated in chunk order.
///
/// When `transaction` is `None`, a new transaction is started and committed
/// only after every chunk succeeded. When a transaction is passed in, it is
/// used as is and not committed here.
///
/// # Errors
///
/// Returns the first [`Error`] raised while starting the transaction,
/// executing a chunk, or committing. If a chunk fails, the internally started
/// transaction is not committed and the chunk's error is returned.
pub async fn insert_bulk_returning(
    &self,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
    transaction: Option<&mut Transaction<'_>>,
    returning: &[&str],
) -> Result<Vec<Row>, Error> {
    /// Executes the chunked inserts on `tx` and gathers the returned rows.
    async fn with_transaction(
        db: &Database,
        tx: &mut Transaction<'_>,
        model: &str,
        columns: &[&str],
        rows: &[&[Value<'_>]],
        returning: &[&str],
    ) -> Result<Vec<Row>, Error> {
        let mut inserted = Vec::with_capacity(rows.len());
        for chunk in rows.chunks(25) {
            let mut insert = db.db_impl.insert(model, columns, chunk, Some(returning));
            insert = insert.rollback_transaction();
            let (insert_query, insert_params) = insert.build();
            debug!("SQL: {}", insert_query);
            inserted.extend(tx.execute::<All>(insert_query, insert_params).await?);
        }
        Ok(inserted)
    }

    match transaction {
        None => {
            let mut transaction = self.start_transaction().await?;
            // Propagate a failed chunk BEFORE committing, as `insert_bulk` does.
            // Committing first could persist the chunks that already succeeded
            // and could replace the insert error with a commit error.
            // NOTE(review): relies on `Transaction` rolling back when dropped
            // without `commit` — confirm in `crate::transaction`.
            let inserted =
                with_transaction(self, &mut transaction, model, columns, rows, returning).await?;
            transaction.commit().await?;
            Ok(inserted)
        }
        Some(transaction) => {
            with_transaction(self, transaction, model, columns, rows, returning).await
        }
    }
}
/// Deletes rows from `model`.
///
/// `condition` is attached as where clause when present; with `None` no
/// where clause is added to the statement. The built statement is logged at
/// debug level and executed on `transaction` when given, otherwise directly
/// on this database.
///
/// # Errors
///
/// Returns the [`Error`] produced while executing the statement.
///
/// On success the `u64` produced by the [`AffectedRows`] strategy is returned.
pub async fn delete<'post_build>(
    &self,
    model: &str,
    condition: Option<&conditional::Condition<'post_build>>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<u64, Error> {
    let mut q = self.db_impl.delete(model);
    if let Some(condition) = condition {
        q = q.where_clause(condition);
    }

    let (query_string, bind_params) = q.build();
    debug!("SQL: {}", query_string);

    match transaction {
        None => {
            self.execute::<AffectedRows>(query_string, bind_params)
                .await
        }
        Some(transaction) => {
            transaction
                .execute::<AffectedRows>(query_string, bind_params)
                .await
        }
    }
}
/// Updates rows of `model`.
///
/// Every `(column, value)` pair of `updates` is added to the statement in
/// order, and `condition` is attached as where clause when present. The built
/// statement is logged at debug level and executed on `transaction` when
/// given, otherwise directly on this database.
///
/// # Errors
///
/// Returns an [`Error`] when building the statement fails or when executing
/// it fails.
///
/// On success the `u64` produced by the [`AffectedRows`] strategy is returned.
pub async fn update<'post_build>(
    &self,
    model: &str,
    updates: &[(&str, Value<'post_build>)],
    condition: Option<&conditional::Condition<'post_build>>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<u64, Error> {
    let mut statement = self.db_impl.update(model);
    for &(column, value) in updates {
        statement = statement.add_update(column, value);
    }
    let statement = match condition {
        Some(cond) => statement.where_clause(cond),
        None => statement,
    };

    let (query_string, bind_params) = statement.build()?;
    debug!("SQL: {}", query_string);

    if let Some(tx) = transaction {
        tx.execute::<AffectedRows>(query_string, bind_params).await
    } else {
        self.execute::<AffectedRows>(query_string, bind_params)
            .await
    }
}
/// Executes a caller-supplied SQL string and returns the resulting rows.
///
/// Thin wrapper: `query_string`, `bind_params` and `transaction` are handed
/// to `internal::database::raw_sql` unchanged, and its result is returned.
///
/// # Errors
///
/// Returns whatever [`Error`] `internal::database::raw_sql` reports.
pub async fn raw_sql<'a>(
    &self,
    query_string: &'a str,
    bind_params: Option<&[value::Value<'a>]>,
    transaction: Option<&mut Transaction<'_>>,
) -> Result<Vec<Row>, Error> {
    let pending = internal::database::raw_sql(self, query_string, bind_params, transaction);
    pending.await
}
pub async fn start_transaction(&self) -> Result<Transaction, Error> {
internal::database::start_transaction(self).await
}
}