rust_query/
transaction.rs

1use std::{
2    cell::RefCell, convert::Infallible, iter::zip, marker::PhantomData, sync::atomic::AtomicI64,
3};
4
5use rusqlite::ErrorCode;
6use sea_query::{
7    Alias, CommonTableExpression, DeleteStatement, Expr, ExprTrait, InsertStatement, IntoTableRef,
8    SelectStatement, SqliteQueryBuilder, UpdateStatement, WithClause,
9};
10use sea_query_rusqlite::RusqliteBinder;
11use self_cell::{MutBorrow, self_cell};
12
13use crate::{
14    IntoExpr, IntoSelect, Table, TableRow,
15    joinable::DynJoinable,
16    migrate::{Schema, check_schema, schema_version, user_version},
17    private::{Joinable, Reader},
18    query::{OwnedRows, Query, track_stmt},
19    rows::Rows,
20    value::{DynTypedExpr, MyTyp, SecretFromSql, ValueBuilder},
21    writable::TableInsert,
22};
23
/// [Database] is a proof that the database has been configured.
///
/// Creating a [Database] requires going through the steps to migrate an existing database to
/// the required schema, or creating a new database from scratch (See also [crate::migration::Config]).
/// Please see [Database::migrator] to get started.
///
/// Having done the setup to create a compatible database is sadly not a guarantee that the
/// database will stay compatible for the lifetime of the [Database] struct.
/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
/// modifications to the schema and gives us the ability to panic when this is detected.
/// Such non-malicious modification of the schema can happen for example if another [Database]
/// instance is created with additional migrations (e.g. by another newer instance of your program).
pub struct Database<S> {
    /// Connection manager; a fresh connection is opened through it for every
    /// transaction and for [Database::rusqlite_connection].
    pub(crate) manager: r2d2_sqlite::SqliteConnectionManager,
    /// The last schema version that was accepted. It is compared against the
    /// database whenever a transaction is created and updated after a successful re-check.
    pub(crate) schema_version: AtomicI64,
    /// Marker for the schema type `S`; no value of `S` is stored.
    pub(crate) schema: PhantomData<S>,
    /// Lock taken by [Database::transaction_mut] before it opens its connection
    /// and released after the user closure has returned.
    pub(crate) mut_lock: parking_lot::FairMutex<()>,
}
42
43use rusqlite::Connection;
// The dependent is an `Option` so that the transaction can be moved out with
// `take()` (see `OwnedTransaction::with`) in order to commit or roll back by value.
type RTransaction<'x> = Option<rusqlite::Transaction<'x>>;

// Self-referential pair: the `Connection` (owner) together with the
// `rusqlite::Transaction` that mutably borrows it (dependent).
self_cell!(
    pub struct OwnedTransaction {
        owner: MutBorrow<Connection>,

        #[covariant]
        dependent: RTransaction,
    }
);
54
/// SAFETY:
/// `RTransaction: !Send` because it borrows from `Connection` and `Connection: !Sync`.
/// `OwnedTransaction` can be `Send` because we know that `dependent` is the only
/// borrow of `owner` and `OwnedTransaction: !Sync` so `dependent` can not be borrowed
/// from multiple threads.
unsafe impl Send for OwnedTransaction {}
// Compile-time check of the `!Sync` half of the safety argument above:
// this fails to build if `OwnedTransaction` ever becomes `Sync`.
assert_not_impl_any! {OwnedTransaction: Sync}
62
// Per-thread slot for the transaction that is currently active on this thread.
// It is filled by `Transaction::new_checked`, borrowed by the query/insert/update/delete
// code in this file, and emptied again by `Database::transaction_mut` before it
// commits or rolls back.
thread_local! {
    pub(crate) static TXN: RefCell<Option<TransactionWithRows>> = const { RefCell::new(None) };
}
66
67impl OwnedTransaction {
68    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
69        self.borrow_dependent().as_ref().unwrap()
70    }
71
72    pub(crate) fn with(mut self, f: impl FnOnce(rusqlite::Transaction<'_>)) {
73        self.with_dependent_mut(|_, b| f(b.take().unwrap()))
74    }
75}
76
// Collection of row cursors that borrow from the transaction they belong to.
type OwnedRowsVec<'x> = slab::Slab<OwnedRows<'x>>;

