1use std::sync::Arc;
4
5use rorm_declaration::config::DatabaseDriver;
6use rorm_sql::delete::Delete;
7use rorm_sql::insert::Insert;
8use rorm_sql::join_table::JoinTableData;
9use rorm_sql::ordering::OrderByEntry;
10use rorm_sql::select::Select;
11use rorm_sql::select_column::SelectColumnData;
12use rorm_sql::update::Update;
13use rorm_sql::value::Value;
14use rorm_sql::{conditional, value};
15use tracing::warn;
16
17use crate::error::Error;
18use crate::executor::{AffectedRows, All, Executor, Nothing, One, QueryStrategy};
19use crate::internal;
20use crate::query_type::GetLimitClause;
21use crate::row::Row;
22use crate::transaction::Transaction;
23
24pub type ColumnSelector<'a> = SelectColumnData<'a>;
30
31pub type JoinTable<'until_build, 'post_build> = JoinTableData<'until_build, 'post_build>;
37
38#[derive(Debug)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41pub struct DatabaseConfiguration {
42 pub driver: DatabaseDriver,
44
45 pub min_connections: u32,
49
50 pub max_connections: u32,
54}
55
56impl DatabaseConfiguration {
57 pub fn new(driver: DatabaseDriver) -> Self {
68 DatabaseConfiguration {
69 driver,
70 min_connections: 1,
71 max_connections: 10,
72 }
73 }
74}
75
76#[derive(Clone)]
85pub struct Database(pub(crate) internal::database::Impl, Arc<()>);
86
87impl Database {
88 pub async fn connect(configuration: DatabaseConfiguration) -> Result<Self, Error> {
90 Ok(Self(
91 internal::database::connect(configuration).await?,
92 Arc::new(()),
93 ))
94 }
95
96 #[deprecated = "Use `Executor::execute` instead"]
113 pub async fn raw_sql<'a>(
114 &self,
115 query_string: &'a str,
116 bind_params: Option<&[value::Value<'a>]>,
117 transaction: Option<&mut Transaction>,
118 ) -> Result<Vec<Row>, Error> {
119 internal::database::raw_sql(self, query_string, bind_params, transaction).await
120 }
121
122 pub async fn start_transaction(&self) -> Result<Transaction, Error> {
128 internal::database::start_transaction(self).await
129 }
130
131 pub async fn close(self) {
141 internal::database::close(self).await
142 }
143}
144
145impl Drop for Database {
146 fn drop(&mut self) {
148 if Arc::strong_count(&self.1) == 1 && !internal::database::is_closed(self) {
152 warn!("Database has been dropped without calling close. This might case the last queries to not being flushed properly");
153 }
154 }
155}
156
157#[allow(clippy::too_many_arguments)]
172pub fn query<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy + GetLimitClause>(
173 executor: impl Executor<'db>,
174 model: &str,
175 columns: &[ColumnSelector<'_>],
176 joins: &[JoinTable<'_, 'post_query>],
177 conditions: Option<&conditional::Condition<'post_query>>,
178 order_by_clause: &[OrderByEntry<'_>],
179 limit: Option<Q::LimitOrOffset>,
180) -> Q::Result<'result> {
181 let columns: Vec<_> = columns
182 .iter()
183 .map(|c| {
184 executor.dialect().select_column(
185 c.table_name,
186 c.column_name,
187 c.select_alias,
188 c.aggregation,
189 )
190 })
191 .collect();
192 let joins: Vec<_> = joins
193 .iter()
194 .map(|j| {
195 executor.dialect().join_table(
196 j.join_type,
197 j.table_name,
198 j.join_alias,
199 j.join_condition.clone(),
200 )
201 })
202 .collect();
203 let mut q = executor
204 .dialect()
205 .select(&columns, model, &joins, order_by_clause);
206
207 if let Some(condition) = conditions {
208 q = q.where_clause(condition);
209 }
210
211 if let Some(limit) = Q::get_limit_clause(limit) {
212 q = q.limit_clause(limit);
213 }
214
215 let (query_string, bind_params) = q.build();
216
217 executor.execute::<Q>(query_string, bind_params)
218}
219
220pub async fn insert_returning(
228 executor: impl Executor<'_>,
229 model: &str,
230 columns: &[&str],
231 values: &[Value<'_>],
232 returning: &[&str],
233) -> Result<Row, Error> {
234 generic_insert::<One>(executor, model, columns, values, Some(returning)).await
235}
236
237pub async fn insert(
244 executor: impl Executor<'_>,
245 model: &str,
246 columns: &[&str],
247 values: &[Value<'_>],
248) -> Result<(), Error> {
249 generic_insert::<Nothing>(executor, model, columns, values, None).await
250}
251
252pub(crate) fn generic_insert<'result, 'db: 'result, 'post_query: 'result, Q: QueryStrategy>(
256 executor: impl Executor<'db>,
257 model: &str,
258 columns: &[&str],
259 values: &[Value<'post_query>],
260 returning: Option<&[&str]>,
261) -> Q::Result<'result> {
262 let values = &[values];
263 let q = executor.dialect().insert(model, columns, values, returning);
264
265 let (query_string, bind_params): (_, Vec<Value<'post_query>>) = q.build();
266
267 executor.execute::<Q>(query_string, bind_params)
268}
269
270pub async fn insert_bulk(
280 executor: impl Executor<'_>,
281 model: &str,
282 columns: &[&str],
283 rows: &[&[Value<'_>]],
284) -> Result<(), Error> {
285 let mut guard = executor.ensure_transaction().await?;
286 let tr: &mut Transaction = guard.get_transaction();
287
288 for chunk in rows.chunks(25) {
289 let mut insert = tr.dialect().insert(model, columns, chunk, None);
290 insert = insert.rollback_transaction();
291 let (insert_query, insert_params) = insert.build();
292
293 tr.execute::<Nothing>(insert_query, insert_params).await?;
294 }
295
296 guard.commit().await?;
297 Ok(())
298}
299
300pub async fn insert_bulk_returning(
310 executor: impl Executor<'_>,
311 model: &str,
312 columns: &[&str],
313 rows: &[&[Value<'_>]],
314 returning: &[&str],
315) -> Result<Vec<Row>, Error> {
316 let mut guard = executor.ensure_transaction().await?;
317 let tr: &mut Transaction = guard.get_transaction();
318
319 let mut inserted = Vec::with_capacity(rows.len());
320 for chunk in rows.chunks(25) {
321 let mut insert = tr.dialect().insert(model, columns, chunk, Some(returning));
322 insert = insert.rollback_transaction();
323 let (insert_query, insert_params) = insert.build();
324
325 inserted.extend(tr.execute::<All>(insert_query, insert_params).await?);
326 }
327
328 guard.commit().await?;
329
330 Ok(inserted)
331}
332
333pub async fn delete<'post_build>(
343 executor: impl Executor<'_>,
344 model: &str,
345 condition: Option<&conditional::Condition<'post_build>>,
346) -> Result<u64, Error> {
347 let mut q = executor.dialect().delete(model);
348 if condition.is_some() {
349 q = q.where_clause(condition.unwrap());
350 }
351
352 let (query_string, bind_params) = q.build();
353
354 executor
355 .execute::<AffectedRows>(query_string, bind_params)
356 .await
357}
358
359pub async fn update<'post_build>(
371 executor: impl Executor<'_>,
372 model: &str,
373 updates: &[(&str, Value<'post_build>)],
374 condition: Option<&conditional::Condition<'post_build>>,
375) -> Result<u64, Error> {
376 let mut stmt = executor.dialect().update(model);
377
378 for (column, value) in updates {
379 stmt = stmt.add_update(column, *value);
380 }
381
382 if let Some(cond) = condition {
383 stmt = stmt.where_clause(cond);
384 }
385
386 let (query_string, bind_params) = stmt.build()?;
387
388 executor
389 .execute::<AffectedRows>(query_string, bind_params)
390 .await
391}