use std::sync::Arc;
use rorm_declaration::config::DatabaseDriver;
use rorm_sql::delete::Delete;
use rorm_sql::insert::Insert;
use rorm_sql::join_table::JoinTableData;
use rorm_sql::ordering::OrderByEntry;
use rorm_sql::select::Select;
use rorm_sql::select_column::SelectColumnData;
use rorm_sql::update::Update;
use rorm_sql::value::Value;
use rorm_sql::{conditional, value};
use tracing::warn;
use crate::error::Error;
use crate::executor::{AffectedRows, All, Executor, Nothing, One, QueryStrategy};
use crate::internal;
use crate::query_type::GetLimitClause;
use crate::row::Row;
use crate::transaction::Transaction;
/// Alias for [`SelectColumnData`]: one selected column, carrying the table name,
/// column name, select alias and aggregation that [`query`] forwards to the dialect.
pub type ColumnSelector<'a> = SelectColumnData<'a>;
/// Alias for [`JoinTableData`]: one join, carrying the join type, table name,
/// join alias and join condition that [`query`] forwards to the dialect.
pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
/// Configuration consumed by [`Database::connect`].
///
/// Use [`DatabaseConfiguration::new`] to get a value with default connection limits.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DatabaseConfiguration {
/// Database driver to connect with.
pub driver: DatabaseDriver,
/// Set to 1 by [`DatabaseConfiguration::new`].
/// NOTE(review): presumably the minimum number of pooled connections — confirm against the internal connect code.
pub min_connections: u32,
/// Set to 10 by [`DatabaseConfiguration::new`].
/// NOTE(review): presumably the maximum number of pooled connections — confirm against the internal connect code.
pub max_connections: u32,
}
impl DatabaseConfiguration {
pub fn new(driver: DatabaseDriver) -> Self {
DatabaseConfiguration {
driver,
min_connections: 1,
max_connections: 10,
}
}
}
/// Cloneable handle to a database.
///
/// The first field is the crate-internal implementation. The second field is an
/// [`Arc`] shared by all clones of a handle; the `Drop` impl inspects its strong
/// count to recognise when the last clone is being dropped.
#[derive(Clone)]
pub struct Database(pub(crate) internal::database::Impl, Arc<()>);
impl Database {
pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
Ok(Self(
internal::database::connect(configuration).await?,
Arc::new(()),
))
}
#[deprecated = "Use `Executor::execute` instead"]
pub async fn raw_sql<'a>(
&self,
query_string: &'a str,
bind_params: Option<&[value::Value<'a>]>,
transaction: Option<&mut Transaction>,
) -> Result<Vec<Row>, Error> {
internal::database::raw_sql(self, query_string, bind_params, transaction).await
}
pub async fn start_transaction(&self) -> Result<Transaction, Error> {
internal::database::start_transaction(self).await
}
pub async fn close(self) {
internal::database::close(self).await
}
}
impl Drop for Database {
    /// Emits a warning when the last clone of a handle is dropped while the
    /// database has not been closed.
    fn drop(&mut self) {
        // `self.1` is shared by every clone of this handle, so a strong count of 1
        // means no other clone is alive any more.
        let is_last_handle = Arc::strong_count(&self.1) == 1;
        if is_last_handle && !internal::database::is_closed(self) {
            warn!("Database has been dropped without calling close. This might cause the last queries to not be flushed properly");
        }
    }
}
/// Builds a select statement on `model` and executes it with the strategy `Q`.
///
/// # Parameters
///
/// - `executor`: provides the SQL dialect and runs the built statement.
/// - `model`: passed to the dialect's `select` as the table to select from.
/// - `columns`: columns to select; each is converted with the dialect's `select_column`.
/// - `joins`: joins to apply; each is converted with the dialect's `join_table`.
/// - `conditions`: optional where clause; nothing is added when `None`.
/// - `order_by_clause`: ordering entries, forwarded unchanged.
/// - `limit`: converted by `Q::get_limit_clause`; a limit clause is only added
///   when that conversion yields `Some`.
///
/// # Returns
///
/// Whatever `executor.execute::<Q>` returns for the built query string and bind parameters.
// The argument count mirrors the parts of a select statement, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
    executor: impl Executor<'db>,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, 'post_query>],
    conditions: Option<&conditional::Condition<'post_query>>,
    order_by_clause: &[OrderByEntry<'_>],
    limit: Option<Q::LimitOrOffset>,
) -> Q::Result<'result> {
    // Translate the dialect-independent column descriptions.
    let mut select_columns = Vec::with_capacity(columns.len());
    for column in columns {
        select_columns.push(executor.dialect().select_column(
            column.table_name,
            column.column_name,
            column.select_alias,
            column.aggregation,
        ));
    }

    // Translate the dialect-independent join descriptions.
    let mut join_tables = Vec::with_capacity(joins.len());
    for join in joins {
        join_tables.push(executor.dialect().join_table(
            join.join_type,
            join.table_name,
            join.join_alias,
            join.join_condition.clone(),
        ));
    }

    let statement =
        executor
            .dialect()
            .select(&select_columns, model, &join_tables, order_by_clause);
    let statement = match conditions {
        Some(condition) => statement.where_clause(condition),
        None => statement,
    };
    let statement = match Q::get_limit_clause(limit) {
        Some(limit_clause) => statement.limit_clause(limit_clause),
        None => statement,
    };

    let (query_string, bind_params) = statement.build();
    executor.execute::<Q>(query_string, bind_params)
}
/// Inserts a single row into `model` and returns the requested columns of it.
///
/// `columns` and `values` are forwarded to the insert statement unchanged;
/// `returning` names the columns to return.
///
/// # Errors
///
/// Returns the [`Error`] produced while executing the statement with the [`One`] strategy.
pub async fn insert_returning(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    values: &[Value<'_>],
    returning: &[&str],
) -> Result<Row, Error> {
    let returning = Some(returning);
    generic_insert::<One>(executor, model, columns, values, returning).await
}
/// Inserts a single row into `model` without returning anything from it.
///
/// `columns` and `values` are forwarded to the insert statement unchanged.
///
/// # Errors
///
/// Returns the [`Error`] produced while executing the statement with the [`Nothing`] strategy.
pub async fn insert(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    values: &[Value<'_>],
) -> Result<(), Error> {
    let returning: Option<&[&str]> = None;
    generic_insert::<Nothing>(executor, model, columns, values, returning).await
}
/// Builds a single-row insert statement and executes it with the strategy `Q`.
///
/// Shared implementation behind [`insert`] and [`insert_returning`]: `returning`
/// is handed to the dialect's `insert` as-is, and the result is whatever
/// `executor.execute::<Q>` returns for the built statement.
pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
    executor: impl Executor<'db>,
    model: &str,
    columns: &[&str],
    values: &[Value<'post_query>],
    returning: Option<&[&str]>,
) -> Q::Result<'result> {
    // The dialect's `insert` takes a slice of rows, so wrap the one row in an array.
    // It is a named local (not an inline temporary) because the statement borrows it.
    let single_row = [values];
    let statement = executor
        .dialect()
        .insert(model, columns, &single_row, returning);
    let built: (_, Vec<Value<'post_query>>) = statement.build();
    let (query_string, bind_params) = built;
    executor.execute::<Q>(query_string, bind_params)
}
/// Inserts all `rows` into `model` inside a transaction, without returning anything.
///
/// The rows are split into batches and one insert statement is executed per batch.
/// The transaction obtained from `executor.ensure_transaction()` is committed
/// after the last batch.
///
/// # Errors
///
/// Returns the first [`Error`] raised while obtaining the transaction, executing
/// a batch, or committing; in the first two cases `commit` is not called.
pub async fn insert_bulk(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
) -> Result<(), Error> {
    // Upper bound on the number of rows put into one insert statement.
    const ROWS_PER_STATEMENT: usize = 25;

    let mut guard = executor.ensure_transaction().await?;
    let tr: &mut Transaction = guard.get_transaction();

    for batch in rows.chunks(ROWS_PER_STATEMENT) {
        // NOTE(review): `rollback_transaction` presumably asks the statement to roll
        // the transaction back on failure — confirm in rorm_sql.
        let statement = tr
            .dialect()
            .insert(model, columns, batch, None)
            .rollback_transaction();
        let (sql, params) = statement.build();
        tr.execute::<Nothing>(sql, params).await?;
    }

    guard.commit().await?;
    Ok(())
}
/// Inserts all `rows` into `model` inside a transaction and collects the
/// `returning` columns of every inserted row.
///
/// The rows are split into batches and one insert statement is executed per batch;
/// the rows returned by each batch are appended to the result in batch order.
/// The transaction obtained from `executor.ensure_transaction()` is committed
/// after the last batch.
///
/// # Errors
///
/// Returns the first [`Error`] raised while obtaining the transaction, executing
/// a batch, or committing; in the first two cases `commit` is not called.
pub async fn insert_bulk_returning(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
    returning: &[&str],
) -> Result<Vec<Row>, Error> {
    // Upper bound on the number of rows put into one insert statement.
    const ROWS_PER_STATEMENT: usize = 25;

    let mut guard = executor.ensure_transaction().await?;
    let tr: &mut Transaction = guard.get_transaction();

    let mut collected = Vec::with_capacity(rows.len());
    for batch in rows.chunks(ROWS_PER_STATEMENT) {
        // NOTE(review): `rollback_transaction` presumably asks the statement to roll
        // the transaction back on failure — confirm in rorm_sql.
        let statement = tr
            .dialect()
            .insert(model, columns, batch, Some(returning))
            .rollback_transaction();
        let (sql, params) = statement.build();
        let batch_rows = tr.execute::<All>(sql, params).await?;
        collected.extend(batch_rows);
    }

    guard.commit().await?;
    Ok(collected)
}
/// Builds a delete statement on `model` and executes it.
///
/// When `condition` is `Some`, it becomes the statement's where clause; when it is
/// `None`, the statement is built without a where clause.
///
/// # Returns
///
/// The `u64` produced by executing the statement with the [`AffectedRows`] strategy.
///
/// # Errors
///
/// Returns the [`Error`] produced while executing the statement.
pub async fn delete<'post_build>(
    executor: impl Executor<'_>,
    model: &str,
    condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
    let statement = executor.dialect().delete(model);
    let statement = match condition {
        Some(filter) => statement.where_clause(filter),
        None => statement,
    };

    let (query_string, bind_params) = statement.build();
    let affected = executor.execute::<AffectedRows>(query_string, bind_params);
    affected.await
}
/// Builds an update statement on `model` and executes it.
///
/// Every `(column, value)` pair in `updates` is added to the statement in order.
/// When `condition` is `Some`, it becomes the statement's where clause; when it is
/// `None`, the statement is built without a where clause.
///
/// # Returns
///
/// The `u64` produced by executing the statement with the [`AffectedRows`] strategy.
///
/// # Errors
///
/// Returns early with the error from building the statement, or the [`Error`]
/// produced while executing it.
pub async fn update<'post_build>(
    executor: impl Executor<'_>,
    model: &str,
    updates: &[(&str, Value<'post_build>)],
    condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
    let mut statement = executor.dialect().update(model);
    for &(column, value) in updates {
        statement = statement.add_update(column, value);
    }
    let statement = match condition {
        Some(filter) => statement.where_clause(filter),
        None => statement,
    };

    let (query_string, bind_params) = statement.build()?;
    let affected = executor.execute::<AffectedRows>(query_string, bind_params);
    affected.await
}