// Self-referential pair: the `OwnedTransaction` (owner) together with the
// `OwnedRows` values that borrow from it (dependent).
self_cell!(
    pub struct TransactionWithRows {
        owner: OwnedTransaction,

        #[not_covariant]
        dependent: OwnedRowsVec,
    }
);
86
87impl TransactionWithRows {
88    pub(crate) fn new_empty(txn: OwnedTransaction) -> Self {
89        Self::new(txn, |_| slab::Slab::new())
90    }
91
92    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
93        self.borrow_owner().get()
94    }
95}
96
97impl<S: Send + Sync + Schema> Database<S> {
98    /// Create a [Transaction]. Creating the transaction will not block by default.
99    ///
100    /// This function will panic if the schema was modified compared to when the [Database] value
101    /// was created. This can happen for example by running another instance of your program with
102    /// additional migrations.
103    ///
104    /// Note that many systems have a limit on the number of file descriptors that can
105    /// exist in a single process. On my machine the soft limit is (1024) by default.
106    /// If this limit is reached, it may cause a panic in this method.
107    pub fn transaction<R: Send>(&self, f: impl Send + FnOnce(&'static Transaction<S>) -> R) -> R {
108        let res = std::thread::scope(|scope| {
109            scope
110                .spawn(|| {
111                    use r2d2::ManageConnection;
112                    let conn = self.manager.connect().unwrap();
113
114                    let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
115                        Some(conn.borrow_mut().transaction().unwrap())
116                    });
117
118                    f(Transaction::new_checked(owned, &self.schema_version))
119                })
120                .join()
121        });
122        match res {
123            Ok(val) => val,
124            Err(payload) => std::panic::resume_unwind(payload),
125        }
126    }
127
128    /// Create a mutable [Transaction].
129    /// This operation needs to wait for all other mutable [Transaction]s for this database to be finished.
130    /// There is currently no timeout on this operation, so it will wait indefinitly if required.
131    ///
132    /// Whether the transaction is commited depends on the result of the closure.
133    /// The transaction is only commited if the closure return [Ok]. In the case that it returns [Err]
134    /// or when the closure panics, a rollback is performed.
135    ///
136    /// This function will panic if the schema was modified compared to when the [Database] value
137    /// was created. This can happen for example by running another instance of your program with
138    /// additional migrations.
139    ///
140    /// Note that many systems have a limit on the number of file descriptors that can
141    /// exist in a single process. On my machine the soft limit is (1024) by default.
142    /// If this limit is reached, it may cause a panic in this method.
143    pub fn transaction_mut<O: Send, E: Send>(
144        &self,
145        f: impl Send + FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
146    ) -> Result<O, E> {
147        let join_res = std::thread::scope(|scope| {
148            scope
149                .spawn(|| {
150                    // Acquire the lock before creating the connection.
151                    // Technically we can acquire the lock later, but we don't want to waste
152                    // file descriptors on transactions that need to wait anyway.
153                    let guard = self.mut_lock.lock();
154
155                    use r2d2::ManageConnection;
156                    let conn = self.manager.connect().unwrap();
157
158                    let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
159                        let txn = conn
160                            .borrow_mut()
161                            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
162                            .unwrap();
163                        Some(txn)
164                    });
165                    // if this panics then the transaction is rolled back and the guard is dropped.
166                    let res = f(Transaction::new_checked(owned, &self.schema_version));
167
168                    // Drop the guard before commiting to let sqlite go to the next transaction
169                    // more quickly while guaranteeing that the database will unlock soon.
170                    drop(guard);
171
172                    let owned = TXN.take().unwrap().into_owner();
173
174                    if res.is_ok() {
175                        owned.with(|x| x.commit().unwrap());
176                    } else {
177                        owned.with(|x| x.rollback().unwrap());
178                    }
179
180                    res
181                })
182                .join()
183        });
184
185        match join_res {
186            Ok(val) => val,
187            Err(payload) => std::panic::resume_unwind(payload),
188        }
189    }
190
191    /// Same as [Self::transaction_mut], but always commits the transaction.
192    ///
193    /// The only exception is that if the closure panics, a rollback is performed.
194    pub fn transaction_mut_ok<R: Send>(
195        &self,
196        f: impl Send + FnOnce(&'static mut Transaction<S>) -> R,
197    ) -> R {
198        self.transaction_mut(|txn| Ok::<R, Infallible>(f(txn)))
199            .unwrap()
200    }
201
202    /// Create a new [rusqlite::Connection] to the database.
203    ///
204    /// You can do (almost) anything you want with this connection as it is almost completely isolated from all other
205    /// [rust_query] connections. The only thing you should not do here is changing the schema.
206    /// Schema changes are detected with the `schema_version` pragma and will result in a panic when creating a new
207    /// [rust_query] transaction.
208    ///
209    /// The `foreign_keys` pragma is always enabled here, even if [crate::migrate::ForeignKeys::SQLite] is not used.
210    ///
211    /// Note that many systems have a limit on the number of file descriptors that can
212    /// exist in a single process. On my machine the soft limit is (1024) by default.
213    /// If this limit is reached, it may cause a panic in this method.
214    pub fn rusqlite_connection(&self) -> rusqlite::Connection {
215        use r2d2::ManageConnection;
216        let conn = self.manager.connect().unwrap();
217        conn.pragma_update(None, "foreign_keys", "ON").unwrap();
218        conn
219    }
220}
221
/// [Transaction] can be used to query and update the database.
///
/// From the perspective of a [Transaction] each other [Transaction] is fully applied or not at all.
/// Furthermore, the effects of [Transaction]s have a global order.
/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
pub struct Transaction<S> {
    /// Marker for the schema type `S`; no value of `S` is stored.
    pub(crate) _p2: PhantomData<S>,
    /// Raw-pointer marker: makes [Transaction] neither `Send` nor `Sync`.
    pub(crate) _local: PhantomData<*const ()>,
}
231
232impl<S> Transaction<S> {
233    pub(crate) fn new() -> Self {
234        Self {
235            _p2: PhantomData,
236            _local: PhantomData,
237        }
238    }
239
240    pub(crate) fn new_ref() -> &'static mut Self {
241        // no memory is leaked because Self is zero sized
242        Box::leak(Box::new(Self::new()))
243    }
244}
245
246impl<S: Schema> Transaction<S> {
247    /// This will check the schema version and panic if it is not as expected
248    pub(crate) fn new_checked(txn: OwnedTransaction, expected: &AtomicI64) -> &'static mut Self {
249        let schema_version = schema_version(txn.get());
250        // If the schema version is not the expected version then we
251        // check if the changes are acceptable.
252        if schema_version != expected.load(std::sync::atomic::Ordering::Relaxed) {
253            if user_version(txn.get()).unwrap() != S::VERSION {
254                panic!("The database user_version changed unexpectedly")
255            }
256
257            TXN.set(Some(TransactionWithRows::new_empty(txn)));
258            check_schema::<S>(Self::new_ref());
259            expected.store(schema_version, std::sync::atomic::Ordering::Relaxed);
260        } else {
261            TXN.set(Some(TransactionWithRows::new_empty(txn)));
262        }
263
264        const {
265            assert!(size_of::<Self>() == 0);
266        }
267        Self::new_ref()
268    }
269}
270
271impl<S> Transaction<S> {
272    /// Execute a query with multiple results.
273    ///
274    /// ```
275    /// # use rust_query::{private::doctest::*};
276    /// # get_txn(|txn| {
277    /// let user_names = txn.query(|rows| {
278    ///     let user = rows.join(User);
279    ///     rows.into_vec(&user.name)
280    /// });
281    /// assert_eq!(user_names, vec!["Alice".to_owned()]);
282    /// # });
283    /// ```
284    pub fn query<'t, F, R>(&'t self, f: F) -> R
285    where
286        F: for<'inner> FnOnce(&mut Query<'t, 'inner, S>) -> R,
287    {
288        // Execution already happens in a [Transaction].
289        // and thus any [TransactionMut] that it might be borrowed
290        // from is borrowed immutably, which means the rows can not change.
291
292        let q = Rows {
293            phantom: PhantomData,
294            ast: Default::default(),
295            _p: PhantomData,
296        };
297        f(&mut Query {
298            q,
299            phantom: PhantomData,
300        })
301    }
302
303    /// Retrieve a single result from the database.
304    ///
305    /// ```
306    /// # use rust_query::{private::doctest::*, IntoExpr};
307    /// # rust_query::private::doctest::get_txn(|txn| {
308    /// let res = txn.query_one("test".into_expr());
309    /// assert_eq!(res, "test");
310    /// # });
311    /// ```
312    ///
313    /// Instead of using [Self::query_one] in a loop, it is better to
314    /// call [Self::query] and return all results at once.
315    pub fn query_one<O: 'static>(&self, val: impl IntoSelect<'static, S, Out = O>) -> O {
316        self.query(|e| e.into_iter(val.into_select()).next().unwrap())
317    }
318
319    pub fn lazy<'t, T: MyTyp>(&'t self, val: impl IntoExpr<'static, S, Typ = T>) -> T::Lazy<'t> {
320        T::out_to_lazy(self.query_one(val.into_expr()))
321    }
322
323    pub fn lazy_iter<'t, T: Table<Schema = S>>(
324        &'t self,
325        val: impl Joinable<'static, Typ = T>,
326    ) -> LazyIter<'t, T> {
327        let val = DynJoinable::new(val);
328        self.query(|rows| {
329            let table = rows.join(val);
330            LazyIter {
331                txn: self,
332                iter: rows.into_iter(table),
333            }
334        })
335    }
336}
337
/// Iterator returned by [Transaction::lazy_iter].
///
/// Every [TableRow] produced by the inner iterator is converted to its
/// lazy representation before it is yielded.
pub struct LazyIter<'t, T: Table> {
    /// Transaction that is used to convert each row with [Transaction::lazy].
    txn: &'t Transaction<T::Schema>,
    /// The underlying iterator over the rows of the table.
    iter: crate::query::Iter<'t, TableRow<T>>,
}
342
343impl<'t, T: Table> Iterator for LazyIter<'t, T> {
344    type Item = <T as MyTyp>::Lazy<'t>;
345
346    fn next(&mut self) -> Option<Self::Item> {
347        self.iter.next().map(|x| self.txn.lazy(x))
348    }
349}
350
351impl<S: 'static> Transaction<S> {
352    /// Try inserting a value into the database.
353    ///
354    /// Returns [Ok] with a reference to the new inserted value or an [Err] with conflict information.
355    /// The type of conflict information depends on the number of unique constraints on the table:
356    /// - 0 unique constraints => [Infallible]
357    /// - 1 unique constraint => [Expr] reference to the conflicting table row.
358    /// - 2+ unique constraints => `()` no further information is provided.
359    ///
360    /// ```
361    /// # use rust_query::{private::doctest::*, IntoExpr};
362    /// # rust_query::private::doctest::get_txn(|mut txn| {
363    /// let res = txn.insert(User {
364    ///     name: "Bob",
365    /// });
366    /// assert!(res.is_ok());
367    /// let res = txn.insert(User {
368    ///     name: "Bob",
369    /// });
370    /// assert!(res.is_err(), "there is a unique constraint on the name");
371    /// # });
372    /// ```
373    pub fn insert<T: Table<Schema = S>>(
374        &mut self,
375        val: impl TableInsert<T = T>,
376    ) -> Result<TableRow<T>, T::Conflict> {
377        try_insert_private(T::NAME.into_table_ref(), None, val.into_insert())
378    }
379
380    /// This is a convenience function to make using [Transaction::insert]
381    /// easier for tables without unique constraints.
382    ///
383    /// The new row is added to the table and the row reference is returned.
384    pub fn insert_ok<T: Table<Schema = S, Conflict = Infallible>>(
385        &mut self,
386        val: impl TableInsert<T = T>,
387    ) -> TableRow<T> {
388        let Ok(row) = self.insert(val);
389        row
390    }
391
392    /// This is a convenience function to make using [Transaction::insert]
393    /// easier for tables with exactly one unique constraints.
394    ///
395    /// The new row is inserted and the reference to the row is returned OR
396    /// an existing row is found which conflicts with the new row and a reference
397    /// to the conflicting row is returned.
398    ///
399    /// ```
400    /// # use rust_query::{private::doctest::*, IntoExpr};
401    /// # rust_query::private::doctest::get_txn(|mut txn| {
402    /// let bob = txn.insert(User {
403    ///     name: "Bob",
404    /// }).unwrap();
405    /// let bob2 = txn.find_or_insert(User {
406    ///     name: "Bob", // this will conflict with the existing row.
407    /// });
408    /// assert_eq!(bob, bob2);
409    /// # });
410    /// ```
411    pub fn find_or_insert<T: Table<Schema = S, Conflict = TableRow<T>>>(
412        &mut self,
413        val: impl TableInsert<T = T>,
414    ) -> TableRow<T> {
415        match self.insert(val) {
416            Ok(row) => row,
417            Err(row) => row,
418        }
419    }
420
    /// Try updating a row in the database to have new column values.
    ///
    /// Updating can fail just like [Transaction::insert] because of unique constraint conflicts.
    /// This happens when the new values are in conflict with an existing different row.
    ///
    /// When the update succeeds, this function returns [Ok], when it fails it returns [Err] with one of
    /// three conflict types:
    /// - 0 unique constraints => [Infallible]
    /// - 1 unique constraint => [Expr] reference to the conflicting table row.
    /// - 2+ unique constraints => `()` no further information is provided.
    ///
    /// This method panics if the statement does not update exactly one row, or if
    /// it fails with an error other than a constraint violation.
    ///
    /// ```
    /// # use rust_query::{private::doctest::*, IntoExpr, Update};
    /// # rust_query::private::doctest::get_txn(|mut txn| {
    /// let bob = txn.insert(User {
    ///     name: "Bob",
    /// }).unwrap();
    /// txn.update(bob, User {
    ///     name: Update::set("New Bob"),
    /// }).unwrap();
    /// # });
    /// ```
    pub fn update<T: Table<Schema = S>>(
        &mut self,
        row: impl IntoExpr<'static, S, Typ = T>,
        val: T::Update,
    ) -> Result<(), T::Conflict> {
        // Subquery that selects the id of the row to update.
        let mut id = ValueBuilder::default();
        let row = row.into_expr();
        let (id, _) = id.simple_one(DynTypedExpr::erase(&row));

        // Collect the column names and the expressions for their new values.
        let val = T::apply_try_update(val, row);
        let mut reader = Reader::default();
        T::read(&val, &mut reader);
        let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.into_iter().collect();

        // All new values are selected in a single CTE named "cte".
        let (select, col_fields) = ValueBuilder::default().simple(col_exprs);
        let cte = CommonTableExpression::new()
            .query(select)
            .columns(col_fields.clone())
            .table_name(Alias::new("cte"))
            .to_owned();
        let with_clause = WithClause::new().cte(cte).to_owned();

        let mut update = UpdateStatement::new()
            .table(("main", T::NAME))
            .cond_where(Expr::col(("main", T::NAME, T::ID)).in_subquery(id))
            .to_owned();

        // `col_names` and `col_fields` are paired by position: every column is set
        // to a subquery that reads its field from the CTE.
        for (name, field) in zip(col_names, col_fields) {
            let select = SelectStatement::new()
                .from(Alias::new("cte"))
                .column(field)
                .to_owned();
            let value = sea_query::Expr::SubQuery(
                None,
                Box::new(sea_query::SubQueryStatement::SelectStatement(select)),
            );
            update.value(Alias::new(name), value);
        }

        let (query, args) = update.with(with_clause).build_rusqlite(SqliteQueryBuilder);

        // Run the statement on the transaction of the current thread.
        let res = TXN.with_borrow(|txn| {
            let txn = txn.as_ref().unwrap().get();

            let mut stmt = txn.prepare_cached(&query).unwrap();
            stmt.execute(&*args.as_params())
        });

        match res {
            Ok(1) => Ok(()),
            Ok(n) => panic!("unexpected number of updates: {n}"),
            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
                if kind.code == ErrorCode::ConstraintViolation =>
            {
                // `_val` is the message from sqlite, it looks like
                // "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track".
                // The message is not inspected. Note that `val` below is NOT this message,
                // it is the set of column values that we tried to write.
                Err(T::get_conflict_unchecked(self, &val))
            }
            Err(err) => panic!("{err:?}"),
        }
    }
