use std::sync::Arc;
use rorm_declaration::config::DatabaseDriver;
use rorm_sql::conditional;
use rorm_sql::delete::Delete;
use rorm_sql::insert::Insert;
use rorm_sql::join_table::JoinTableData;
use rorm_sql::ordering::OrderByEntry;
#[cfg(feature = "postgres-only")]
use rorm_sql::select::LockingClause;
use rorm_sql::select_column::SelectColumnData;
use rorm_sql::update::Update;
use rorm_sql::value::Value;
use tracing::warn;
use crate::error::Error;
use crate::executor::AffectedRows;
use crate::executor::All;
use crate::executor::Executor;
use crate::executor::Nothing;
use crate::executor::One;
use crate::executor::QueryStrategy;
use crate::internal::any::AnyPool;
use crate::query_type::GetLimitClause;
use crate::row::Row;
use crate::transaction::Transaction;
use crate::transaction::TransactionError;
/// Alias for [`SelectColumnData`]: the description of a single selected column, as accepted by [`query`].
pub type ColumnSelector<'a> = SelectColumnData<'a>;
/// Alias for [`JoinTableData`]: the description of a single join, as accepted by [`query`].
pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
/// Settings handed to [`Database::connect`].
///
/// [`DatabaseConfiguration::new`] fills in `1` and `10` for the two connection limits.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DatabaseConfiguration {
/// The database driver to connect with.
pub driver: DatabaseDriver,
/// NOTE(review): presumably the minimum number of connections kept by the pool —
/// the pool logic is not visible in this file; confirm against `AnyPool::connect`.
pub min_connections: u32,
/// NOTE(review): presumably the maximum number of connections the pool may open —
/// the pool logic is not visible in this file; confirm against `AnyPool::connect`.
pub max_connections: u32,
}
impl DatabaseConfiguration {
pub fn new(driver: DatabaseDriver) -> Self {
DatabaseConfiguration {
driver,
min_connections: 1,
max_connections: 10,
}
}
}
/// Handle to the database, wrapping an [`AnyPool`].
///
/// Deriving `Clone` clones both fields, so every copy of a `Database` shares the same `Arc<()>`.
/// That `Arc` carries no data: the `Drop` implementation reads its strong count to tell
/// whether the instance being dropped is the last one.
#[derive(Clone)]
pub struct Database(pub(crate) AnyPool, Arc<()>);
impl Database {
    /// Connects to the database described by `configuration`.
    ///
    /// # Errors
    /// Returns the error reported by `AnyPool::connect` if the pool cannot be created.
    pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
        let pool = AnyPool::connect(configuration).await?;
        // Fresh instance counter; it is shared by every clone of the returned handle.
        let instance_counter = Arc::new(());
        Ok(Self(pool, instance_counter))
    }

    /// Begins a new transaction on the underlying pool.
    ///
    /// # Errors
    /// Returns the error reported by the pool if the transaction cannot be started.
    pub async fn start_transaction(&self) -> Result<Transaction, Error> {
        let inner = self.0.begin().await?;
        Ok(Transaction::new(inner))
    }

    /// Closes the underlying pool, consuming this handle.
    ///
    /// The `Drop` implementation logs a warning when the last handle is dropped
    /// while the pool does not report itself as closed.
    pub async fn close(self) {
        self.0.close().await;
    }
}
impl Drop for Database {
    /// Warns if the last remaining handle is dropped while the pool is still open.
    fn drop(&mut self) {
        // The `Arc` is private and cloned together with the handle, so a strong
        // count of one means no other `Database` clone is alive.
        let is_last_handle = Arc::strong_count(&self.1) == 1;

        // `is_closed` is only consulted for the last handle.
        if !is_last_handle || self.0.is_closed() {
            return;
        }

        warn!("Database has been dropped without calling close. This might cause the last queries to not be flushed properly");
    }
}
/// Builds a select statement with the executor's dialect and executes it using strategy `Q`.
///
/// # Parameters
/// - `executor`: where the statement is executed; also provides the SQL dialect.
/// - `model`: table name passed to the dialect's `select`.
/// - `columns`: columns to select; each entry is rendered via the dialect's `select_column`.
/// - `joins`: joins to add; each entry is rendered via the dialect's `join_table`.
/// - `conditions`: optional condition applied through `where_clause`.
/// - `order_by_clause`: ordering entries passed to the dialect's `select`.
/// - `limit`: turned into an optional limit clause by `Q::get_limit_clause`.
/// - `distinct`: when `true`, `distinct()` is applied to the statement.
/// - `locking_clause` (only with the `postgres-only` feature): optional locking clause.
///
/// # Returns
/// Whatever `executor.execute::<Q>` yields for the built statement and its bind parameters.
// The argument list mirrors the parts of a select statement, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
    executor: impl Executor<'db>,
    model: &str,
    columns: &[ColumnSelector<'_>],
    joins: &[JoinTable<'_, 'post_query>],
    conditions: Option<&conditional::Condition<'post_query>>,
    order_by_clause: &[OrderByEntry<'_>],
    limit: Option<Q::LimitOrOffset>,
    distinct: bool,
    #[cfg(feature = "postgres-only")] locking_clause: Option<LockingClause>,
) -> Q::Result<'result> {
    // Translate the column descriptions into the dialect's representation.
    let column_data = columns
        .iter()
        .map(|column| {
            executor.dialect().select_column(
                column.table_name,
                column.column_name,
                column.select_alias,
                column.aggregation,
            )
        })
        .collect::<Vec<_>>();

    // Translate the join descriptions into the dialect's representation.
    let join_data = joins
        .iter()
        .map(|join| {
            executor.dialect().join_table(
                join.join_type,
                join.table_name,
                join.join_alias,
                join.join_condition.clone(),
            )
        })
        .collect::<Vec<_>>();

    let select = executor
        .dialect()
        .select(&column_data, model, &join_data, order_by_clause);

    // Apply the optional parts in the same order: where, limit, distinct, locking.
    let select = match conditions {
        Some(condition) => select.where_clause(condition),
        None => select,
    };
    let select = match Q::get_limit_clause(limit) {
        Some(limit_clause) => select.limit_clause(limit_clause),
        None => select,
    };
    let select = if distinct { select.distinct() } else { select };
    #[cfg(feature = "postgres-only")]
    let select = match locking_clause {
        Some(clause) => select.locking_clause(clause),
        None => select,
    };

    let (query_string, bind_params) = select.build();
    executor.execute::<Q>(query_string, bind_params)
}
/// Inserts a single row and returns the [`Row`] produced for the `returning` columns.
///
/// # Parameters
/// - `executor`: where the statement is executed.
/// - `model`: table to insert into.
/// - `columns`: column names, matched with `values`.
/// - `values`: values of the row to insert.
/// - `returning`: column names forwarded as the insert's returning clause.
///
/// # Errors
/// Returns the error reported by the executor.
pub async fn insert_returning(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    values: &[Value<'_>],
    returning: &[&str],
) -> Result<Row, Error> {
    // The `One` strategy yields exactly one row.
    let pending = generic_insert::<One>(executor, model, columns, values, Some(returning));
    pending.await
}
/// Inserts a single row without retrieving anything back.
///
/// # Parameters
/// - `executor`: where the statement is executed.
/// - `model`: table to insert into.
/// - `columns`: column names, matched with `values`.
/// - `values`: values of the row to insert.
///
/// # Errors
/// Returns the error reported by the executor.
pub async fn insert(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    values: &[Value<'_>],
) -> Result<(), Error> {
    // No returning clause is passed and the `Nothing` strategy discards any result.
    let pending = generic_insert::<Nothing>(executor, model, columns, values, None);
    pending.await
}
/// Shared implementation behind [`insert`] and [`insert_returning`].
///
/// Builds an insert statement for exactly one row with the executor's dialect and
/// executes it using the query strategy `Q`.
///
/// # Parameters
/// - `executor`: where the statement is executed; also provides the SQL dialect.
/// - `model`: table to insert into.
/// - `columns`: column names, matched with `values`.
/// - `values`: values of the single row to insert.
/// - `returning`: optional column names forwarded as the returning clause.
pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
    executor: impl Executor<'db>,
    model: &str,
    columns: &[&str],
    values: &[Value<'post_query>],
    returning: Option<&[&str]>,
) -> Q::Result<'result> {
    // The dialect's `insert` takes a slice of rows, so wrap the single row.
    let single_row = [values];
    let statement = executor
        .dialect()
        .insert(model, columns, &single_row, returning);
    let (query_string, bind_params): (_, Vec<Value<'post_query>>) = statement.build();
    executor.execute::<Q>(query_string, bind_params)
}
/// Inserts several rows without retrieving anything back.
///
/// The rows are sent in chunks of 25, one insert statement per chunk, inside the
/// transaction obtained from `executor.ensure_transaction()`. The transaction guard
/// is committed after the last chunk.
///
/// # Parameters
/// - `executor`: where the statements are executed.
/// - `model`: table to insert into.
/// - `columns`: column names, matched with each row's values.
/// - `rows`: the rows to insert.
///
/// # Errors
/// Returns the first error hit while obtaining the transaction, executing a chunk,
/// or committing.
///
/// # Panics
/// Panics if the commit reports a `TransactionError::Hook`, which is treated as unreachable.
pub async fn insert_bulk(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
) -> Result<(), Error> {
    let mut guard = executor.ensure_transaction().await?;
    let tx: &mut Transaction = guard.get_transaction();

    for batch in rows.chunks(25) {
        // NOTE(review): `rollback_transaction` presumably makes a failing insert roll
        // the transaction back — confirm against the rorm_sql `Insert` trait.
        let (sql, params) = tx
            .dialect()
            .insert(model, columns, batch, None)
            .rollback_transaction()
            .build();
        tx.execute::<Nothing>(sql, params).await?;
    }

    let committed = guard.commit().await;
    committed.map_err(|error| match error {
        TransactionError::Database(db_error) => db_error,
        TransactionError::Hook(_) => {
            unreachable!("Potentially create transaction does not use hooks")
        }
    })?;
    Ok(())
}
/// Inserts several rows and collects the rows produced for the `returning` columns.
///
/// The rows are sent in chunks of 25, one insert statement per chunk, inside the
/// transaction obtained from `executor.ensure_transaction()`. The results of all
/// chunks are appended in chunk order, and the transaction guard is committed
/// after the last chunk.
///
/// # Parameters
/// - `executor`: where the statements are executed.
/// - `model`: table to insert into.
/// - `columns`: column names, matched with each row's values.
/// - `rows`: the rows to insert.
/// - `returning`: column names forwarded as each insert's returning clause.
///
/// # Errors
/// Returns the first error hit while obtaining the transaction, executing a chunk,
/// or committing.
///
/// # Panics
/// Panics if the commit reports a `TransactionError::Hook`, which is treated as unreachable.
pub async fn insert_bulk_returning(
    executor: impl Executor<'_>,
    model: &str,
    columns: &[&str],
    rows: &[&[Value<'_>]],
    returning: &[&str],
) -> Result<Vec<Row>, Error> {
    let mut guard = executor.ensure_transaction().await?;
    let tx: &mut Transaction = guard.get_transaction();

    // Reserve one slot per input row up front.
    let mut collected = Vec::with_capacity(rows.len());
    for batch in rows.chunks(25) {
        // NOTE(review): `rollback_transaction` presumably makes a failing insert roll
        // the transaction back — confirm against the rorm_sql `Insert` trait.
        let (sql, params) = tx
            .dialect()
            .insert(model, columns, batch, Some(returning))
            .rollback_transaction()
            .build();
        let batch_rows = tx.execute::<All>(sql, params).await?;
        collected.extend(batch_rows);
    }

    let committed = guard.commit().await;
    committed.map_err(|error| match error {
        TransactionError::Database(db_error) => db_error,
        TransactionError::Hook(_) => {
            unreachable!("Potentially create transaction does not use hooks")
        }
    })?;
    Ok(collected)
}
/// Builds a delete statement for `model` and executes it.
///
/// # Parameters
/// - `executor`: where the statement is executed; also provides the SQL dialect.
/// - `model`: table to delete from.
/// - `condition`: optional condition applied through `where_clause`; with `None`
///   the statement is built without a where clause.
///
/// # Returns
/// The `u64` produced by the `AffectedRows` strategy.
///
/// # Errors
/// Returns the error reported by the executor.
pub async fn delete<'post_build>(
    executor: impl Executor<'_>,
    model: &str,
    condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
    let statement = executor.dialect().delete(model);
    let statement = match condition {
        Some(cond) => statement.where_clause(cond),
        None => statement,
    };

    let (query_string, bind_params) = statement.build();
    let pending = executor.execute::<AffectedRows>(query_string, bind_params);
    pending.await
}
/// Builds an update statement for `model` and executes it.
///
/// # Parameters
/// - `executor`: where the statement is executed; also provides the SQL dialect.
/// - `model`: table to update.
/// - `updates`: `(column, value)` pairs, added to the statement in slice order.
/// - `condition`: optional condition applied through `where_clause`.
///
/// # Returns
/// The `u64` produced by the `AffectedRows` strategy.
///
/// # Errors
/// Returns an error if building the statement fails or if the executor reports one.
pub async fn update<'post_build>(
    executor: impl Executor<'_>,
    model: &str,
    updates: &[(&str, Value<'post_build>)],
    condition: Option<&conditional::Condition<'post_build>>,
) -> Result<u64, Error> {
    let mut statement = executor.dialect().update(model);
    // Both tuple members are `Copy`, so they can be destructured out of the borrowed slice.
    for &(column, value) in updates {
        statement = statement.add_update(column, value);
    }

    let statement = match condition {
        Some(cond) => statement.where_clause(cond),
        None => statement,
    };

    // Unlike the other builders used in this file, the update builder's `build` is fallible.
    let (query_string, bind_params) = statement.build()?;
    let pending = executor.execute::<AffectedRows>(query_string, bind_params);
    pending.await
}