rorm_db/
database.rs

1//! [`Database`] struct and several common operations
2
3use std::sync::Arc;
4
5use rorm_declaration::config::DatabaseDriver;
6use rorm_sql::delete::Delete;
7use rorm_sql::insert::Insert;
8use rorm_sql::join_table::JoinTableData;
9use rorm_sql::ordering::OrderByEntry;
10use rorm_sql::select::Select;
11use rorm_sql::select_column::SelectColumnData;
12use rorm_sql::update::Update;
13use rorm_sql::value::Value;
14use rorm_sql::{conditional, value};
15use tracing::warn;
16
17use crate::error::Error;
18use crate::executor::{AffectedRows, All, Executor, Nothing, One, QueryStrategy};
19use crate::internal;
20use crate::query_type::GetLimitClause;
21use crate::row::Row;
22use crate::transaction::Transaction;
23
24/**
25Type alias for [`SelectColumnData`]..
26
27As all databases use currently the same fields, a type alias is sufficient.
28*/
29pub type ColumnSelector<'a> = SelectColumnData<'a>;
30
31/**
32Type alias for [`JoinTableData`].
33
34As all databases use currently the same fields, a type alias is sufficient.
35*/
36pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
37
38/// Configuration use in [`Database::connect`].
39#[derive(Debug)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41pub struct DatabaseConfiguration {
42    /// The driver and its corresponding settings
43    pub driver: DatabaseDriver,
44
45    /// Minimal connections to initialize upfront.
46    ///
47    /// Must be greater than `0` and can't be larger than `max_connections`.
48    pub min_connections: u32,
49
50    /// Maximum connections that allowed to be created.
51    ///
52    /// Must be greater than `0`.
53    pub max_connections: u32,
54}
55
56impl DatabaseConfiguration {
57    /**
58    Create a new database configuration with some defaults set.
59
60    **Defaults**:
61    - `min_connections`: 1
62    - `max_connections`: 10
63
64    **Parameter**:
65    - `driver`: [`DatabaseDriver`]: Configuration of the database driver.
66    */
67    pub fn new(driver: DatabaseDriver) -> Self {
68        DatabaseConfiguration {
69            driver,
70            min_connections: 1,
71            max_connections: 10,
72        }
73    }
74}
75
76/// Handle to a pool of database connections
77///
78/// Executing sql statements is done through the [`Executor`] trait
79/// which is implemented on `&Database`.
80///
81/// Common operations are implemented as functions in the [`database`](self) module.
82///
83/// Cloning is cheap i.e. two `Arc`s.
84#[derive(Clone)]
85pub struct Database(pub(crate) internal::database::Impl, Arc<()>);
86
87impl Database {
88    /// Connects to the database using `configuration`
89    pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
90        Ok(Self(
91            internal::database::connect(configuration).await?,
92            Arc::new(()),
93        ))
94    }
95
96    /**
97    Execute raw SQL statements on the database.
98
99    If possible, the statement is executed as prepared statement.
100
101    To bind parameter, use ? as placeholder in SQLite and MySQL
102    and $1, $2, $n in Postgres.
103
104    **Parameter**:
105    - `query_string`: Reference to a valid SQL query.
106    - `bind_params`: Optional list of values to bind in the query.
107    - `transaction`: Optional transaction to execute the query on.
108
109    **Returns** a list of rows. If there are no values to retrieve, an empty
110    list is returned.
111     */
112    #[deprecated = "Use `Executor::execute` instead"]
113    pub async fn raw_sql<'a>(
114        &self,
115        query_string: &'a str,
116        bind_params: Option<&[value::Value<'a>]>,
117        transaction: Option<&mut Transaction>,
118    ) -> Result<Vec<Row>, Error> {
119        internal::database::raw_sql(self, query_string, bind_params, transaction).await
120    }
121
122    /// Starts a new transaction
123    ///
124    /// `&mut Transaction` implements [`Executor`] like `&Database` does
125    /// but its database operations can be reverted using [`Transaction::rollback`]
126    /// or simply dropping the transaction without calling [`Transaction::commit`].
127    pub async fn start_transaction(&self) -> Result<Transaction, Error> {
128        internal::database::start_transaction(self).await
129    }
130
131    /// Closes the database connection
132    ///
133    /// While calling this method is not strictly necessary,
134    /// terminating your program without it
135    /// might result in some final queries not being flushed properly.
136    ///
137    /// This method consumes the database handle,
138    /// but actually all handles created using `clone` will become invalid after this call.
139    /// This means any further operation would result in an `Err`
140    pub async fn close(self) {
141        internal::database::close(self).await
142    }
143}
144
145impl Drop for Database {
146    /// Checks whether [`Database::close`] has been called before the last instance is dropped
147    fn drop(&mut self) {
148        // The use of strong_count should be correct:
149        // - the arc is private and we don't create WeakRefs
150        // => when observing a strong_count of 1, there can't be any remaining refs
151        if Arc::strong_count(&self.1) == 1 && !internal::database::is_closed(self) {
152            warn!("Database has been dropped without calling close. This might case the last queries to not being flushed properly");
153        }
154    }
155}
156
157/// Executes a simple `SELECT` query.
158///
159/// It is generic over a [`QueryStrategy`] which specifies how and how many rows to query.
160///
161/// **Parameter**:
162/// - `model`: Model to query.
163/// - `columns`: Columns to retrieve values from.
164/// - `joins`: Join tables expressions.
165/// - `conditions`: Optional conditions to apply.
166/// - `order_by_clause`: Columns to order the rows by.
167/// - `limit`: Optional limit / offset to apply to the query.
168///     Depending on the query strategy, this is either [`LimitClause`](rorm_sql::limit_clause::LimitClause)
169///     (for [`All`] and [`Stream`](crate::executor::Stream))
170///     or a simple [`u64`] (for [`One`] and [`Optional`](crate::executor::Optional)).
171#[allow(clippy::too_many_arguments)]
172pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
173    executor: impl Executor<'db>,
174    model: &str,
175    columns: &[ColumnSelector<'_>],
176    joins: &[JoinTable<'_, 'post_query>],
177    conditions: Option<&conditional::Condition<'post_query>>,
178    order_by_clause: &[OrderByEntry<'_>],
179    limit: Option<Q::LimitOrOffset>,
180) -> Q::Result<'result> {
181    let columns: Vec<_> = columns
182        .iter()
183        .map(|c| {
184            executor.dialect().select_column(
185                c.table_name,
186                c.column_name,
187                c.select_alias,
188                c.aggregation,
189            )
190        })
191        .collect();
192    let joins: Vec<_> = joins
193        .iter()
194        .map(|j| {
195            executor.dialect().join_table(
196                j.join_type,
197                j.table_name,
198                j.join_alias,
199                j.join_condition.clone(),
200            )
201        })
202        .collect();
203    let mut q = executor
204        .dialect()
205        .select(&columns, model, &joins, order_by_clause);
206
207    if let Some(condition) = conditions {
208        q = q.where_clause(condition);
209    }
210
211    if let Some(limit) = Q::get_limit_clause(limit) {
212        q = q.limit_clause(limit);
213    }
214
215    let (query_string, bind_params) = q.build();
216
217    executor.execute::<Q>(query_string, bind_params)
218}
219
220/// Inserts a single row and returns columns from it.
221///
222/// **Parameter**:
223/// - `model`: Table to insert to
224/// - `columns`: Columns to set `values` for.
225/// - `values`: Values to bind to the corresponding columns.
226/// - `returning`: Columns to query from the inserted row.
227pub async fn insert_returning(
228    executor: impl Executor<'_>,
229    model: &str,
230    columns: &[&str],
231    values: &[Value<'_>],
232    returning: &[&str],
233) -> Result<Row, Error> {
234    generic_insert::<One>(executor, model, columns, values, Some(returning)).await
235}
236
237/// Inserts a single row.
238///
239/// **Parameter**:
240/// - `model`: Table to insert to
241/// - `columns`: Columns to set `values` for.
242/// - `values`: Values to bind to the corresponding columns.
243pub async fn insert(
244    executor: impl Executor<'_>,
245    model: &str,
246    columns: &[&str],
247    values: &[Value<'_>],
248) -> Result<(), Error> {
249    generic_insert::<Nothing>(executor, model, columns, values, None).await
250}
251
252/// Generic implementation of:
253/// - [`Database::insert`]
254/// - [`Database::insert_returning`]
255pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
256    executor: impl Executor<'db>,
257    model: &str,
258    columns: &[&str],
259    values: &[Value<'post_query>],
260    returning: Option<&[&str]>,
261) -> Q::Result<'result> {
262    let values = &[values];
263    let q = executor.dialect().insert(model, columns, values, returning);
264
265    let (query_string, bind_params): (_, Vec<Value<'post_query>>) = q.build();
266
267    executor.execute::<Q>(query_string, bind_params)
268}
269
270/// This method is used to bulk insert rows.
271///
272/// If one insert statement fails, the complete operation will be rolled back.
273///
274/// **Parameter**:
275/// - `model`: Table to insert to
276/// - `columns`: Columns to set `rows` for.
277/// - `rows`: List of values to bind to the corresponding columns.
278/// - `transaction`: Optional transaction to execute the query on.
279pub async fn insert_bulk(
280    executor: impl Executor<'_>,
281    model: &str,
282    columns: &[&str],
283    rows: &[&[Value<'_>]],
284) -> Result<(), Error> {
285    let mut guard = executor.ensure_transaction().await?;
286    let tr: &mut Transaction = guard.get_transaction();
287
288    for chunk in rows.chunks(25) {
289        let mut insert = tr.dialect().insert(model, columns, chunk, None);
290        insert = insert.rollback_transaction();
291        let (insert_query, insert_params) = insert.build();
292
293        tr.execute::<Nothing>(insert_query, insert_params).await?;
294    }
295
296    guard.commit().await?;
297    Ok(())
298}
299
300/// This method is used to bulk insert rows.
301///
302/// If one insert statement fails, the complete operation will be rolled back.
303///
304/// **Parameter**:
305/// - `model`: Table to insert to
306/// - `columns`: Columns to set `rows` for.
307/// - `rows`: List of values to bind to the corresponding columns.
308/// - `transaction`: Optional transaction to execute the query on.
309pub async fn insert_bulk_returning(
310    executor: impl Executor<'_>,
311    model: &str,
312    columns: &[&str],
313    rows: &[&[Value<'_>]],
314    returning: &[&str],
315) -> Result<Vec<Row>, Error> {
316    let mut guard = executor.ensure_transaction().await?;
317    let tr: &mut Transaction = guard.get_transaction();
318
319    let mut inserted = Vec::with_capacity(rows.len());
320    for chunk in rows.chunks(25) {
321        let mut insert = tr.dialect().insert(model, columns, chunk, Some(returning));
322        insert = insert.rollback_transaction();
323        let (insert_query, insert_params) = insert.build();
324
325        inserted.extend(tr.execute::<All>(insert_query, insert_params).await?);
326    }
327
328    guard.commit().await?;
329
330    Ok(inserted)
331}
332
333/// This method is used to delete rows from a table.
334///
335/// **Parameter**:
336/// - `model`: Name of the model to delete rows from
337/// - `condition`: Optional condition to apply.
338/// - `transaction`: Optional transaction to execute the query on.
339///
340/// **Returns** the rows affected of the delete statement. Note that this also includes
341/// relations, etc.
342pub async fn delete<'post_build>(
343    executor: impl Executor<'_>,
344    model: &str,
345    condition: Option<&conditional::Condition<'post_build>>,
346) -> Result<u64, Error> {
347    let mut q = executor.dialect().delete(model);
348    if condition.is_some() {
349        q = q.where_clause(condition.unwrap());
350    }
351
352    let (query_string, bind_params) = q.build();
353
354    executor
355        .execute::<AffectedRows>(query_string, bind_params)
356        .await
357}
358
359/// This method is used to update rows in a table.
360///
361/// **Parameter**:
362/// - `model`: Name of the model to update rows from
363/// - `updates`: A list of updates. An update is a tuple that consists of a list of columns to
364///     update as well as the value to set to the columns.
365/// - `condition`: Optional condition to apply.
366/// - `transaction`: Optional transaction to execute the query on.
367///
368/// **Returns** the rows affected from the update statement. Note that this also includes
369/// relations, etc.
370pub async fn update<'post_build>(
371    executor: impl Executor<'_>,
372    model: &str,
373    updates: &[(&str, Value<'post_build>)],
374    condition: Option<&conditional::Condition<'post_build>>,
375) -> Result<u64, Error> {
376    let mut stmt = executor.dialect().update(model);
377
378    for (column, value) in updates {
379        stmt = stmt.add_update(column, *value);
380    }
381
382    if let Some(cond) = condition {
383        stmt = stmt.where_clause(cond);
384    }
385
386    let (query_string, bind_params) = stmt.build()?;
387
388    executor
389        .execute::<AffectedRows>(query_string, bind_params)
390        .await
391}