Skip to main content

rust_query/
transaction.rs

1use std::{cell::RefCell, convert::Infallible, marker::PhantomData, sync::atomic::AtomicI64};
2
3use rusqlite::ErrorCode;
4use sea_query::{
5    Alias, DeleteStatement, Expr, ExprTrait, InsertStatement, IntoIden, SelectStatement,
6    SqliteQueryBuilder, UpdateStatement,
7};
8use sea_query_rusqlite::RusqliteBinder;
9use self_cell::{MutBorrow, self_cell};
10
11use crate::{
12    IntoExpr, IntoSelect, Table, TableRow,
13    error::FromConflict,
14    migrate::{Schema, check_schema, schema_version, user_version},
15    migration::Config,
16    mutable::Mutable,
17    pool::Pool,
18    private::{IntoJoinable, Reader},
19    query::{OwnedRows, Query, track_stmt},
20    rows::Rows,
21    value::{DbTyp, OptTable},
22};
23
/// [Database] is a proof that the database has been configured.
///
/// Creating a [Database] requires going through the steps to migrate an existing database to
/// the required schema, or creating a new database from scratch (See also [crate::migration::Config]).
/// Please see [Database::migrator] to get started.
///
/// Having done the setup to create a compatible database is sadly not a guarantee that the
/// database will stay compatible for the lifetime of the [Database] struct.
/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
/// modifications to the schema and gives us the ability to panic when this is detected.
/// Such non-malicious modification of the schema can happen for example if another [Database]
/// instance is created with additional migrations (e.g. by another newer instance of your program).
pub struct Database<S> {
    /// Source of connections: one is popped for every transaction and pushed
    /// back once that transaction has ended.
    pub(crate) manager: Pool,
    /// The schema version that new transactions expect to observe. It is compared
    /// when a transaction starts and refreshed after a successful schema re-check.
    pub(crate) schema_version: AtomicI64,
    /// Ties this handle to the schema type `S` without storing a value of it.
    pub(crate) schema: PhantomData<S>,
    /// Held while the closure of a mutable transaction runs, so that at most one
    /// such closure executes at a time for this [Database].
    pub(crate) mut_lock: parking_lot::FairMutex<()>,
}
42
43impl<S: Schema> Database<S> {
44    /// This is a quick way to open a database if you don't care about migration.
45    ///
46    /// Note that this will panic if the schema version doesn't match or when the schema
47    /// itself doesn't match the expected schema.
48    pub fn new(config: Config) -> Self {
49        let Some(m) = Self::migrator(config) else {
50            panic!("schema version {}, but got an older version", S::VERSION)
51        };
52        let Some(m) = m.finish() else {
53            panic!("schema version {}, but got a new version", S::VERSION)
54        };
55        m
56    }
57}
58
59use rusqlite::Connection;
// The transaction lives in an `Option` so that it can be moved out of the cell
// (to commit or roll back) while the connection it borrows from stays in place.
type RTransaction<'x> = Option<rusqlite::Transaction<'x>>;

