rust_query/transaction.rs
1use std::{
2 cell::RefCell, convert::Infallible, iter::zip, marker::PhantomData, sync::atomic::AtomicI64,
3};
4
5use rusqlite::ErrorCode;
6use sea_query::{
7 Alias, CommonTableExpression, DeleteStatement, Expr, ExprTrait, InsertStatement, IntoTableRef,
8 SelectStatement, SqliteQueryBuilder, UpdateStatement, WithClause,
9};
10use sea_query_rusqlite::RusqliteBinder;
11use self_cell::{MutBorrow, self_cell};
12
13use crate::{
14 IntoExpr, IntoSelect, Table, TableRow,
15 joinable::DynJoinable,
16 migrate::{Schema, check_schema, schema_version, user_version},
17 migration::Config,
18 pool::Pool,
19 private::{Joinable, Reader},
20 query::{OwnedRows, Query, track_stmt},
21 rows::Rows,
22 value::{DynTypedExpr, MyTyp, SecretFromSql, ValueBuilder},
23 writable::TableInsert,
24};
25
/// [Database] is a proof that the database has been configured.
///
/// Creating a [Database] requires going through the steps to migrate an existing database to
/// the required schema, or creating a new database from scratch (See also [crate::migration::Config]).
/// Please see [Database::migrator] to get started.
///
/// Having done the setup to create a compatible database is sadly not a guarantee that the
/// database will stay compatible for the lifetime of the [Database] struct.
/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
/// modifications to the schema and gives us the ability to panic when this is detected.
/// Such non-malicious modification of the schema can happen for example if another [Database]
/// instance is created with additional migrations (e.g. by another newer instance of your program).
pub struct Database<S> {
    // Pool that the transaction methods pop connections from and push them back to.
    pub(crate) manager: Pool,
    // Last accepted `schema_version`; compared (and refreshed) whenever a transaction is created.
    pub(crate) schema_version: AtomicI64,
    // Marker that ties this database to the schema type `S`; holds no data.
    pub(crate) schema: PhantomData<S>,
    // Held while the closure of a mutable transaction runs, so mutable transactions take turns.
    pub(crate) mut_lock: parking_lot::FairMutex<()>,
}
44
45impl<S: Schema> Database<S> {
46 /// This is a quick way to open a database if you don't care about migration.
47 ///
48 /// Note that this will panic if the schema version doesn't match or when the schema
49 /// itself doesn't match the expected schema.
50 pub fn new(config: Config) -> Self {
51 let Some(m) = Self::migrator(config) else {
52 panic!("schema version {}, but got an older version", S::VERSION)
53 };
54 let Some(m) = m.finish() else {
55 panic!("schema version {}, but got a new version", S::VERSION)
56 };
57 m
58 }
59}
60
61use rusqlite::Connection;
// Dependent half of `OwnedTransaction`: the open transaction, or `None` once it has been
// taken out (see `OwnedTransaction::with`).
type RTransaction<'x> = Option<rusqlite::Transaction<'x>>;

