rust_query/
transaction.rs

1use std::{convert::Infallible, marker::PhantomData, ops::Deref};
2
3use rusqlite::ErrorCode;
4use sea_query::{
5    Alias, DeleteStatement, Expr, InsertStatement, SqliteQueryBuilder, UpdateStatement, Value,
6};
7use sea_query_rusqlite::RusqliteBinder;
8
9use crate::{
10    alias::Field,
11    ast::MySelect,
12    client::private_exec,
13    exec::Query,
14    insert::{Reader, Writable},
15    migrate::schema_version,
16    private::Dummy,
17    token::LocalClient,
18    value::MyTyp,
19    IntoColumn, Table, TableRow,
20};
21
22/// [Database] is a proof that the database has been configured.
23///
24/// For information on how to create transactions, please refer to [LocalClient].
25///
26/// Creating a [Database] requires going through the steps to migrate an existing database to
27/// the required schema, or creating a new database from scratch.
28/// Having done the setup to create a compatible database is sadly not a guarantee that the
29/// database will stay compatible for the lifetime of the [Database].
30///
31/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
32/// modifications to the schema and gives us the ability to panic when this is detected.
33/// Such non-malicious modification of the schema can happen for example if another [Database]
34/// instance is created with additional migrations (e.g. by another newer instance of your program).
35///
36/// # Sqlite config
37///
38/// Sqlite is configured to be in [WAL mode](https://www.sqlite.org/wal.html).
39/// The effect of this mode is that there can be any number of readers with one concurrent writer.
40/// What is nice about this is that a [Transaction] can always be made immediately.
41/// Making a [TransactionMut] has to wait until all other [TransactionMut]s are finished.
42///
43/// Sqlite is also configured with [`synchronous=NORMAL`](https://www.sqlite.org/pragma.html#pragma_synchronous). This gives better performance by fsyncing less.
44/// The database will not lose transactions due to application crashes, but it might due to system crashes or power loss.
45pub struct Database<S> {
46    pub(crate) manager: r2d2_sqlite::SqliteConnectionManager,
47    pub(crate) schema_version: i64,
48    pub(crate) schema: PhantomData<S>,
49}
50
51/// [Transaction] can be used to query the database.
52///
53/// From the perspective of a [Transaction] each [TransactionMut] is fully applied or not at all.
54/// Futhermore, the effects of [TransactionMut]s have a global order.
55/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
56///
57/// All [TableRow] references retrieved from the database live for at most `'a`.
58/// This makes these references effectively local to this [Transaction].
59#[repr(transparent)]
60pub struct Transaction<'a, S> {
61    pub(crate) transaction: rusqlite::Transaction<'a>,
62    pub(crate) _p: PhantomData<fn(&'a S) -> &'a S>,
63    pub(crate) _local: PhantomData<LocalClient>,
64}
65
66/// Same as [Transaction], but allows inserting new rows.
67///
68/// [TransactionMut] always uses the latest version of the database, with the effects of all previous [TransactionMut]s applied.
69///
70/// To make mutations to the database permanent you need to use [TransactionMut::commit].
71/// This is to make sure that if a function panics while holding a mutable transaction, it will roll back those changes.
72pub struct TransactionMut<'a, S> {
73    pub(crate) inner: Transaction<'a, S>,
74}
75
76impl<'a, S> Deref for TransactionMut<'a, S> {
77    type Target = Transaction<'a, S>;
78
79    fn deref(&self) -> &Self::Target {
80        &self.inner
81    }
82}
83
84impl<'t, S> Transaction<'t, S> {
85    /// This will check the schema version and panic if it is not as expected
86    pub(crate) fn new_checked(txn: rusqlite::Transaction<'t>, expected: i64) -> Self {
87        if schema_version(&txn) != expected {
88            panic!("The database schema was updated unexpectedly")
89        }
90
91        Transaction {
92            transaction: txn,
93            _p: PhantomData,
94            _local: PhantomData,
95        }
96    }
97
98    /// Execute a query with multiple results.
99    ///
100    /// Please take a look at the documentation of [Query] for how to use it.
101    pub fn query<F, R>(&self, f: F) -> R
102    where
103        F: for<'a> FnOnce(&'a mut Query<'t, 'a, S>) -> R,
104    {
105        // Execution already happens in a [Transaction].
106        // and thus any [TransactionMut] that it might be borrowed
107        // from are borrowed immutably, so the rows can not change.
108        private_exec(&self.transaction, f)
109    }
110
111    /// Retrieve a single result from the database.
112    ///
113    /// Instead of using [Self::query_one] in a loop, it is better to
114    /// call [Self::query] and return all results at once.
115    pub fn query_one<O>(&self, val: impl Dummy<'t, 't, S, Out = O>) -> O
116    where
117        S: 'static,
118    {
119        // Theoretically this doesn't even need to be in a transaction.
120        // We already have one though, so we must use it.
121        let mut res = private_exec(&self.transaction, |e| {
122            // Cast the static lifetime to any lifetime necessary, this is fine because we know the static lifetime
123            // can not be guaranteed by a query scope.
124            e.into_vec_private(val)
125        });
126        res.pop().unwrap()
127    }
128}
129
130impl<'t, S: 'static> TransactionMut<'t, S> {
131    /// Try inserting a value into the database.
132    ///
133    /// Returns [Ok] with a reference to the new inserted value or an [Err] with conflict information.
134    /// The type of conflict information depends on the number of unique constraints on the table:
135    /// - 0 unique constraints => [Infallible]
136    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
137    /// - 2+ unique constraints => `()` no further information is provided.
138    pub fn try_insert<T: Table<Schema = S>, C>(
139        &mut self,
140        val: impl Writable<'t, T = T, Conflict = C, Schema = S>,
141    ) -> Result<TableRow<'t, T>, C> {
142        let ast = MySelect::default();
143
144        let reader = Reader {
145            ast: &ast,
146            _p: PhantomData,
147            _p2: PhantomData,
148        };
149        val.read(reader);
150
151        let select = ast.simple();
152
153        let mut insert = InsertStatement::new();
154        let names = ast.select.iter().map(|(_field, name)| *name);
155        insert.into_table(Alias::new(T::NAME));
156        insert.columns(names);
157        insert.select_from(select).unwrap();
158        insert.returning_col(Alias::new(T::ID));
159
160        let (sql, values) = insert.build_rusqlite(SqliteQueryBuilder);
161
162        let mut statement = self.transaction.prepare_cached(&sql).unwrap();
163        let mut res = statement
164            .query_map(&*values.as_params(), |row| {
165                Ok(<T as MyTyp>::from_sql(row.get_ref(T::ID)?)?)
166            })
167            .unwrap();
168
169        match res.next().unwrap() {
170            Ok(id) => Ok(id),
171            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
172                if kind.code == ErrorCode::ConstraintViolation =>
173            {
174                // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
175                let conflict = self.query_one(val.get_conflict_unchecked());
176                Err(conflict.unwrap())
177            }
178            Err(err) => Err(err).unwrap(),
179        }
180    }
181
182    /// This is a convenience function to make using [TransactionMut::try_insert]
183    /// easier for tables without unique constraints.
184    ///
185    /// The new row is added to the table and the row reference is returned.
186    pub fn insert<T: Table<Schema = S>>(
187        &mut self,
188        val: impl Writable<'t, T = T, Conflict = Infallible, Schema = S>,
189    ) -> TableRow<'t, T> {
190        let Ok(row) = self.try_insert(val);
191        row
192    }
193
194    /// This is a convenience function to make using [TransactionMut::try_insert]
195    /// easier for tables with exactly one unique constraints.
196    ///
197    /// The new row is inserted and the reference to the row is returned OR
198    /// an existing row is found which conflicts with the new row and a reference
199    /// to the conflicting row is returned.
200    pub fn find_or_insert<T: Table<Schema = S>>(
201        &mut self,
202        val: impl Writable<'t, T = T, Conflict = TableRow<'t, T>, Schema = S>,
203    ) -> TableRow<'t, T> {
204        match self.try_insert(val) {
205            Ok(row) => row,
206            Err(row) => row,
207        }
208    }
209
210    /// Try updating a row in the database to have new column values.
211    ///
212    /// Updating can fail just like [TransactionMut::try_insert] because of unique constraint conflicts.
213    /// This happens when the new values are in conflict with an existing different row.
214    ///
215    /// When the update succeeds, this function returns [Ok<()>], when it fails it returns [Err] with one of
216    /// three conflict types:
217    /// - 0 unique constraints => [Infallible]
218    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
219    /// - 2+ unique constraints => `()` no further information is provided.
220    pub fn try_update<T: Table<Schema = S>, C>(
221        &mut self,
222        row: impl IntoColumn<'t, S, Typ = T>,
223        val: impl Writable<'t, T = T, Conflict = C, Schema = S>,
224    ) -> Result<(), C> {
225        let ast = MySelect::default();
226
227        let reader = Reader {
228            ast: &ast,
229            _p: PhantomData,
230            _p2: PhantomData,
231        };
232        val.read(reader);
233
234        let select = ast.simple();
235        let (query, args) = select.build_rusqlite(SqliteQueryBuilder);
236        let mut stmt = self.transaction.prepare_cached(&query).unwrap();
237
238        let row_id = self.query_one(row).idx;
239        let mut update = UpdateStatement::new()
240            .table(Alias::new(T::NAME))
241            .cond_where(Expr::val(row_id).equals(Alias::new(T::ID)))
242            .to_owned();
243
244        stmt.query_row(&*args.as_params(), |row| {
245            for (_, field) in ast.select.iter() {
246                let Field::Str(name) = field else { panic!() };
247
248                let val = match row.get_unwrap::<&str, rusqlite::types::Value>(*name) {
249                    rusqlite::types::Value::Null => Value::BigInt(None),
250                    rusqlite::types::Value::Integer(x) => Value::BigInt(Some(x)),
251                    rusqlite::types::Value::Real(x) => Value::Double(Some(x)),
252                    rusqlite::types::Value::Text(x) => Value::String(Some(Box::new(x))),
253                    rusqlite::types::Value::Blob(_) => todo!(),
254                };
255                update.value(*field, Expr::val(val));
256            }
257            Ok(())
258        })
259        .unwrap();
260
261        let (query, args) = update.build_rusqlite(SqliteQueryBuilder);
262
263        let mut stmt = self.transaction.prepare_cached(&query).unwrap();
264        match stmt.execute(&*args.as_params()) {
265            Ok(1) => Ok(()),
266            Ok(n) => panic!("unexpected number of updates: {n}"),
267            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
268                if kind.code == ErrorCode::ConstraintViolation =>
269            {
270                // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
271                let conflict = self.query_one(val.get_conflict_unchecked());
272                Err(conflict.unwrap())
273            }
274            Err(err) => Err(err).unwrap(),
275        }
276    }
277
278    /// This is a convenience function to use [TransactionMut::try_update] on tables without
279    /// unique constraints.
280    pub fn update<T: Table<Schema = S>>(
281        &mut self,
282        row: impl IntoColumn<'t, S, Typ = T>,
283        val: impl Writable<'t, T = T, Conflict = Infallible, Schema = S>,
284    ) {
285        let Ok(()) = self.try_update(row, val);
286    }
287
288    /// This is a convenience function to use [TransactionMut::try_update] on tables with
289    /// exactly one unique constraint.
290    ///
291    /// This function works slightly different in that it does not receive a row reference.
292    /// Instead it tries to update the row that would conflict if the new row would be inserted.
293    /// When such a conflicting row is found, it is updated to the new column values and [Ok] is
294    /// returned with a reference to the found row.
295    /// If it can not find a conflicting row, then nothing happens and the function returns [Err]
296    pub fn find_and_update<T: Table<Schema = S>>(
297        &mut self,
298        val: impl Writable<'t, T = T, Conflict = TableRow<'t, T>, Schema = S>,
299    ) -> Result<TableRow<'t, T>, ()> {
300        match self.query_one(val.get_conflict_unchecked()) {
301            Some(row) => {
302                self.try_update(row, val).unwrap();
303                Ok(row)
304            }
305            None => Err(()),
306        }
307    }
308
309    /// Make the changes made in this [TransactionMut] permanent.
310    ///
311    /// If the [TransactionMut] is dropped without calling this function, then the changes are rolled back.
312    pub fn commit(self) {
313        self.inner.transaction.commit().unwrap();
314    }
315
316    pub fn downgrade(self) -> TransactionWeak<'t, S> {
317        TransactionWeak { inner: self }
318    }
319}
320
321/// This is the weak version of [TransactionMut].
322///
323/// The reason that it is called `weak` is because [TransactionWeak] can not guarantee
324/// that [TableRow]s prove the existence of their particular row.
325///
326/// [TransactionWeak] is useful because it allowes deleting rows.
327pub struct TransactionWeak<'t, S> {
328    inner: TransactionMut<'t, S>,
329}
330
331impl<'t, S: 'static> TransactionWeak<'t, S> {
332    /// Try to delete a row from the database.
333    ///
334    /// This will return an [Err] if there is a row that references the row that is being deleted.
335    /// When this method returns [Ok] it will contain a [bool] that is either
336    /// - `true` if the row was just deleted.
337    /// - `false` if the row was deleted previously in this transaction.
338    pub fn try_delete<T: Table<Schema = S>>(
339        &mut self,
340        val: TableRow<'t, T>,
341    ) -> Result<bool, T::Referer> {
342        let stmt = DeleteStatement::new()
343            .from_table(Alias::new(T::NAME))
344            .cond_where(Expr::col(Alias::new(T::ID)).eq(val.idx))
345            .to_owned();
346
347        let (query, args) = stmt.build_rusqlite(SqliteQueryBuilder);
348        let mut stmt = self.inner.transaction.prepare_cached(&query).unwrap();
349
350        match stmt.execute(&*args.as_params()) {
351            Ok(0) => Ok(false),
352            Ok(1) => Ok(true),
353            Ok(n) => {
354                panic!("unexpected number of deletes {n}")
355            }
356            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
357                if kind.code == ErrorCode::ConstraintViolation =>
358            {
359                // Some foreign key constraint got violated
360                Err(T::get_referer_unchecked())
361            }
362            Err(err) => Err(err).unwrap(),
363        }
364    }
365
366    /// Delete a row from the database.
367    ///
368    /// This is the infallible version of [TransactionWeak::try_delete].
369    ///
370    /// To be able to use this method you have to mark the table as `#[no_reference]` in the schema.
371    pub fn delete<T: Table<Referer = Infallible, Schema = S>>(
372        &mut self,
373        val: TableRow<'t, T>,
374    ) -> bool {
375        self.try_delete(val).unwrap()
376    }
377
378    /// This allows you to do anything you want with the internal [rusqlite::Transaction]
379    ///
380    /// **Warning:** [TransactionWeak::unchecked_transaction] makes it possible to break the
381    /// invariants that [rust_query] relies on to avoid panics at run-time. It should
382    /// therefore be avoided whenever possible.
383    ///
384    /// The specific version of rusqlite used is not stable. This means the [rusqlite]
385    /// version might change as part of a non breaking version update of [rust_query].
386    #[cfg(feature = "unchecked_transaction")]
387    pub fn unchecked_transaction(&mut self) -> &rusqlite::Transaction {
388        &self.inner.transaction
389    }
390
391    /// Make the changes made in this [TransactionWeak] permanent.
392    ///
393    /// If the [TransactionWeak] is dropped without calling this function, then the changes are rolled back.
394    pub fn commit(self) {
395        self.inner.commit();
396    }
397}