rust_query/
transaction.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
use std::{convert::Infallible, marker::PhantomData, ops::Deref};

use ref_cast::RefCast;
use rusqlite::ErrorCode;
use sea_query::{Alias, Expr, InsertStatement, SqliteQueryBuilder, UpdateStatement, Value};
use sea_query_rusqlite::RusqliteBinder;

use crate::{
    alias::Field,
    ast::MySelect,
    client::private_exec,
    exec::Query,
    insert::{Reader, Writable},
    migrate::schema_version,
    private::Dummy,
    token::LocalClient,
    IntoColumn, Table, TableRow,
};

/// [Database] is a proof that the database has been configured.
///
/// For information on how to create transactions, please refer to [LocalClient].
///
/// Creating a [Database] requires going through the steps to migrate an existing database to
/// the required schema, or creating a new database from scratch.
/// Having done the setup to create a compatible database is sadly not a guarantee that the
/// database will stay compatible for the lifetime of the [Database].
///
/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
/// modifications to the schema and gives us the ability to panic when this is detected.
/// Such non-malicious modification of the schema can happen for example if another [Database]
/// instance is created with additional migrations (e.g. by another newer instance of your program).
///
/// # Sqlite config
///
/// Sqlite is configured to be in [WAL mode](https://www.sqlite.org/wal.html).
/// The effect of this mode is that there can be any number of readers with one concurrent writer.
/// What is nice about this is that a [Transaction] can always be made immediately.
/// Making a [TransactionMut] has to wait until all other [TransactionMut]s are finished.
///
/// Sqlite is also configured with [`synchronous=NORMAL`](https://www.sqlite.org/pragma.html#pragma_synchronous). This gives better performance by fsyncing less.
/// The database will not lose transactions due to application crashes, but it might due to system crashes or power loss.
pub struct Database<S> {
    pub(crate) manager: r2d2_sqlite::SqliteConnectionManager,
    pub(crate) schema_version: i64,
    pub(crate) schema: PhantomData<S>,
}

/// [Transaction] can be used to query the database.
///
/// From the perspective of a [Transaction] each [TransactionMut] is fully applied or not at all.
/// Futhermore, the effects of [TransactionMut]s have a global order.
/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
///
/// All [TableRow] references retrieved from the database live for at most `'a`.
/// This makes these references effectively local to this [Transaction].
#[derive(RefCast)]
#[repr(transparent)]
pub struct Transaction<'a, S> {
    pub(crate) transaction: rusqlite::Transaction<'a>,
    pub(crate) _p: PhantomData<fn(&'a S) -> &'a S>,
    pub(crate) _local: PhantomData<LocalClient>,
}

/// Same as [Transaction], but allows inserting new rows.
///
/// [TransactionMut] always uses the latest version of the database, with the effects of all previous [TransactionMut]s applied.
///
/// To make mutations to the database permanent you need to use [TransactionMut::commit].
/// This is to make sure that if a function panics while holding a mutable transaction, it will roll back those changes.
pub struct TransactionMut<'a, S> {
    pub(crate) inner: Transaction<'a, S>,
}

impl<'a, S> Deref for TransactionMut<'a, S> {
    type Target = Transaction<'a, S>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<'t, S> Transaction<'t, S> {
    /// This will check the schema version and panic if it is not as expected
    pub(crate) fn new_checked(txn: rusqlite::Transaction<'t>, expected: i64) -> Self {
        if schema_version(&txn) != expected {
            panic!("The database schema was updated unexpectedly")
        }

        Transaction {
            transaction: txn,
            _p: PhantomData,
            _local: PhantomData,
        }
    }