503
504    /// This is a convenience function to use [Transaction::update] for updates
505    /// that can not cause unique constraint violations.
506    ///
507    /// This method can be used for all tables, it just does not allow modifying
508    /// columns that are part of unique constraints.
509    pub fn update_ok<T: Table<Schema = S>>(
510        &mut self,
511        row: impl IntoExpr<'static, S, Typ = T>,
512        val: T::UpdateOk,
513    ) {
514        match self.update(row, T::update_into_try_update(val)) {
515            Ok(val) => val,
516            Err(_) => {
517                unreachable!("update can not fail")
518            }
519        }
520    }
521
522    /// Convert the [Transaction] into a [TransactionWeak] to allow deletions.
523    pub fn downgrade(&'static mut self) -> &'static mut TransactionWeak<S> {
524        // TODO: clean this up
525        Box::leak(Box::new(TransactionWeak { inner: PhantomData }))
526    }
527}
528
/// This is the weak version of [Transaction].
///
/// The reason that it is called `weak` is because [TransactionWeak] can not guarantee
/// that [TableRow]s prove the existence of their particular row.
///
/// [TransactionWeak] is useful because it allows deleting rows.
pub struct TransactionWeak<S> {
    /// Marker only; no [Transaction] value is stored.
    inner: PhantomData<Transaction<S>>,
}
538
539impl<S: Schema> TransactionWeak<S> {
540    /// Try to delete a row from the database.
541    ///
542    /// This will return an [Err] if there is a row that references the row that is being deleted.
543    /// When this method returns [Ok] it will contain a [bool] that is either
544    /// - `true` if the row was just deleted.
545    /// - `false` if the row was deleted previously in this transaction.
546    pub fn delete<T: Table<Schema = S>>(&mut self, val: TableRow<T>) -> Result<bool, T::Referer> {
547        let schema = crate::schema::from_macro::Schema::new::<S>();
548
549        // This is a manual check that foreign key constraints are not violated.
550        // We do this manually because we don't want to enabled foreign key constraints for the whole
551        // transaction (and is not possible to enable for part of a transaction).
552        let mut checks = vec![];
553        for (table_name, table) in &schema.tables {
554            for col in table.columns.iter().filter_map(|(col_name, col)| {
555                let col = &col.def;
556                col.fk
557                    .as_ref()
558                    .is_some_and(|(t, c)| t == T::NAME && c == T::ID)
559                    .then_some(col_name)
560            }) {
561                let stmt = SelectStatement::new()
562                    .expr(
563                        val.in_subquery(
564                            SelectStatement::new()
565                                .from(Alias::new(table_name))
566                                .column(Alias::new(col))
567                                .take(),
568                        ),
569                    )
570                    .take();
571                checks.push(stmt.build_rusqlite(SqliteQueryBuilder));
572            }
573        }
574
575        let stmt = DeleteStatement::new()
576            .from_table(("main", T::NAME))
577            .cond_where(Expr::col(("main", T::NAME, T::ID)).eq(val.inner.idx))
578            .take();
579
580        let (query, args) = stmt.build_rusqlite(SqliteQueryBuilder);
581
582        TXN.with_borrow(|txn| {
583            let txn = txn.as_ref().unwrap().get();
584
585            for (query, args) in checks {
586                let mut stmt = txn.prepare_cached(&query).unwrap();
587                match stmt.query_one(&*args.as_params(), |r| r.get(0)) {
588                    Ok(true) => return Err(T::get_referer_unchecked()),
589                    Ok(false) => {}
590                    Err(err) => panic!("{err:?}"),
591                }
592            }
593
594            let mut stmt = txn.prepare_cached(&query).unwrap();
595            match stmt.execute(&*args.as_params()) {
596                Ok(0) => Ok(false),
597                Ok(1) => Ok(true),
598                Ok(n) => {
599                    panic!("unexpected number of deletes {n}")
600                }
601                Err(err) => panic!("{err:?}"),
602            }
603        })
604    }
605
606    /// Delete a row from the database.
607    ///
608    /// This is the infallible version of [TransactionWeak::delete].
609    ///
610    /// To be able to use this method you have to mark the table as `#[no_reference]` in the schema.
611    pub fn delete_ok<T: Table<Referer = Infallible, Schema = S>>(
612        &mut self,
613        val: TableRow<T>,
614    ) -> bool {
615        let Ok(res) = self.delete(val);
616        res
617    }
618
619    /// This allows you to do (almost) anything you want with the internal [rusqlite::Transaction].
620    ///
621    /// Note that there are some things that you should not do with the transaction, such as:
622    /// - Changes to the schema, these will result in a panic as described in [Database].
623    /// - Making changes that violate foreign-key constraints (see below).
624    ///
625    /// Sadly it is not possible to enable (or disable) the `foreign_keys` pragma during a transaction.
626    /// This means that whether this pragma is enabled depends on which [crate::migrate::ForeignKeys]
627    /// option is used and can not be changed.
628    pub fn rusqlite_transaction<R>(&mut self, f: impl FnOnce(&rusqlite::Transaction) -> R) -> R {
629        TXN.with_borrow(|txn| f(txn.as_ref().unwrap().get()))
630    }
631}
632
633pub fn try_insert_private<T: Table>(
634    table: sea_query::TableRef,
635    idx: Option<i64>,
636    val: T::Insert,
637) -> Result<TableRow<T>, T::Conflict> {
638    let mut reader = Reader::default();
639    T::read(&val, &mut reader);
640    if let Some(idx) = idx {
641        reader.col(T::ID, idx);
642    }
643    let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.into_iter().collect();
644    let is_empty = col_names.is_empty();
645
646    let (select, _) = ValueBuilder::default().simple(col_exprs);
647
648    let mut insert = InsertStatement::new();
649    insert.into_table(table);
650    insert.columns(col_names.into_iter().map(Alias::new));
651    if is_empty {
652        // select always has at least one column, so we leave it out when there are no columns
653        insert.or_default_values();
654    } else {
655        insert.select_from(select).unwrap();
656    }
657    insert.returning_col(T::ID);
658
659    let (sql, values) = insert.build_rusqlite(SqliteQueryBuilder);
660
661    let res = TXN.with_borrow(|txn| {
662        let txn = txn.as_ref().unwrap().get();
663        track_stmt(txn, &sql, &values);
664
665        let mut statement = txn.prepare_cached(&sql).unwrap();
666        let mut res = statement
667            .query_map(&*values.as_params(), |row| {
668                Ok(TableRow::<T>::from_sql(row.get_ref(T::ID)?)?)
669            })
670            .unwrap();
671
672        res.next().unwrap()
673    });
674
675    match res {
676        Ok(id) => Ok(id),
677        Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
678            if kind.code == ErrorCode::ConstraintViolation =>
679        {
680            // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
681            Err(T::get_conflict_unchecked(&Transaction::new(), &val))
682        }
683        Err(err) => panic!("{err:?}"),
684    }
685}