// Self-referential pair: the `Connection` is the owner and the transaction that
// borrows from it is the dependent, so both can be moved around as a single value.
self_cell!(
    pub struct OwnedTransaction {
        owner: MutBorrow<Connection>,

        #[covariant]
        dependent: RTransaction,
    }
);
70
/// SAFETY:
/// `RTransaction: !Send` because it borrows from `Connection` and `Connection: !Sync`.
/// `OwnedTransaction` can be `Send` because we know that `dependent` is the only
/// borrow of `owner` and `OwnedTransaction: !Sync` so `dependent` can not be borrowed
/// from multiple threads.
unsafe impl Send for OwnedTransaction {}
// Compile-time guard for the argument above: the `Send` impl relies on
// `OwnedTransaction` never being `Sync`.
assert_not_impl_any! {OwnedTransaction: Sync}
78
thread_local! {
    // The transaction that is active on the current thread, if any.
    // It is stored here when a transaction is created and taken out again when the
    // transaction ends; code that executes statements borrows it from this slot.
    pub(crate) static TXN: RefCell<Option<TransactionWithRows>> = const { RefCell::new(None) };
}
82
83impl OwnedTransaction {
84    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
85        self.borrow_dependent().as_ref().unwrap()
86    }
87
88    pub(crate) fn with(
89        mut self,
90        f: impl FnOnce(rusqlite::Transaction<'_>),
91    ) -> rusqlite::Connection {
92        self.with_dependent_mut(|_, b| f(b.take().unwrap()));
93        self.into_owner().into_inner()
94    }
95}
96
// Slab of row sets whose lifetime is tied to the transaction.
// NOTE(review): presumably these are result sets of queries that are still being
// iterated — confirm against `crate::query::OwnedRows`.
type OwnedRowsVec<'x> = slab::Slab<OwnedRows<'x>>;
// Self-referential pair: the owned transaction together with the row sets that
// borrow from it.
self_cell!(
    pub struct TransactionWithRows {
        owner: OwnedTransaction,

        #[not_covariant]
        dependent: OwnedRowsVec,
    }
);
106
107impl TransactionWithRows {
108    pub(crate) fn new_empty(txn: OwnedTransaction) -> Self {
109        Self::new(txn, |_| slab::Slab::new())
110    }
111
112    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
113        self.borrow_owner().get()
114    }
115}
116
117impl<S: Send + Sync + Schema> Database<S> {
118    #[doc = include_str!("database/transaction.md")]
119    pub fn transaction<R: Send>(&self, f: impl Send + FnOnce(&'static Transaction<S>) -> R) -> R {
120        let res = std::thread::scope(|scope| scope.spawn(|| self.transaction_local(f)).join());
121        match res {
122            Ok(val) => val,
123            Err(payload) => std::panic::resume_unwind(payload),
124        }
125    }
126
127    /// Same as [Self::transaction], but can only be used on a new thread.
128    pub(crate) fn transaction_local<R>(&self, f: impl FnOnce(&'static Transaction<S>) -> R) -> R {
129        let conn = self.manager.pop();
130
131        let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
132            Some(conn.borrow_mut().transaction().unwrap())
133        });
134
135        let res = f(Transaction::new_checked(owned, &self.schema_version));
136
137        let owned = TXN.take().unwrap().into_owner();
138        self.manager.push(owned.into_owner().into_inner());
139
140        res
141    }
142
143    #[doc = include_str!("database/transaction_mut.md")]
144    pub fn transaction_mut<O: Send, E: Send>(
145        &self,
146        f: impl Send + FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
147    ) -> Result<O, E> {
148        let join_res =
149            std::thread::scope(|scope| scope.spawn(|| self.transaction_mut_local(f)).join());
150
151        match join_res {
152            Ok(val) => val,
153            Err(payload) => std::panic::resume_unwind(payload),
154        }
155    }
156
157    pub(crate) fn transaction_mut_local<O, E>(
158        &self,
159        f: impl FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
160    ) -> Result<O, E> {
161        // Acquire the lock before creating the connection.
162        // Technically we can acquire the lock later, but we don't want to waste
163        // file descriptors on transactions that need to wait anyway.
164        let guard = self.mut_lock.lock();
165
166        let conn = self.manager.pop();
167
168        let owned = OwnedTransaction::new(MutBorrow::new(conn), |conn| {
169            let txn = conn
170                .borrow_mut()
171                .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
172                .unwrap();
173            Some(txn)
174        });
175        // if this panics then the transaction is rolled back and the guard is dropped.
176        let res = f(Transaction::new_checked(owned, &self.schema_version));
177
178        // Drop the guard before commiting to let sqlite go to the next transaction
179        // more quickly while guaranteeing that the database will unlock soon.
180        drop(guard);
181
182        let owned = TXN.take().unwrap().into_owner();
183
184        let conn = if res.is_ok() {
185            owned.with(|x| x.commit().unwrap())
186        } else {
187            owned.with(|x| x.rollback().unwrap())
188        };
189        self.manager.push(conn);
190
191        res
192    }
193
194    #[doc = include_str!("database/transaction_mut_ok.md")]
195    pub fn transaction_mut_ok<R: Send>(
196        &self,
197        f: impl Send + FnOnce(&'static mut Transaction<S>) -> R,
198    ) -> R {
199        self.transaction_mut(|txn| Ok::<R, Infallible>(f(txn)))
200            .unwrap()
201    }
202
203    /// Create a new [rusqlite::Connection] to the database.
204    ///
205    /// You can do (almost) anything you want with this connection as it is almost completely isolated from all other
206    /// [rust_query] connections. The only thing you should not do here is changing the schema.
207    /// Schema changes are detected with the `schema_version` pragma and will result in a panic when creating a new
208    /// [rust_query] transaction.
209    ///
210    /// The `foreign_keys` pragma is always enabled here, even if [crate::migration::ForeignKeys::SQLite] is not used.
211    ///
212    /// Note that many systems have a limit on the number of file descriptors that can
213    /// exist in a single process. On my machine the soft limit is (1024) by default.
214    /// If this limit is reached, it may cause a panic in this method.
215    pub fn rusqlite_connection(&self) -> rusqlite::Connection {
216        let conn = self.manager.pop();
217        conn.pragma_update(None, "foreign_keys", "ON").unwrap();
218        conn
219    }
220}
221
/// [Transaction] can be used to query and update the database.
///
/// From the perspective of a [Transaction] each other [Transaction] is fully applied or not at all.
/// Furthermore, the effects of [Transaction]s have a global order.
/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
pub struct Transaction<S> {
    /// Ties the token to the schema type `S`; no value of `S` is stored.
    pub(crate) _p2: PhantomData<S>,
    /// Raw-pointer marker: it makes the token neither `Send` nor `Sync`.
    pub(crate) _local: PhantomData<*const ()>,
}
231
232impl<S> Transaction<S> {
233    pub(crate) fn new() -> Self {
234        Self {
235            _p2: PhantomData,
236            _local: PhantomData,
237        }
238    }
239
240    pub(crate) fn copy(&self) -> Self {
241        Self::new()
242    }
243
244    pub(crate) fn new_ref() -> &'static mut Self {
245        // no memory is leaked because Self is zero sized
246        Box::leak(Box::new(Self::new()))
247    }
248}
249
250impl<S: Schema> Transaction<S> {
251    /// This will check the schema version and panic if it is not as expected
252    pub(crate) fn new_checked(txn: OwnedTransaction, expected: &AtomicI64) -> &'static mut Self {
253        let schema_version = schema_version(txn.get());
254        // If the schema version is not the expected version then we
255        // check if the changes are acceptable.
256        if schema_version != expected.load(std::sync::atomic::Ordering::Relaxed) {
257            if user_version(txn.get()).unwrap() != S::VERSION {
258                panic!("The database user_version changed unexpectedly")
259            }
260
261            TXN.set(Some(TransactionWithRows::new_empty(txn)));
262            check_schema::<S>(Self::new_ref());
263            expected.store(schema_version, std::sync::atomic::Ordering::Relaxed);
264        } else {
265            TXN.set(Some(TransactionWithRows::new_empty(txn)));
266        }
267
268        const {
269            assert!(size_of::<Self>() == 0);
270        }
271        Self::new_ref()
272    }
273}
274
275impl<S> Transaction<S> {
276    /// Execute a query with multiple results.
277    ///
278    /// ```
279    /// # use rust_query::{private::doctest::*};
280    /// # get_txn(|txn| {
281    /// let user_names = txn.query(|rows| {
282    ///     let user = rows.join(User);
283    ///     rows.into_vec(&user.name)
284    /// });
285    /// assert_eq!(user_names, vec!["Alice".to_owned()]);
286    /// # });
287    /// ```
288    pub fn query<'t, R>(&'t self, f: impl FnOnce(&mut Query<'t, '_, S>) -> R) -> R {
289        // Execution already happens in a [Transaction].
290        // and thus any [TransactionMut] that it might be borrowed
291        // from is borrowed immutably, which means the rows can not change.
292
293        let q = Rows {
294            phantom: PhantomData,
295            ast: Default::default(),
296            _p: PhantomData,
297        };
298        f(&mut Query {
299            q,
300            phantom: PhantomData,
301        })
302    }
303
304    /// Retrieve a single result from the database.
305    ///
306    /// ```
307    /// # use rust_query::{private::doctest::*, IntoExpr};
308    /// # rust_query::private::doctest::get_txn(|txn| {
309    /// let res = txn.query_one("test".into_expr());
310    /// assert_eq!(res, "test");
311    /// # });
312    /// ```
313    ///
314    /// Instead of using [Self::query_one] in a loop, it is better to
315    /// call [Self::query] and return all results at once.
316    pub fn query_one<O: 'static>(&self, val: impl IntoSelect<'static, S, Out = O>) -> O {
317        let mut query = self.query(|e| e.into_iter(val.into_select()));
318        let res = query.next().unwrap();
319        debug_assert!(query.next().is_none(), "query should return one row");
320        res
321    }
322
323    /// Retrieve a [crate::Lazy] value from the database.
324    ///
325    /// This is very similar to [Self::query_one], except that it retrieves
326    /// [crate::Lazy] instead of [TableRow]. As such it only works with
327    /// table valued [Expr].
328    ///
329    /// [Self::lazy] also works for optional rows, so you can write `txn.lazy(User.email(e))`.
330    pub fn lazy<'t, T: OptTable<Schema = S>>(
331        &'t self,
332        val: impl IntoExpr<'static, S, Typ = T>,
333    ) -> T::Lazy<'t> {
334        T::out_to_lazy(self.query_one(val.into_expr()))
335    }
336
337    /// This retrieves an iterator of [crate::Lazy] values.
338    ///
339    /// Refer to [Rows::join] for the kind of the parameter that is supported here.
340    pub fn lazy_iter<'t, T: Table<Schema = S>>(
341        &'t self,
342        val: impl IntoJoinable<'static, S, Typ = TableRow<T>>,
343    ) -> LazyIter<'t, T> {
344        let val = val.into_joinable();
345        self.query(|rows| {
346            let table = rows.join(val);
347            LazyIter {
348                txn: self,
349                iter: rows.into_iter(table),
350            }
351        })
352    }
353
354    /// Retrieves a [Mutable] row from the database.
355    ///
356    /// The [Transaction] is borrowed mutably until the [Mutable] is dropped.
357    pub fn mutable<'t, T: OptTable<Schema = S>>(
358        &'t mut self,
359        val: impl IntoExpr<'static, S, Typ = T>,
360    ) -> T::Mutable<'t> {
361        let x = self.query_one(T::select_opt_mutable(val.into_expr()));
362        T::into_mutable(x)
363    }
364
365    /// Retrieve multiple [crate::Mutable] rows from the database.
366    ///
367    /// Refer to [Rows::join] for the kind of the parameter that is supported here.
368    pub fn mutable_vec<'t, T: Table<Schema = S>>(
369        &'t mut self,
370        val: impl IntoJoinable<'static, S, Typ = TableRow<T>>,
371    ) -> Vec<Mutable<'t, T>> {
372        let val = val.into_joinable();
373        self.query(|rows| {
374            let val = rows.join(val);
375            rows.into_vec((T::into_select(val.clone()), val))
376                .into_iter()
377                .map(TableRow::<T>::into_mutable)
378                .collect()
379        })
380    }
381}
382
/// Iterator over [crate::Lazy] values, as returned by [Transaction::lazy_iter].
pub struct LazyIter<'t, T: Table> {
    /// The transaction used to turn every row into a [crate::Lazy] value.
    txn: &'t Transaction<T::Schema>,
    /// The underlying iterator over the rows of the query.
    iter: crate::query::Iter<'t, TableRow<T>>,
}
387
388impl<'t, T: Table> Iterator for LazyIter<'t, T> {
389    type Item = crate::Lazy<'t, T>;
390
391    fn next(&mut self) -> Option<Self::Item> {
392        self.iter.next().map(|x| self.txn.lazy(x))
393    }
394}
395
396impl<S: 'static> Transaction<S> {
397    /// Try inserting a value into the database.
398    ///
399    /// Returns [Ok] with a reference to the new inserted value or an [Err] with conflict information.
400    /// The type of conflict information depends on the number of unique constraints on the table:
401    /// - 0 unique constraints => [Infallible]
402    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
403    /// - 2+ unique constraints => [crate::Conflict].
404    ///
405    /// ```
406    /// # use rust_query::{private::doctest::*, IntoExpr};
407    /// # rust_query::private::doctest::get_txn(|mut txn| {
408    /// let res = txn.insert(User {
409    ///     name: "Bob".to_owned(),
410    /// });
411    /// assert!(res.is_ok());
412    /// let res = txn.insert(User {
413    ///     name: "Bob".to_owned(),
414    /// });
415    /// assert!(res.is_err(), "there is a unique constraint on the name");
416    /// # });
417    /// ```
418    pub fn insert<T: Table<Schema = S>>(&mut self, val: T) -> Result<TableRow<T>, T::Conflict> {
419        try_insert_private(T::NAME.into_iden(), None, val)
420    }
421
422    /// This is a convenience function to make using [Transaction::insert]
423    /// easier for tables without unique constraints.
424    ///
425    /// The new row is added to the table and the row reference is returned.
426    pub fn insert_ok<T: Table<Schema = S, Conflict = Infallible>>(
427        &mut self,
428        val: T,
429    ) -> TableRow<T> {
430        let Ok(row) = self.insert(val);
431        row
432    }
433
434    /// This is a convenience function to make using [Transaction::insert]
435    /// easier for tables with exactly one unique constraints.
436    ///
437    /// The new row is inserted and the reference to the row is returned OR
438    /// an existing row is found which conflicts with the new row and a reference
439    /// to the conflicting row is returned.
440    ///
441    /// ```
442    /// # use rust_query::{private::doctest::*, IntoExpr};
443    /// # rust_query::private::doctest::get_txn(|mut txn| {
444    /// let bob = txn.insert(User {
445    ///     name: "Bob".to_owned(),
446    /// }).unwrap();
447    /// let bob2 = txn.find_or_insert(User {
448    ///     name: "Bob".to_owned(), // this will conflict with the existing row.
449    /// });
450    /// assert_eq!(bob, bob2);
451    /// # });
452    /// ```
453    pub fn find_or_insert<T: Table<Schema = S, Conflict = TableRow<T>>>(
454        &mut self,
455        val: T,
456    ) -> TableRow<T> {
457        match self.insert(val) {
458            Ok(row) => row,
459            Err(row) => row,
460        }
461    }
462
463    pub(crate) fn update<T: Table<Schema = S>>(
464        &mut self,
465        row: TableRow<T>,
466        val: T::Mutable,
467    ) -> Result<(), T::Conflict> {
468        let val = T::mutable_into_insert(val);
469        let mut reader = Reader::default();
470        T::read(&val, &mut reader);
471
472        let (query, args) = UpdateStatement::new()
473            .table(("main", T::NAME))
474            .values(reader.builder.clone())
475            .cond_where(Expr::col((T::NAME, T::ID)).eq(row.inner.idx))
476            .build_rusqlite(SqliteQueryBuilder);
477
478        let res = TXN.with_borrow(|txn| {
479            let txn = txn.as_ref().unwrap().get();
480
481            let mut stmt = txn.prepare_cached(&query).unwrap();
482            stmt.execute(&*args.as_params())
483        });
484
485        match res {
486            Ok(1) => Ok(()),
487            Ok(n) => panic!("unexpected number of updates: {n}"),
488            Err(rusqlite::Error::SqliteFailure(kind, Some(msg)))
489                if kind.code == ErrorCode::ConstraintViolation =>
490            {
491                // `msg` looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
492                let res = TXN.with_borrow(|txn| {
493                    let txn = txn.as_ref().unwrap().get();
494                    <T::Conflict as FromConflict>::from_conflict(
495                        txn,
496                        T::NAME.into_iden(),
497                        reader.builder,
498                        msg,
499                    )
500                });
501                Err(res)
502            }
503            Err(err) => panic!("{err:?}"),
504        }
505    }
506
507    /// Convert the [Transaction] into a [TransactionWeak] to allow deletions.
508    pub fn downgrade(&'static mut self) -> &'static mut TransactionWeak<S> {
509        // TODO: clean this up
510        Box::leak(Box::new(TransactionWeak { inner: PhantomData }))
511    }
512}
513
/// This is the weak version of [Transaction].
///
/// The reason that it is called `weak` is because [TransactionWeak] can not guarantee
/// that [TableRow]s prove the existence of their particular row.
///
/// [TransactionWeak] is useful because it allows deleting rows.
pub struct TransactionWeak<S> {
    /// Marker only: no data is stored, the type just mirrors [Transaction] for schema `S`.
    inner: PhantomData<Transaction<S>>,
}
523
524impl<S: Schema> TransactionWeak<S> {
525    /// Try to delete a row from the database.
526    ///
527    /// This will return an [Err] if there is a row that references the row that is being deleted.
528    /// When this method returns [Ok] it will contain a [bool] that is either
529    /// - `true` if the row was just deleted.
530    /// - `false` if the row was deleted previously in this transaction.
531    pub fn delete<T: Table<Schema = S>>(&mut self, val: TableRow<T>) -> Result<bool, T::Referer> {
532        let schema = crate::schema::from_macro::Schema::new::<S>();
533
534        // This is a manual check that foreign key constraints are not violated.
535        // We do this manually because we don't want to enabled foreign key constraints for the whole
536        // transaction (and is not possible to enable for part of a transaction).
537        let mut checks = vec![];
538        for (&table_name, table) in &schema.tables {
539            for col in table.columns.iter().filter_map(|(col_name, col)| {
540                let col = &col.def;
541                col.fk
542                    .as_ref()
543                    .is_some_and(|(t, c)| t == T::NAME && c == T::ID)
544                    .then_some(col_name)
545            }) {
546                let stmt = SelectStatement::new()
547                    .expr(
548                        val.inner.idx.in_subquery(
549                            SelectStatement::new()
550                                .from(table_name)
551                                .column(Alias::new(col))
552                                .take(),
553                        ),
554                    )
555                    .take();
556                checks.push(stmt.build_rusqlite(SqliteQueryBuilder));
557            }
558        }
559
560        let stmt = DeleteStatement::new()
561            .from_table(("main", T::NAME))
562            .cond_where(Expr::col(("main", T::NAME, T::ID)).eq(val.inner.idx))
563            .take();
564
565        let (query, args) = stmt.build_rusqlite(SqliteQueryBuilder);
566
567        TXN.with_borrow(|txn| {
568            let txn = txn.as_ref().unwrap().get();
569
570            for (query, args) in checks {
571                let mut stmt = txn.prepare_cached(&query).unwrap();
572                match stmt.query_one(&*args.as_params(), |r| r.get(0)) {
573                    Ok(true) => return Err(T::get_referer_unchecked()),
574                    Ok(false) => {}
575                    Err(err) => panic!("{err:?}"),
576                }
577            }
578
579            let mut stmt = txn.prepare_cached(&query).unwrap();
580            match stmt.execute(&*args.as_params()) {
581                Ok(0) => Ok(false),
582                Ok(1) => Ok(true),
583                Ok(n) => {
584                    panic!("unexpected number of deletes {n}")
585                }
586                Err(err) => panic!("{err:?}"),
587            }
588        })
589    }
590
591    /// Delete a row from the database.
592    ///
593    /// This is the infallible version of [TransactionWeak::delete].
594    ///
595    /// To be able to use this method you have to mark the table as `#[no_reference]` in the schema.
596    pub fn delete_ok<T: Table<Referer = Infallible, Schema = S>>(
597        &mut self,
598        val: TableRow<T>,
599    ) -> bool {
600        let Ok(res) = self.delete(val);
601        res
602    }
603
604    /// This allows you to do (almost) anything you want with the internal [rusqlite::Transaction].
605    ///
606    /// Note that there are some things that you should not do with the transaction, such as:
607    /// - Changes to the schema, these will result in a panic as described in [Database].
608    /// - Making changes that violate foreign-key constraints (see below).
609    ///
610    /// Sadly it is not possible to enable (or disable) the `foreign_keys` pragma during a transaction.
611    /// This means that whether this pragma is enabled depends on which [crate::migration::ForeignKeys]
612    /// option is used and can not be changed.
613    pub fn rusqlite_transaction<R>(&mut self, f: impl FnOnce(&rusqlite::Transaction) -> R) -> R {
614        TXN.with_borrow(|txn| f(txn.as_ref().unwrap().get()))
615    }
616}
617
618pub fn try_insert_private<T: Table>(
619    table: sea_query::DynIden,
620    idx: Option<i64>,
621    val: T,
622) -> Result<TableRow<T>, T::Conflict> {
623    let mut reader = Reader::default();
624    T::read(&val, &mut reader);
625    if let Some(idx) = idx {
626        reader.col::<i64>(T::ID, idx);
627    }
628    let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.clone().into_iter().collect();
629    let is_empty = col_names.is_empty();
630
631    let mut insert = InsertStatement::new();
632    insert.into_table(("main", table.clone()));
633    insert.columns(col_names);
634    if is_empty {
635        // values always has at least one column, so we leave it out when there are no columns
636        insert.or_default_values();
637    } else {
638        insert.values(col_exprs).unwrap();
639    }
640    insert.returning_col(T::ID);
641
642    let (sql, values) = insert.build_rusqlite(SqliteQueryBuilder);
643
644    let res = TXN.with_borrow(|txn| {
645        let txn = txn.as_ref().unwrap().get();
646        track_stmt(txn, &sql, &values);
647
648        let mut statement = txn.prepare_cached(&sql).unwrap();
649        let mut res = statement
650            .query_map(&*values.as_params(), |row| {
651                Ok(TableRow::<T>::from_sql(row.get_ref(T::ID)?)?)
652            })
653            .unwrap();
654
655        res.next().unwrap()
656    });
657
658    match res {
659        Ok(id) => {
660            if let Some(idx) = idx {
661                assert_eq!(idx, id.inner.idx);
662            }
663            Ok(id)
664        }
665        Err(rusqlite::Error::SqliteFailure(kind, Some(msg)))
666            if kind.code == ErrorCode::ConstraintViolation =>
667        {
668            // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
669            let res = TXN.with_borrow(|txn| {
670                let txn = txn.as_ref().unwrap().get();
671                <T::Conflict as FromConflict>::from_conflict(txn, table, reader.builder, msg)
672            });
673            Err(res)
674        }
675        Err(err) => panic!("{err:?}"),
676    }
677}