    /// Execute a query with multiple results.
    ///
    /// Please take a look at the documentation of [Query] for how to use it.
    pub fn query<F, R>(&self, f: F) -> R
    where
        F: for<'a> FnOnce(&'a mut Query<'t, 'a, S>) -> R,
    {
        // Execution already happens in a [Transaction].
        // and thus any [TransactionMut] that it might be borrowed
        // from are borrowed immutably, so the rows can not change.
        private_exec(&self.transaction, f)
    }

    /// Retrieve a single result from the database.
    ///
    /// Instead of using [Self::query_one] in a loop, it is better to
    /// call [Self::query] and return all results at once.
    pub fn query_one<O>(&self, val: impl Dummy<'t, 't, S, Out = O>) -> O
    where
        S: 'static,
    {
        // Theoretically this doesn't even need to be in a transaction.
        // We already have one though, so we must use it.
        let mut res = private_exec(&self.transaction, |e| {
            // Cast the static lifetime to any lifetime necessary, this is fine because we know the static lifetime
            // can not be guaranteed by a query scope.
            e.into_vec_private(val)
        });
        res.pop().unwrap()
    }
}

impl<'t, S: 'static> TransactionMut<'t, S> {
    /// Try inserting a value into the database.
    ///
    /// Returns [Ok] with a reference to the new inserted value or an [Err] with conflict information.
    /// The type of conflict information depends on the number of unique constraints on the table:
    /// - 0 unique constraints => [Infallible]
    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
    /// - 2+ unique constraints => [()] no further information is provided.
    pub fn try_insert<T: Table<Schema = S>, C>(
        &mut self,
        val: impl Writable<'t, T = T, Conflict = C, Schema = S>,
    ) -> Result<TableRow<'t, T>, C> {
        let ast = MySelect::default();

        let reader = Reader {
            ast: &ast,
            _p: PhantomData,
            _p2: PhantomData,
        };
        val.read(reader);

        let select = ast.simple();

        let mut insert = InsertStatement::new();
        let names = ast.select.iter().map(|(_field, name)| *name);
        insert.into_table(Alias::new(T::NAME));
        insert.columns(names);
        insert.select_from(select).unwrap();
        insert.returning_col(Alias::new(T::ID));

        let (sql, values) = insert.build_rusqlite(SqliteQueryBuilder);

        let mut statement = self.transaction.prepare_cached(&sql).unwrap();
        let mut res = statement
            .query_map(&*values.as_params(), |row| row.get(T::ID))
            .unwrap();

        match res.next().unwrap() {
            Ok(id) => Ok(id),
            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
                if kind.code == ErrorCode::ConstraintViolation =>
            {
                // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
                let conflict = self.query_one(val.get_conflict_unchecked());
                Err(conflict.unwrap())
            }
            Err(err) => Err(err).unwrap(),
        }
    }

    /// This is a convenience function to make using [TransactionMut::try_insert]
    /// easier for tables without unique constraints.
    ///
    /// The new row is added to the table and the row reference is returned.
    pub fn insert<T: Table<Schema = S>>(
        &mut self,
        val: impl Writable<'t, T = T, Conflict = Infallible, Schema = S>,
    ) -> TableRow<'t, T> {
        let Ok(row) = self.try_insert(val);
        row
    }

    /// This is a convenience function to make using [TransactionMut::try_insert]
    /// easier for tables with exactly one unique constraints.
    ///
    /// The new row is inserted and the reference to the row is returned OR
    /// an existing row is found which conflicts with the new row and a reference
    /// to the conflicting row is returned.
    pub fn find_or_insert<T: Table<Schema = S>>(
        &mut self,
        val: impl Writable<'t, T = T, Conflict = TableRow<'t, T>, Schema = S>,
    ) -> TableRow<'t, T> {
        match self.try_insert(val) {
            Ok(row) => row,
            Err(row) => row,
        }
    }

    /// Try updating a row in the database to have new column values.
    ///
    /// Updating can fail just like [TransactionMut::try_insert] because of unique constraint conflicts.
    /// This happens when the new values are in conflict with an existing different row.
    ///
    /// When the update succeeds, this function returns [Ok<()>], when it fails it returns [Err] with one of
    /// three conflict types:
    /// - 0 unique constraints => [Infallible]
    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
    /// - 2+ unique constraints => [()] no further information is provided.
    pub fn try_update<T: Table<Schema = S>, C>(
        &mut self,
        row: impl IntoColumn<'t, S, Typ = T>,
        val: impl Writable<'t, T = T, Conflict = C, Schema = S>,
    ) -> Result<(), C> {
        let ast = MySelect::default();

        let reader = Reader {
            ast: &ast,
            _p: PhantomData,
            _p2: PhantomData,
        };
        val.read(reader);

        let select = ast.simple();
        let (query, args) = select.build_rusqlite(SqliteQueryBuilder);
        let mut stmt = self.transaction.prepare_cached(&query).unwrap();

        let row_id = self.query_one(row).idx;
        let mut update = UpdateStatement::new()
            .table(Alias::new(T::NAME))
            .cond_where(Expr::val(row_id).equals(Alias::new(T::ID)))
            .to_owned();

        stmt.query_row(&*args.as_params(), |row| {
            for (_, field) in ast.select.iter() {
                let Field::Str(name) = field else { panic!() };

                let val = match row.get_unwrap::<&str, rusqlite::types::Value>(*name) {
                    rusqlite::types::Value::Null => Value::BigInt(None),
                    rusqlite::types::Value::Integer(x) => Value::BigInt(Some(x)),
                    rusqlite::types::Value::Real(x) => Value::Double(Some(x)),
                    rusqlite::types::Value::Text(x) => Value::String(Some(Box::new(x))),
                    rusqlite::types::Value::Blob(_) => todo!(),
                };
                update.value(*field, Expr::val(val));
            }
            Ok(())
        })
        .unwrap();

        let (query, args) = update.build_rusqlite(SqliteQueryBuilder);

        let mut stmt = self.transaction.prepare_cached(&query).unwrap();
        match stmt.execute(&*args.as_params()) {
            Ok(1) => Ok(()),
            Ok(n) => panic!("unexpected number of updates: {n}"),
            Err(rusqlite::Error::SqliteFailure(kind, Some(_val)))
                if kind.code == ErrorCode::ConstraintViolation =>
            {
                // val looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
                let conflict = self.query_one(val.get_conflict_unchecked());
                Err(conflict.unwrap())
            }
            Err(err) => Err(err).unwrap(),
        }
    }

    /// This is a convenience function to use [TransactionMut::try_update] on tables without
    /// unique constraints.
    pub fn update<T: Table<Schema = S>>(
        &mut self,
        row: impl IntoColumn<'t, S, Typ = T>,
        val: impl Writable<'t, T = T, Conflict = Infallible, Schema = S>,
    ) {
        let Ok(()) = self.try_update(row, val);
    }

    /// This is a convenience function to use [TransactionMut::try_update] on tables with
    /// exactly one unique constraint.
    ///
    /// This function works slightly different in that it does not receive a row reference.
    /// Instead it tries to update the row that would conflict if the new row would be inserted.
    /// When such a conflicting row is found, it is updated to the new column values and [Ok] is
    /// returned with a reference to the found row.
    /// If it can not find a conflicting row, then nothing happens and the function returns [Err]
    pub fn find_and_update<T: Table<Schema = S>>(
        &mut self,
        val: impl Writable<'t, T = T, Conflict = TableRow<'t, T>, Schema = S>,
    ) -> Result<TableRow<'t, T>, ()> {
        match self.query_one(val.get_conflict_unchecked()) {
            Some(row) => {
                self.try_update(row, val).unwrap();
                Ok(row)
            }
            None => Err(()),
        }
    }

    /// Make the changes made in this [TransactionMut] permanent.
    ///
    /// If the [TransactionMut] is dropped without calling this function, then the changes are rolled back.
    pub fn commit(self) {
        self.inner.transaction.commit().unwrap()
    }
}