// Self-referential pair: a `Connection` stored together with the transaction that
// borrows from it, so both can be moved around as a single value.
self_cell!(
    pub struct OwnedTransaction {
        owner: MutBorrow<Connection>,

        #[covariant]
        dependent: RTransaction,
    }
);
72
/// SAFETY:
/// `RTransaction: !Send` because it borrows from `Connection` and `Connection: !Sync`.
/// `OwnedTransaction` can be `Send` because we know that `dependent` is the only
/// borrow of `owner` and `OwnedTransaction: !Sync` so `dependent` can not be borrowed
/// from multiple threads.
unsafe impl Send for OwnedTransaction {}
// Compile-time check of the `!Sync` property that the safety argument above relies on.
assert_not_impl_any! {OwnedTransaction: Sync}
80
thread_local! {
    // The transaction that belongs to the current thread (`None` when there is none).
    // It is stored by `Transaction::new_checked`, borrowed by the insert/update/delete code
    // in this file and taken out again by `Database::transaction_local` and
    // `Database::transaction_mut_local` once the user closure has returned.
    pub(crate) static TXN: RefCell<Option<TransactionWithRows>> = const { RefCell::new(None) };
}
84
85impl OwnedTransaction {
86 pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
87 self.borrow_dependent().as_ref().unwrap()
88 }
89
90 pub(crate) fn with(
91 mut self,
92 f: impl FnOnce(rusqlite::Transaction<'_>),
93 ) -> rusqlite::Connection {
94 self.with_dependent_mut(|_, b| f(b.take().unwrap()));
95 self.into_owner().into_inner()
96 }
97}
98
// Dependent half of `TransactionWithRows`: a slab of `OwnedRows` values.
type OwnedRowsVec<'x> = slab::Slab<OwnedRows<'x>>;
// Self-referential pair: an `OwnedTransaction` stored together with the row slab
// that borrows from it.
self_cell!(
    pub struct TransactionWithRows {
        owner: OwnedTransaction,

        #[not_covariant]
        dependent: OwnedRowsVec,
    }
);
108
109impl TransactionWithRows {
110 pub(crate) fn new_empty(txn: OwnedTransaction) -> Self {
111 Self::new(txn, |_| slab::Slab::new())
112 }
113
114 pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
115 self.borrow_owner().get()
116 }
117}
118
119impl<S: Send + Sync + Schema> Database<S> {
120 /// Create a [Transaction]. Creating the transaction will not block by default.
121 ///
122 /// This function will panic if the schema was modified compared to when the [Database] value
123 /// was created. This can happen for example by running another instance of your program with
124 /// additional migrations.
125 ///
126 /// Note that many systems have a limit on the number of file descriptors that can
127 /// exist in a single process. On my machine the soft limit is (1024) by default.
128 /// If this limit is reached, it may cause a panic in this method.
129 pub fn transaction<R: Send>(&self, f: impl Send + FnOnce(&'static Transaction<S>) -> R) -> R {
130 let res = std::thread::scope(|scope| scope.spawn(|| self.transaction_local(f)).join());
131 match res {
132 Ok(val) => val,
133 Err(payload) => std::panic::resume_unwind(payload),
134 }
135 }
136
137 /// Same as [Self::transaction], but can only be used on a new thread.
138 pub(crate) fn transaction_local<R>(&self, f: impl FnOnce(&'static Transaction<S>) -> R) -> R {
139 let conn = self.manager.pop();
140
141 let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
142 Some(conn.borrow_mut().transaction().unwrap())
143 });
144
145 let res = f(Transaction::new_checked(owned, &self.schema_version));
146
147 let owned = TXN.take().unwrap().into_owner();
148 self.manager.push(owned.into_owner().into_inner());
149
150 res
151 }
152
153 /// Create a mutable [Transaction].
154 /// This operation needs to wait for all other mutable [Transaction]s for this database to be finished.
155 /// There is currently no timeout on this operation, so it will wait indefinitly if required.
156 ///
157 /// Whether the transaction is commited depends on the result of the closure.
158 /// The transaction is only commited if the closure return [Ok]. In the case that it returns [Err]
159 /// or when the closure panics, a rollback is performed.
160 ///
161 /// This function will panic if the schema was modified compared to when the [Database] value
162 /// was created. This can happen for example by running another instance of your program with
163 /// additional migrations.
164 ///
165 /// Note that many systems have a limit on the number of file descriptors that can
166 /// exist in a single process. On my machine the soft limit is (1024) by default.
167 /// If this limit is reached, it may cause a panic in this method.
168 pub fn transaction_mut<O: Send, E: Send>(
169 &self,
170 f: impl Send + FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
171 ) -> Result<O, E> {
172 let join_res =
173 std::thread::scope(|scope| scope.spawn(|| self.transaction_mut_local(f)).join());
174
175 match join_res {
176 Ok(val) => val,
177 Err(payload) => std::panic::resume_unwind(payload),
178 }
179 }
180
181 pub(crate) fn transaction_mut_local<O, E>(
182 &self,
183 f: impl FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
184 ) -> Result<O, E> {
185 // Acquire the lock before creating the connection.
186 // Technically we can acquire the lock later, but we don't want to waste
187 // file descriptors on transactions that need to wait anyway.
188 let guard = self.mut_lock.lock();
189
190 let conn = self.manager.pop();
191
192 let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
193 let txn = conn
194 .borrow_mut()
195 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
196 .unwrap();
197 Some(txn)
198 });
199 // if this panics then the transaction is rolled back and the guard is dropped.
200 let res = f(Transaction::new_checked(owned, &self.schema_version));
201
202 // Drop the guard before commiting to let sqlite go to the next transaction
203 // more quickly while guaranteeing that the database will unlock soon.
204 drop(guard);
205
206 let owned = TXN.take().unwrap().into_owner();
207
208 let conn = if res.is_ok() {
209 owned.with(|x| x.commit().unwrap())
210 } else {
211 owned.with(|x| x.rollback().unwrap())
212 };
213 self.manager.push(conn);
214
215 res
216 }
217
218 /// Same as [Self::transaction_mut], but always commits the transaction.
219 ///
220 /// The only exception is that if the closure panics, a rollback is performed.
221 pub fn transaction_mut_ok<R: Send>(
222 &self,
223 f: impl Send + FnOnce(&'static mut Transaction<S>) -> R,
224 ) -> R {
225 self.transaction_mut(|txn| Ok::<R, Infallible>(f(txn)))
226 .unwrap()
227 }
228
229 /// Create a new [rusqlite::Connection] to the database.
230 ///
231 /// You can do (almost) anything you want with this connection as it is almost completely isolated from all other
232 /// [rust_query] connections. The only thing you should not do here is changing the schema.
233 /// Schema changes are detected with the `schema_version` pragma and will result in a panic when creating a new
234 /// [rust_query] transaction.
235 ///
236 /// The `foreign_keys` pragma is always enabled here, even if [crate::migrate::ForeignKeys::SQLite] is not used.
237 ///
238 /// Note that many systems have a limit on the number of file descriptors that can
239 /// exist in a single process. On my machine the soft limit is (1024) by default.
240 /// If this limit is reached, it may cause a panic in this method.
241 pub fn rusqlite_connection(&self) -> rusqlite::Connection {
242 let conn = self.manager.pop();
243 conn.pragma_update(None, "foreign_keys", "ON").unwrap();
244 conn
245 }
246}
247
/// [Transaction] can be used to query and update the database.
///
/// From the perspective of a [Transaction] each other [Transaction] is fully applied or not at all.
/// Furthermore, the effects of [Transaction]s have a global order.
/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
pub struct Transaction<S> {
    // Marker for the schema type `S`; holds no data.
    pub(crate) _p2: PhantomData<S>,
    // Raw-pointer marker: raw pointers are neither `Send` nor `Sync`, and so is this struct.
    pub(crate) _local: PhantomData<*const ()>,
}
257
258impl<S> Transaction<S> {
259 pub(crate) fn new() -> Self {
260 Self {
261 _p2: PhantomData,
262 _local: PhantomData,
263 }
264 }
265
266 pub(crate) fn new_ref() -> &'static mut Self {
267 // no memory is leaked because Self is zero sized
268 Box::leak(Box::new(Self::new()))
269 }
270}
271
272impl<S: Schema> Transaction<S> {
273 /// This will check the schema version and panic if it is not as expected
274 pub(crate) fn new_checked(txn: OwnedTransaction, expected: &AtomicI64) -> &'static mut Self {
275 let schema_version = schema_version(txn.get());
276 // If the schema version is not the expected version then we
277 // check if the changes are acceptable.
278 if schema_version != expected.load(std::sync::atomic::Ordering::Relaxed) {
279 if user_version(txn.get()).unwrap() != S::VERSION {
280 panic!("The database user_version changed unexpectedly")
281 }
282
283 TXN.set(Some(TransactionWithRows::new_empty(txn)));
284 check_schema::<S>(Self::new_ref());
285 expected.store(schema_version, std::sync::atomic::Ordering::Relaxed);
286 } else {
287 TXN.set(Some(TransactionWithRows::new_empty(txn)));
288 }
289
290 const {
291 assert!(size_of::<Self>() == 0);
292 }
293 Self::new_ref()
294 }
295}
296
297impl<S> Transaction<S> {
298 /// Execute a query with multiple results.
299 ///
300 /// ```
301 /// # use rust_query::{private::doctest::*};
302 /// # get_txn(|txn| {
303 /// let user_names = txn.query(|rows| {
304 /// let user = rows.join(User);
305 /// rows.into_vec(&user.name)
306 /// });
307 /// assert_eq!(user_names, vec!["Alice".to_owned()]);
308 /// # });
309 /// ```
310 pub fn query<'t, F, R>(&'t self, f: F) -> R
311 where
312 F: for<'inner> FnOnce(&mut Query<'t, 'inner, S>) -> R,
313 {
314 // Execution already happens in a [Transaction].
315 // and thus any [TransactionMut] that it might be borrowed
316 // from is borrowed immutably, which means the rows can not change.
317
318 let q = Rows {
319 phantom: PhantomData,
320 ast: Default::default(),
321 _p: PhantomData,
322 };
323 f(&mut Query {
324 q,
325 phantom: PhantomData,
326 })
327 }
328
329 /// Retrieve a single result from the database.
330 ///
331 /// ```
332 /// # use rust_query::{private::doctest::*, IntoExpr};
333 /// # rust_query::private::doctest::get_txn(|txn| {
334 /// let res = txn.query_one("test".into_expr());
335 /// assert_eq!(res, "test");
336 /// # });
337 /// ```
338 ///
339 /// Instead of using [Self::query_one] in a loop, it is better to
340 /// call [Self::query] and return all results at once.
341 pub fn query_one<O: 'static>(&self, val: impl IntoSelect<'static, S, Out = O>) -> O {
342 self.query(|e| e.into_iter(val.into_select()).next().unwrap())
343 }
344
345 pub fn lazy<'t, T: MyTyp>(&'t self, val: impl IntoExpr<'static, S, Typ = T>) -> T::Lazy<'t> {
346 T::out_to_lazy(self.query_one(val.into_expr()))
347 }
348
349 pub fn lazy_iter<'t, T: Table<Schema = S>>(
350 &'t self,
351 val: impl Joinable<'static, Typ = T>,
352 ) -> LazyIter<'t, T> {
353 let val = DynJoinable::new(val);
354 self.query(|rows| {
355 let table = rows.join(val);
356 LazyIter {
357 txn: self,
358 iter: rows.into_iter(table),
359 }
360 })
361 }
362}
363
/// Iterator returned by [Transaction::lazy_iter].
///
/// It walks over table rows and turns each of them into its lazy representation.
pub struct LazyIter<'t, T: Table> {
    // Transaction used to resolve every row with `Transaction::lazy`.
    txn: &'t Transaction<T::Schema>,
    // Source of the rows that still have to be yielded.
    iter: crate::query::Iter<'t, TableRow<T>>,
}
368
369impl<'t, T: Table> Iterator for LazyIter<'t, T> {
370 type Item = <T as MyTyp>::Lazy<'t>;
371
372 fn next(&mut self) -> Option<Self::Item> {
373 self.iter.next().map(|x| self.txn.lazy(x))
374 }
375}
376
377impl<S: 'static> Transaction<S> {
378 /// Try inserting a value into the database.
379 ///
380 /// Returns [Ok] with a reference to the new inserted value or an [Err] with conflict information.
381 /// The type of conflict information depends on the number of unique constraints on the table:
382 /// - 0 unique constraints => [Infallible]
383 /// - 1 unique constraint => [Expr] reference to the conflicting table row.
384 /// - 2+ unique constraints => `()` no further information is provided.
385 ///
386 /// ```
387 /// # use rust_query::{private::doctest::*, IntoExpr};
388 /// # rust_query::private::doctest::get_txn(|mut txn| {
389 /// let res = txn.insert(User {
390 /// name: "Bob",
391 /// });
392 /// assert!(res.is_ok());
393 /// let res = txn.insert(User {
394 /// name: "Bob",
395 /// });
396 /// assert!(res.is_err(), "there is a unique constraint on the name");
397 /// # });
398 /// ```
399 pub fn insert<T: Table<Schema = S>>(
400 &mut self,
401 val: impl TableInsert<T = T>,
402 ) -> Result<TableRow<T>, T::Conflict> {
403 try_insert_private(T::NAME.into_table_ref(), None, val.into_insert())
404 }
405
406 /// This is a convenience function to make using [Transaction::insert]
407 /// easier for tables without unique constraints.
408 ///
409 /// The new row is added to the table and the row reference is returned.
410 pub fn insert_ok<T: Table<Schema = S, Conflict = Infallible>>(
411 &mut self,
412 val: impl TableInsert<T = T>,
413 ) -> TableRow<T> {
414 let Ok(row) = self.insert(val);
415 row
416 }
417
418 /// This is a convenience function to make using [Transaction::insert]
419 /// easier for tables with exactly one unique constraints.
420 ///
421 /// The new row is inserted and the reference to the row is returned OR
422 /// an existing row is found which conflicts with the new row and a reference
423 /// to the conflicting row is returned.
424 ///
425 /// ```
426 /// # use rust_query::{private::doctest::*, IntoExpr};
427 /// # rust_query::private::doctest::get_txn(|mut txn| {
428 /// let bob = txn.insert(User {
429 /// name: "Bob",
430 /// }).unwrap();
431 /// let bob2 = txn.find_or_insert(User {
432 /// name: "Bob", // this will conflict with the existing row.
433 /// });
434 /// assert_eq!(bob, bob2);
435 /// # });
436 /// ```
437 pub fn find_or_insert<T: Table<Schema = S, Conflict = TableRow<T>>>(
438 &mut self,
439 val: impl TableInsert<T = T>,
440 ) -> TableRow<T> {
441 match self.insert(val) {
442 Ok(row) => row,
443 Err(row) => row,
444 }
445 }
446
447 /// Try updating a row in the database to have new column values.
448 ///
449 /// Updating can fail just like [Transaction::insert] because of unique constraint conflicts.
450 /// This happens when the new values are in conflict with an existing different row.
451 ///
452 /// When the update succeeds, this function returns [Ok], when it fails it returns [Err] with one of
453 /// three conflict types:
454 /// - 0 unique constraints => [Infallible]
455 /// - 1 unique constraint => [Expr] reference to the conflicting table row.
456 /// - 2+ unique constraints => `()` no further information is provided.
457 ///
458 /// ```
459 /// # use rust_query::{private::doctest::*, IntoExpr, Update};
460 /// # rust_query::private::doctest::get_txn(|mut txn| {
461 /// let bob = txn.insert(User {
462 /// name: "Bob",
463 /// }).unwrap();
464 /// txn.update(bob, User {
465 /// name: Update::set("New Bob"),
466 /// }).unwrap();
467 /// # });
468 /// ```
469 pub fn update<T: Table<Schema = S>>(
470 &mut self,
471 row: impl IntoExpr<'static, S, Typ = T>,
472 val: T::Update,
473 ) -> Result<(), T::Conflict> {
474 let mut id = ValueBuilder::default();
475 let row = row.into_expr();
476 let (id, _) = id.simple_one(DynTypedExpr::erase(&row));
477
478 let val = T::apply_try_update(val, row);
479 let mut reader = Reader::default();
480 T::read(&val, &mut reader);
481 let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.into_iter().collect();
482
483 let (select, col_fields) = ValueBuilder::default().simple(col_exprs);
484 let cte = CommonTableExpression::new()
485 .query(select)
486 .columns(col_fields.clone())
487 .table_name(Alias::new("cte"))
488 .to_owned();
489 let with_clause = WithClause::new().cte(cte).to_owned();
490
491 let mut update = UpdateStatement::new()
492 .table(("main", T::NAME))
493 .cond_where(Expr::col(("main", T::NAME, T::ID)).in_subquery(id))
494 .to_owned();
495
496 for (name, field) in zip(col_names, col_fields) {
497 let select = SelectStatement::new()
498 .from(Alias::new("cte"))
499 .column(field)
500 .to_owned();
501 let value = sea_query::Expr::SubQuery(
502 None,
503 Box::new(sea_query::SubQueryStatement::SelectStatement(select)),
504 );
505 update.value(Alias::new(name), value);
506 }
507
508 let (query, args) = update.with(with_clause).build_rusqlite(SqliteQueryBuilder);
509
510 let res = TXN.with_borrow(|txn| {
511 let txn = txn.as_ref().unwrap().get();
512
513 let mut stmt = txn.prepare_cached(&query).unwrap();
514 stmt.execute(&*args.as_params())
515 });
516
517 match res {
518 Ok(1) => Ok(()),
519 Ok(n) => panic!("unexpected number of updates: {n}"),
520 Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
521 if kind.code == ErrorCode::ConstraintViolation =>
522 {
523 // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
524 Err(T::get_conflict_unchecked(self, &val))
525 }
526 Err(err) => panic!("{err:?}"),
527 }
528 }
529
530 /// This is a convenience function to use [Transaction::update] for updates
531 /// that can not cause unique constraint violations.
532 ///
533 /// This method can be used for all tables, it just does not allow modifying
534 /// columns that are part of unique constraints.
535 pub fn update_ok<T: Table<Schema = S>>(
536 &mut self,
537 row: impl IntoExpr<'static, S, Typ = T>,
538 val: T::UpdateOk,
539 ) {
540 match self.update(row, T::update_into_try_update(val)) {
541 Ok(val) => val,
542 Err(_) => {
543 unreachable!("update can not fail")
544 }
545 }
546 }
547
548 /// Convert the [Transaction] into a [TransactionWeak] to allow deletions.
549 pub fn downgrade(&'static mut self) -> &'static mut TransactionWeak<S> {
550 // TODO: clean this up
551 Box::leak(Box::new(TransactionWeak { inner: PhantomData }))
552 }
553}
554
/// This is the weak version of [Transaction].
///
/// The reason that it is called `weak` is because [TransactionWeak] can not guarantee
/// that [TableRow]s prove the existence of their particular row.
///
/// [TransactionWeak] is useful because it allows deleting rows.
pub struct TransactionWeak<S> {
    // Marker only: no transaction value is stored in here.
    inner: PhantomData<Transaction<S>>,
}
564
565impl<S: Schema> TransactionWeak<S> {
566 /// Try to delete a row from the database.
567 ///
568 /// This will return an [Err] if there is a row that references the row that is being deleted.
569 /// When this method returns [Ok] it will contain a [bool] that is either
570 /// - `true` if the row was just deleted.
571 /// - `false` if the row was deleted previously in this transaction.
572 pub fn delete<T: Table<Schema = S>>(&mut self, val: TableRow<T>) -> Result<bool, T::Referer> {
573 let schema = crate::schema::from_macro::Schema::new::<S>();
574
575 // This is a manual check that foreign key constraints are not violated.
576 // We do this manually because we don't want to enabled foreign key constraints for the whole
577 // transaction (and is not possible to enable for part of a transaction).
578 let mut checks = vec![];
579 for (table_name, table) in &schema.tables {
580 for col in table.columns.iter().filter_map(|(col_name, col)| {
581 let col = &col.def;
582 col.fk
583 .as_ref()
584 .is_some_and(|(t, c)| t == T::NAME && c == T::ID)
585 .then_some(col_name)
586 }) {
587 let stmt = SelectStatement::new()
588 .expr(
589 val.in_subquery(
590 SelectStatement::new()
591 .from(Alias::new(table_name))
592 .column(Alias::new(col))
593 .take(),
594 ),
595 )
596 .take();
597 checks.push(stmt.build_rusqlite(SqliteQueryBuilder));
598 }
599 }
600
601 let stmt = DeleteStatement::new()
602 .from_table(("main", T::NAME))
603 .cond_where(Expr::col(("main", T::NAME, T::ID)).eq(val.inner.idx))
604 .take();
605
606 let (query, args) = stmt.build_rusqlite(SqliteQueryBuilder);
607
608 TXN.with_borrow(|txn| {
609 let txn = txn.as_ref().unwrap().get();
610
611 for (query, args) in checks {
612 let mut stmt = txn.prepare_cached(&query).unwrap();
613 match stmt.query_one(&*args.as_params(), |r| r.get(0)) {
614 Ok(true) => return Err(T::get_referer_unchecked()),
615 Ok(false) => {}
616 Err(err) => panic!("{err:?}"),
617 }
618 }
619
620 let mut stmt = txn.prepare_cached(&query).unwrap();
621 match stmt.execute(&*args.as_params()) {
622 Ok(0) => Ok(false),
623 Ok(1) => Ok(true),
624 Ok(n) => {
625 panic!("unexpected number of deletes {n}")
626 }
627 Err(err) => panic!("{err:?}"),
628 }
629 })
630 }
631
632 /// Delete a row from the database.
633 ///
634 /// This is the infallible version of [TransactionWeak::delete].
635 ///
636 /// To be able to use this method you have to mark the table as `#[no_reference]` in the schema.
637 pub fn delete_ok<T: Table<Referer = Infallible, Schema = S>>(
638 &mut self,
639 val: TableRow<T>,
640 ) -> bool {
641 let Ok(res) = self.delete(val);
642 res
643 }
644
645 /// This allows you to do (almost) anything you want with the internal [rusqlite::Transaction].
646 ///
647 /// Note that there are some things that you should not do with the transaction, such as:
648 /// - Changes to the schema, these will result in a panic as described in [Database].
649 /// - Making changes that violate foreign-key constraints (see below).
650 ///
651 /// Sadly it is not possible to enable (or disable) the `foreign_keys` pragma during a transaction.
652 /// This means that whether this pragma is enabled depends on which [crate::migrate::ForeignKeys]
653 /// option is used and can not be changed.
654 pub fn rusqlite_transaction<R>(&mut self, f: impl FnOnce(&rusqlite::Transaction) -> R) -> R {
655 TXN.with_borrow(|txn| f(txn.as_ref().unwrap().get()))
656 }
657}
658
659pub fn try_insert_private<T: Table>(
660 table: sea_query::TableRef,
661 idx: Option<i64>,
662 val: T::Insert,
663) -> Result<TableRow<T>, T::Conflict> {
664 let mut reader = Reader::default();
665 T::read(&val, &mut reader);
666 if let Some(idx) = idx {
667 reader.col(T::ID, idx);
668 }
669 let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.into_iter().collect();
670 let is_empty = col_names.is_empty();
671
672 let (select, _) = ValueBuilder::default().simple(col_exprs);
673
674 let mut insert = InsertStatement::new();
675 insert.into_table(table);
676 insert.columns(col_names.into_iter().map(Alias::new));
677 if is_empty {
678 // select always has at least one column, so we leave it out when there are no columns
679 insert.or_default_values();
680 } else {
681 insert.select_from(select).unwrap();
682 }
683 insert.returning_col(T::ID);
684
685 let (sql, values) = insert.build_rusqlite(SqliteQueryBuilder);
686
687 let res = TXN.with_borrow(|txn| {
688 let txn = txn.as_ref().unwrap().get();
689 track_stmt(txn, &sql, &values);
690
691 let mut statement = txn.prepare_cached(&sql).unwrap();
692 let mut res = statement
693 .query_map(&*values.as_params(), |row| {
694 Ok(TableRow::<T>::from_sql(row.get_ref(T::ID)?)?)
695 })
696 .unwrap();
697
698 res.next().unwrap()
699 });
700
701 match res {
702 Ok(id) => Ok(id),
703 Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
704 if kind.code == ErrorCode::ConstraintViolation =>
705 {
706 // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
707 Err(T::get_conflict_unchecked(&Transaction::new(), &val))
708 }
709 Err(err) => panic!("{err:?}"),
710 }
711}