rust-query 0.9.1

A query builder using rust concepts.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
use std::{
    cell::RefCell, convert::Infallible, marker::PhantomData, rc::Rc, sync::atomic::AtomicI64,
};

use rusqlite::ErrorCode;
use self_cell::{MutBorrow, self_cell};

use crate::{
    IntoExpr, IntoSelect, Table, TableRow,
    error::FromConflict,
    lower::{
        self, emit,
        list_writer::{Alias, ListWriter},
        ord_rc::OrdRc,
    },
    migrate::{Schema, check_schema, schema_version, user_version},
    migration::Config,
    mutable::Mutable,
    pool::Pool,
    private::{IntoJoinable, Reader},
    query::{OwnedRows, Query, track_stmt},
    rows::Rows,
    value::{DbTyp, OptTable},
};

/// [Database] is a proof that the database has been configured.
///
/// Creating a [Database] requires going through the steps to migrate an existing database to
/// the required schema, or creating a new database from scratch (See also [crate::migration::Config]).
/// Please see [Database::migrator] to get started.
///
/// Having done the setup to create a compatible database is sadly not a guarantee that the
/// database will stay compatible for the lifetime of the [Database] struct.
/// That is why [Database] also stores the `schema_version`. This allows detecting non-malicious
/// modifications to the schema and gives us the ability to panic when this is detected.
/// Such non-malicious modification of the schema can happen for example if another [Database]
/// instance is created with additional migrations (e.g. by another newer instance of your program).
pub struct Database<S> {
    /// Pool that connections are popped from when a transaction starts and pushed
    /// back to when it finishes.
    pub(crate) pool: Pool,
    /// The schema version that new transactions are compared against; it is updated
    /// after a changed schema has been checked successfully.
    pub(crate) schema_version: AtomicI64,
    /// Marker that ties this handle to the schema type `S`; it stores no data.
    pub(crate) schema: PhantomData<S>,
    /// Lock taken by mutable transactions before they pop a connection and released
    /// just before they commit or roll back.
    pub(crate) mut_lock: parking_lot::FairMutex<()>,
}

impl<S: Schema> Database<S> {
    /// Opens a database without running any migrations.
    ///
    /// This is the shortcut for callers that do not care about migration.
    ///
    /// # Panics
    ///
    /// Panics when [Database::migrator] or the subsequent `finish` step yields `None`.
    /// The panic messages attribute this to the database being at an older or a newer
    /// schema version than `S::VERSION`. The schema itself not matching the expected
    /// schema also results in a panic.
    pub fn new(config: Config) -> Self {
        let migrator = match Self::migrator(config) {
            Some(migrator) => migrator,
            None => panic!("schema version {}, but got an older version", S::VERSION),
        };
        match migrator.finish() {
            Some(database) => database,
            None => panic!("schema version {}, but got a new version", S::VERSION),
        }
    }
}

use rusqlite::Connection;
// Slot holding the live [rusqlite::Transaction]. It is an `Option` so that the
// transaction can be moved out by value (see `OwnedTransaction::with`), which is
// what committing and rolling back require.
type RTransaction<'x> = Option<rusqlite::Transaction<'x>>;

// Self-referential pair: an owned [Connection] (the `owner`) together with the
// transaction slot that borrows from it (the `dependent`).
self_cell!(
    pub struct OwnedTransaction {
        owner: MutBorrow<Connection>,

        #[covariant]
        dependent: RTransaction,
    }
);

/// SAFETY:
/// `RTransaction: !Send` because it borrows from `Connection` and `Connection: !Sync`.
/// `OwnedTransaction` can be `Send` because we know that `dependent` is the only
/// borrow of `owner` and `OwnedTransaction: !Sync` so `dependent` can not be borrowed
/// from multiple threads.
unsafe impl Send for OwnedTransaction {}
// Guards the `!Sync` half of the argument above: this assertion is meant to fail
// the build if `OwnedTransaction` ever becomes `Sync`.
// NOTE(review): the macro is not imported in this file; presumably it comes from
// the `static_assertions` crate via `#[macro_use]` — confirm.
assert_not_impl_any! {OwnedTransaction: Sync}

// Per-thread slot for the transaction that is currently active on this thread.
// `Transaction::new_checked` fills it, the statement-executing methods borrow it, and
// the `Database::transaction*_local` methods take it out again after the user
// closure has returned.
thread_local! {
    pub(crate) static TXN: RefCell<Option<TransactionWithRows>> = const { RefCell::new(None) };
}

impl OwnedTransaction {
    /// Borrows the transaction that is stored in this cell.
    ///
    /// Panics if the transaction has already been taken out of its slot.
    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
        let slot = self.borrow_dependent();
        slot.as_ref().unwrap()
    }

    /// Moves the transaction out of its slot, hands it to `f` by value and then
    /// returns the underlying connection.
    ///
    /// Taking the transaction by value is what allows `f` to commit or roll back.
    pub(crate) fn with(
        mut self,
        f: impl FnOnce(rusqlite::Transaction<'_>),
    ) -> rusqlite::Connection {
        self.with_dependent_mut(|_owner, slot| {
            let txn = slot.take().unwrap();
            f(txn)
        });
        let owner = self.into_owner();
        owner.into_inner()
    }
}

// Slab of [OwnedRows] values; the lifetime parameter is the borrow of the
// transaction that owns them (see the `self_cell!` below).
type OwnedRowsVec<'x> = slab::Slab<OwnedRows<'x>>;
// Self-referential pair: the [OwnedTransaction] (the `owner`) together with the
// slab of rows that borrow from it (the `dependent`).
self_cell!(
    pub struct TransactionWithRows {
        owner: OwnedTransaction,

        #[not_covariant]
        dependent: OwnedRowsVec,
    }
);

impl TransactionWithRows {
    /// Wraps `txn` together with an empty slab of rows.
    pub(crate) fn new_empty(txn: OwnedTransaction) -> Self {
        Self::new(txn, |_owner| slab::Slab::new())
    }

    /// Borrows the underlying [rusqlite::Transaction].
    pub(crate) fn get(&self) -> &rusqlite::Transaction<'_> {
        let owned = self.borrow_owner();
        owned.get()
    }
}

impl<S: Send + Sync + Schema> Database<S> {
    #[doc = include_str!("database/transaction.md")]
    pub fn transaction<R: Send>(&self, f: impl Send + FnOnce(&'static Transaction<S>) -> R) -> R {
        // Run the closure on a fresh scoped thread and wait for it to finish.
        let outcome = std::thread::scope(|scope| {
            let worker = scope.spawn(|| self.transaction_local(f));
            worker.join()
        });
        match outcome {
            // A panic inside the worker is re-raised on the calling thread.
            Err(payload) => std::panic::resume_unwind(payload),
            Ok(val) => val,
        }
    }

    /// Same as [Self::transaction], but can only be used on a new thread.
    ///
    /// The connection is popped from the pool, `f` runs inside a transaction on it, and
    /// the connection is pushed back to the pool afterwards.
    pub(crate) fn transaction_local<R>(&self, f: impl FnOnce(&'static Transaction<S>) -> R) -> R {
        let owner = MutBorrow::new(self.pool.pop());
        let owned = OwnedTransaction::new(owner, |conn| {
            let txn = conn.borrow_mut().transaction().unwrap();
            Some(txn)
        });

        let res = f(Transaction::new_checked(owned, &self.schema_version));

        // `new_checked` stored the transaction in the thread-local; take it back out
        // and recover the connection.
        let finished = TXN.take().unwrap().into_owner();
        let conn = finished.into_owner().into_inner();
        self.pool.push(conn);

        res
    }

    #[doc = include_str!("database/transaction_mut.md")]
    pub fn transaction_mut<O: Send, E: Send>(
        &self,
        f: impl Send + FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
    ) -> Result<O, E> {
        // Run the closure on a fresh scoped thread and wait for it to finish.
        let outcome = std::thread::scope(|scope| {
            let worker = scope.spawn(|| self.transaction_mut_local(f));
            worker.join()
        });

        match outcome {
            // A panic inside the worker is re-raised on the calling thread.
            Err(payload) => std::panic::resume_unwind(payload),
            Ok(val) => val,
        }
    }

    /// Runs `f` in an immediate transaction on the current thread.
    ///
    /// The transaction is committed when `f` returns [Ok] and rolled back when it
    /// returns [Err]. Either way the connection goes back to the pool and the result
    /// of `f` is returned unchanged.
    pub(crate) fn transaction_mut_local<O, E>(
        &self,
        f: impl FnOnce(&'static mut Transaction<S>) -> Result<O, E>,
    ) -> Result<O, E> {
        // Take the lock before popping a connection. Locking later would technically
        // work too, but a transaction that has to wait anyway should not be holding
        // a file descriptor while it does so.
        let guard = self.mut_lock.lock();

        let owner = MutBorrow::new(self.pool.pop());
        let owned = OwnedTransaction::new(owner, |conn| {
            let behavior = rusqlite::TransactionBehavior::Immediate;
            let txn = conn.borrow_mut().transaction_with_behavior(behavior).unwrap();
            Some(txn)
        });
        // If this panics then the transaction is rolled back and the guard is dropped.
        let res = f(Transaction::new_checked(owned, &self.schema_version));

        // Release the lock before committing, so that sqlite can move on to the next
        // transaction more quickly while still guaranteeing that the database will
        // unlock soon.
        drop(guard);

        let owned = TXN.take().unwrap().into_owner();

        let conn = match &res {
            Ok(_) => owned.with(|txn| txn.commit().unwrap()),
            Err(_) => owned.with(|txn| txn.rollback().unwrap()),
        };
        self.pool.push(conn);

        res
    }

    #[doc = include_str!("database/transaction_mut_ok.md")]
    pub fn transaction_mut_ok<R: Send>(
        &self,
        f: impl Send + FnOnce(&'static mut Transaction<S>) -> R,
    ) -> R {
        // The error type is uninhabited, so the `Ok` pattern is irrefutable.
        let Ok(value) = self.transaction_mut(|txn| Ok::<R, Infallible>(f(txn)));
        value
    }

    /// Hand out a [rusqlite::Connection] to the database.
    ///
    /// The connection is almost completely isolated from all other [rust_query] connections,
    /// so you can do (almost) anything you want with it. The one thing you should not do is
    /// change the schema: schema changes are detected with the `schema_version` pragma and
    /// will result in a panic when creating a new [rust_query] transaction.
    ///
    /// The `foreign_keys` pragma is always enabled on the returned connection, even if
    /// [crate::migration::ForeignKeys::SQLite] is not used.
    ///
    /// Note that many systems limit the number of file descriptors that can exist in a
    /// single process (a soft limit of 1024 is a common default). If this limit is
    /// reached, it may cause a panic in this method.
    pub fn rusqlite_connection(&self) -> rusqlite::Connection {
        let connection = self.pool.pop();
        connection
            .pragma_update(None, "foreign_keys", "ON")
            .unwrap();
        connection
    }
}

/// [Transaction] can be used to query and update the database.
///
/// From the perspective of a [Transaction] each other [Transaction] is fully applied or not at all.
/// Furthermore, the effects of [Transaction]s have a global order.
/// So if we have mutations `A` and then `B`, it is impossible for a [Transaction] to see the effect of `B` without seeing the effect of `A`.
pub struct Transaction<S> {
    /// Marker that ties the handle to the schema type `S`; it stores no data.
    pub(crate) _p2: PhantomData<S>,
    /// A raw-pointer marker is neither `Send` nor `Sync`, which keeps the handle on
    /// the thread it was created on.
    pub(crate) _local: PhantomData<*const ()>,
}

impl<S> Transaction<S> {
    pub(crate) fn new() -> Self {
        Self {
            _p2: PhantomData,
            _local: PhantomData,
        }
    }

    pub(crate) fn copy(&self) -> Self {
        Self::new()
    }

    pub(crate) fn new_ref() -> &'static mut Self {
        // no memory is leaked because Self is zero sized
        Box::leak(Box::new(Self::new()))
    }
}

impl<S: Schema> Transaction<S> {
    /// Installs `txn` as the active transaction of this thread and returns a handle to it.
    ///
    /// The schema version read from `txn` is compared with `expected`. When the two
    /// differ, the `user_version` must still equal `S::VERSION`; the schema is then
    /// checked again and `expected` is updated to the version that was read.
    ///
    /// # Panics
    ///
    /// Panics when the schema version differs and the `user_version` is not `S::VERSION`.
    pub(crate) fn new_checked(txn: OwnedTransaction, expected: &AtomicI64) -> &'static mut Self {
        use std::sync::atomic::Ordering::Relaxed;

        let current = schema_version(txn.get());
        // Only a changed schema version warrants a closer look.
        let changed = current != expected.load(Relaxed);
        if changed && user_version(txn.get()).unwrap() != S::VERSION {
            panic!("The database user_version changed unexpectedly")
        }

        // The transaction is stored in the thread-local before the schema check runs,
        // presumably because the check executes its queries through that slot.
        TXN.set(Some(TransactionWithRows::new_empty(txn)));
        if changed {
            check_schema::<S>(Self::new_ref(), false);
            expected.store(current, Relaxed);
        }

        const {
            assert!(size_of::<Self>() == 0);
        }
        Self::new_ref()
    }
}

impl<S> Transaction<S> {
    /// Run a query that can produce any number of results.
    ///
    /// The closure receives a fresh [Query] and whatever it returns is returned from
    /// this method.
    ///
    /// ```
    /// # use rust_query::{private::doctest::*};
    /// # get_txn(|txn| {
    /// let user_names = txn.query(|rows| {
    ///     let user = rows.join(User);
    ///     rows.into_vec(&user.name)
    /// });
    /// assert_eq!(user_names, vec!["Alice".to_owned()]);
    /// # });
    /// ```
    pub fn query<'t, R>(&'t self, f: impl FnOnce(&mut Query<'t, '_, S>) -> R) -> R {
        // Execution already happens inside a [Transaction], which is borrowed
        // immutably for `'t`. Mutation needs `&mut` access to the transaction, so
        // the rows can not change while the query is alive.
        let mut query = Query {
            q: Rows {
                phantom: PhantomData,
                ast: Default::default(),
                _p: PhantomData,
            },
            phantom: PhantomData,
        };
        f(&mut query)
    }

    /// Fetch exactly one result from the database.
    ///
    /// ```
    /// # use rust_query::{private::doctest::*, IntoExpr};
    /// # rust_query::private::doctest::get_txn(|txn| {
    /// let res = txn.query_one("test".into_expr());
    /// assert_eq!(res, "test");
    /// # });
    /// ```
    ///
    /// Prefer a single call to [Self::query] that returns all results at once over
    /// calling [Self::query_one] in a loop.
    ///
    /// Panics if the query yields no row. In debug builds it is also asserted that
    /// there is no second row.
    pub fn query_one<O: 'static>(&self, val: impl IntoSelect<'static, S, Out = O>) -> O {
        let mut results = self.query(|rows| rows.into_iter(val.into_select()));
        let first = results.next().unwrap();
        debug_assert!(results.next().is_none(), "query should return one row");
        first
    }

    /// Fetch a [crate::Lazy] value from the database.
    ///
    /// This works much like [Self::query_one], but the result is a [crate::Lazy]
    /// rather than a [TableRow]. It therefore only accepts table valued
    /// [rust_query::Expr].
    ///
    /// ```
    /// # #[rust_query::migration::schema(M)]
    /// # pub mod vN {
    /// #     pub struct Author {
    /// #         pub name: String,
    /// #     }
    /// #     pub struct Page {
    /// #         pub content: String,
    /// #         pub title: String,
    /// #         pub author: rust_query::TableRow<Author>,
    /// #     }
    /// # }
    /// # use v0::*;
    /// # rust_query::Database::new(rust_query::migration::Config::open_in_memory()).transaction_mut_ok(|txn| {
    /// let cat = txn.insert_ok(Author {
    ///     name: "Cat".to_owned()
    /// });
    /// let blog_post = txn.insert_ok(Page {
    ///     content: "Hello world!".to_owned(),
    ///     title: "Hi".to_owned(),
    ///     author: cat,
    /// });
    /// let lazy_post = txn.lazy(blog_post);
    ///
    /// println!("{}:", lazy_post.title);
    /// println!("{}", lazy_post.content);
    /// println!("written by: {}", lazy_post.author.name);
    /// # });
    /// ```
    pub fn lazy<'t, T: OptTable<Schema = S>>(
        &'t self,
        val: impl IntoExpr<'static, S, Typ = T>,
    ) -> T::Lazy<'t> {
        let out = self.query_one(val.into_expr());
        T::out_to_lazy(out)
    }

    /// Fetch an iterator of [crate::Lazy] values.
    ///
    /// See [Rows::join] for the kinds of parameter that are accepted here and
    /// [Transaction::lazy] for the single row version.
    pub fn lazy_iter<'t, T: Table<Schema = S>>(
        &'t self,
        val: impl IntoJoinable<'static, S, Typ = TableRow<T>>,
    ) -> LazyIter<'t, T> {
        let joinable = val.into_joinable();
        self.query(|rows| {
            let table = rows.join(joinable);
            let iter = rows.into_iter(table);
            LazyIter { txn: self, iter }
        })
    }

    /// Fetch a [Mutable] row from the database.
    ///
    /// The [Transaction] stays mutably borrowed until the [Mutable] is dropped, so
    /// keep the lifetime of the [Mutable] short to avoid borrow checker errors.
    /// The examples below show some good ways to use [Transaction::mutable].
    ///
    /// ```
    /// # #[rust_query::migration::schema(M)]
    /// # pub mod vN {
    /// #     pub struct Player {
    /// #         #[unique]
    /// #         pub number: i64,
    /// #         #[unique]
    /// #         pub name: String,
    /// #         pub score: i64,
    /// #     }
    /// # }
    /// # use v0::*;
    /// # rust_query::Database::new(rust_query::migration::Config::open_in_memory()).transaction_mut_ok(|txn| {
    /// # txn.insert(Player {number: 5, name: "Floris".to_owned(), score: 0});
    /// if let Some(mut player) = txn.mutable(Player.number(1)) {
    ///     player.score += 100;
    /// }
    ///
    /// let floris = txn.query_one(Player.name("Floris")).unwrap();
    /// txn.mutable(floris).score += 50;
    /// # });
    /// ```
    pub fn mutable<'t, T: OptTable<Schema = S>>(
        &'t mut self,
        val: impl IntoExpr<'static, S, Typ = T>,
    ) -> T::Mutable<'t> {
        let select = T::select_opt_mutable(val.into_expr());
        T::into_mutable(self.query_one(select))
    }

    /// Fetch several [Mutable] rows from the database.
    ///
    /// See [Rows::join] for the kinds of parameter that are accepted here.
    /// This is useful when mutable access to multiple rows is needed (potentially at the same time).
    ///
    /// ```
    /// # #[rust_query::migration::schema(M)]
    /// # pub mod vN {
    /// #     pub struct User { pub age: i64 }
    /// # }
    /// # use v0::*;
    /// # rust_query::Database::new(rust_query::migration::Config::open_in_memory()).transaction_mut_ok(|txn| {
    /// # txn.insert_ok(User {age: 30});
    /// for mut user in txn.mutable_vec(User) {
    ///     user.age += 1;
    /// }
    /// # });
    /// ```
    pub fn mutable_vec<'t, T: Table<Schema = S>>(
        &'t mut self,
        val: impl IntoJoinable<'static, S, Typ = TableRow<T>>,
    ) -> Vec<Mutable<'t, T>> {
        let joinable = val.into_joinable();
        self.query(|rows| {
            let row = rows.join(joinable);
            // Select the row contents together with the row reference itself.
            let select = (T::into_select(row.clone()), row);
            let found = rows.into_vec(select);
            found
                .into_iter()
                .map(TableRow::<T>::into_mutable)
                .collect()
        })
    }
}

/// Iterator over [crate::Lazy] values, as returned by [Transaction::lazy_iter].
pub struct LazyIter<'t, T: Table> {
    /// Transaction that is used to turn each row into a [crate::Lazy] value.
    txn: &'t Transaction<T::Schema>,
    /// Underlying iterator over the joined table rows.
    iter: crate::query::Iter<'t, TableRow<T>>,
}

impl<'t, T: Table> Iterator for LazyIter<'t, T> {
    type Item = crate::Lazy<'t, T>;

    /// Takes the next row from the inner iterator and converts it with [Transaction::lazy].
    fn next(&mut self) -> Option<Self::Item> {
        let row = self.iter.next()?;
        Some(self.txn.lazy(row))
    }
}

impl<S: 'static> Transaction<S> {
    /// Attempt to insert a value into the database.
    ///
    /// On success this returns [Ok] with a reference to the newly inserted row, otherwise
    /// an [Err] with conflict information. What the conflict information looks like
    /// depends on the number of unique constraints on the table:
    /// - 0 unique constraints => [Infallible]
    /// - 1 unique constraint => [TableRow] reference to the conflicting table row.
    /// - 2+ unique constraints => [crate::Conflict].
    ///
    /// ```
    /// # use rust_query::{private::doctest::*, IntoExpr};
    /// # rust_query::private::doctest::get_txn(|mut txn| {
    /// let res = txn.insert(User {
    ///     name: "Bob".to_owned(),
    /// });
    /// assert!(res.is_ok());
    /// let res = txn.insert(User {
    ///     name: "Bob".to_owned(),
    /// });
    /// assert!(res.is_err(), "there is a unique constraint on the name");
    /// # });
    /// ```
    pub fn insert<T: Table<Schema = S>>(&mut self, val: T) -> Result<TableRow<T>, T::Conflict> {
        let table = lower::JoinableTable::Table(T::NAME);
        try_insert_private(table, None, val)
    }

    /// Convenience wrapper around [Transaction::insert] for tables without
    /// unique constraints.
    ///
    /// The new row is added to the table and the row reference is returned.
    pub fn insert_ok<T: Table<Schema = S, Conflict = Infallible>>(
        &mut self,
        val: T,
    ) -> TableRow<T> {
        match self.insert(val) {
            Ok(row) => row,
            // `Infallible` has no values, so this arm can never be reached.
            Err(never) => match never {},
        }
    }

    /// Convenience wrapper around [Transaction::insert] for tables with exactly
    /// one unique constraint.
    ///
    /// Either the new row is inserted and a reference to it is returned, OR an existing
    /// row conflicts with the new row and a reference to that conflicting row is
    /// returned instead.
    ///
    /// ```
    /// # use rust_query::{private::doctest::*, IntoExpr};
    /// # rust_query::private::doctest::get_txn(|mut txn| {
    /// let bob = txn.insert(User {
    ///     name: "Bob".to_owned(),
    /// }).unwrap();
    /// let bob2 = txn.find_or_insert(User {
    ///     name: "Bob".to_owned(), // this will conflict with the existing row.
    /// });
    /// assert_eq!(bob, bob2);
    /// # });
    /// ```
    pub fn find_or_insert<T: Table<Schema = S, Conflict = TableRow<T>>>(
        &mut self,
        val: T,
    ) -> TableRow<T> {
        // Both outcomes carry a row reference: the new row or the conflicting one.
        match self.insert(val) {
            Ok(row) | Err(row) => row,
        }
    }

    /// Overwrites the columns of `row` with the values in `val`.
    ///
    /// Returns [Ok] when exactly one row was updated and an [Err] with conflict
    /// information when SQLite reports a constraint violation.
    ///
    /// Panics on any other SQLite error and when the number of updated rows is not one.
    pub(crate) fn update<T: Table<Schema = S>>(
        &mut self,
        row: TableRow<T>,
        val: T::Mutable,
    ) -> Result<(), T::Conflict> {
        let insert = T::mutable_into_insert(val);
        let mut reader = Reader::default();
        T::read(&insert, &mut reader);

        // UPDATE <table> SET <col> = ?, ... WHERE <table>.<id> = ?
        let mut stmt = emit::Stmt::default();
        stmt.write("UPDATE ");
        lower::JoinableTable::Table(T::NAME).emit(&mut stmt);

        stmt.write(" SET ");
        let mut assignments = ListWriter::new(&mut stmt, ", ");
        for (column, expr) in &reader.builder {
            assignments
                .item()
                .write(format_args!("{} = ", Alias(column)))
                .write_param(&expr);
        }
        // Fallback item: assigns the id column to itself.
        assignments.default(format_args!("{1} = {0}.{1}", Alias(T::NAME), Alias(T::ID)));

        stmt.write(format_args!(
            " WHERE {}.{} = ",
            Alias(T::NAME),
            Alias(T::ID)
        ));
        stmt.write_param(&OrdRc(Rc::new(row.inner.idx.into())));

        let executed = TXN.with_borrow(|slot| {
            let txn = slot.as_ref().unwrap().get();

            let mut prepared = txn.prepare_cached(&stmt.sql).unwrap();
            prepared.execute(rusqlite::params_from_iter(stmt.params))
        });

        // Everything except a constraint violation is settled right here.
        let msg = match executed {
            Ok(1) => return Ok(()),
            Ok(n) => panic!("unexpected number of updates: {n}"),
            Err(rusqlite::Error::SqliteFailure(kind, Some(msg)))
                if kind.code == ErrorCode::ConstraintViolation =>
            {
                msg
            }
            Err(err) => panic!("{err:?}"),
        };

        // `msg` looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
        let conflict = TXN.with_borrow(|slot| {
            let txn = slot.as_ref().unwrap().get();
            <T::Conflict as FromConflict>::from_conflict(
                txn,
                lower::JoinableTable::Table(T::NAME),
                reader.builder,
                msg,
            )
        });
        Err(conflict)
    }

    /// Turn the [Transaction] into a [TransactionWeak], which allows deletions.
    pub fn downgrade(&'static mut self) -> &'static mut TransactionWeak<S> {
        // TODO: clean this up
        let weak = TransactionWeak { inner: PhantomData };
        Box::leak(Box::new(weak))
    }
}

/// This is the weak version of [Transaction].
///
/// The reason that it is called `weak` is because [TransactionWeak] can not guarantee
/// that [TableRow]s prove the existence of their particular row.
///
/// [TransactionWeak] is useful because it allows deleting rows.
pub struct TransactionWeak<S> {
    /// Marker only; no data is stored.
    inner: PhantomData<Transaction<S>>,
}

impl<S: Schema> TransactionWeak<S> {
    /// Attempt to delete a row from the database.
    ///
    /// An [Err] is returned when some row still references the row that is being deleted.
    /// Otherwise the result is [Ok] with a [bool] that is either
    /// - `true` if the row was just deleted.
    /// - `false` if the row was deleted previously in this transaction.
    pub fn delete<T: Table<Schema = S>>(&mut self, val: TableRow<T>) -> Result<bool, T::Referer> {
        let schema = crate::schema::from_macro::Schema::new::<S>();

        // The foreign key constraints are checked by hand: enabling them for the whole
        // transaction is not wanted, and enabling them for only part of a transaction
        // is not possible. One check is built per column that references `T`.
        let mut checks = vec![];
        for (&table_name, table) in &schema.tables {
            for (col_name, col) in table.columns.iter() {
                let references_target = col
                    .def
                    .fk
                    .as_ref()
                    .is_some_and(|(t, c)| t == T::NAME && c == T::ID);
                if !references_target {
                    continue;
                }

                let mut check = emit::Stmt::default();
                check.write("SELECT ");
                check.write_param(&OrdRc(Rc::new(val.inner.idx.into())));
                check.write(format_args!(
                    " IN (SELECT {0}.{1} FROM {0})",
                    Alias(table_name),
                    Alias(col_name)
                ));
                checks.push(check);
            }
        }

        let mut delete_stmt = emit::Stmt::default();
        delete_stmt.write(format_args!(
            "DELETE FROM {0} WHERE {0}.{1} = ",
            Alias(T::NAME),
            Alias(T::ID)
        ));
        delete_stmt.write_param(&OrdRc::new(val.inner.idx));

        TXN.with_borrow(|slot| {
            let txn = slot.as_ref().unwrap().get();

            // Any referencing row aborts the delete.
            for check in checks {
                let mut prepared = txn.prepare_cached(&check.sql).unwrap();
                let params = rusqlite::params_from_iter(check.params);
                match prepared.query_one(params, |r| r.get(0)) {
                    Ok(true) => return Err(T::get_referer_unchecked()),
                    Ok(false) => {}
                    Err(err) => panic!("{err:?}"),
                }
            }

            let mut prepared = txn.prepare_cached(&delete_stmt.sql).unwrap();
            match prepared.execute(rusqlite::params_from_iter(delete_stmt.params)) {
                Ok(0) => Ok(false),
                Ok(1) => Ok(true),
                Ok(n) => {
                    panic!("unexpected number of deletes {n}")
                }
                Err(err) => panic!("{err:?}"),
            }
        })
    }

    /// Remove a row from the database.
    ///
    /// This is the infallible counterpart of [TransactionWeak::delete].
    ///
    /// This method is only available for tables that are marked as `#[no_reference]` in the schema.
    pub fn delete_ok<T: Table<Referer = Infallible, Schema = S>>(
        &mut self,
        val: TableRow<T>,
    ) -> bool {
        match self.delete(val) {
            Ok(deleted) => deleted,
            // `Infallible` has no values, so this arm can never be reached.
            Err(never) => match never {},
        }
    }

    /// Gives `f` access to the internal [rusqlite::Transaction], with which you can do (almost) anything.
    ///
    /// There are some things that you should not do with the transaction, such as:
    /// - Changes to the schema, these will result in a panic as described in [Database].
    /// - Making changes that violate foreign-key constraints (see below).
    ///
    /// Sadly the `foreign_keys` pragma can not be enabled (or disabled) during a transaction.
    /// Whether this pragma is enabled therefore depends on which [crate::migration::ForeignKeys]
    /// option is used and can not be changed.
    pub fn rusqlite_transaction<R>(&mut self, f: impl FnOnce(&rusqlite::Transaction) -> R) -> R {
        TXN.with_borrow(|slot| {
            let active = slot.as_ref().unwrap();
            f(active.get())
        })
    }
}

/// Inserts `val` into `table` and returns a reference to the new row.
///
/// When `idx` is given, it is written to the id column and the returned row is
/// asserted to have that id. A constraint violation reported by SQLite is turned
/// into an [Err] with conflict information; any other SQLite error panics.
pub fn try_insert_private<T: Table>(
    table: lower::JoinableTable,
    idx: Option<i64>,
    val: T,
) -> Result<TableRow<T>, T::Conflict> {
    let mut reader = Reader::default();
    T::read(&val, &mut reader);
    if let Some(forced_id) = idx {
        reader.col::<i64>(T::ID, forced_id);
    }

    let mut stmt = emit::Stmt::default();
    stmt.write("INSERT INTO ");
    table.emit(&mut stmt);

    if reader.builder.is_empty() {
        // A VALUES list always has at least one column, so it is left out when there are no columns.
        stmt.write(" DEFAULT VALUES");
    } else {
        // The builder is still needed for conflict reporting further down, hence the clone.
        let (col_names, col_exprs): (Vec<_>, Vec<_>) = reader.builder.clone().into_iter().collect();

        stmt.write(" (");
        let mut names = ListWriter::new(&mut stmt, ", ");
        for name in col_names {
            names.item().write(Alias(name));
        }
        stmt.write(") VALUES (");
        let mut values = ListWriter::new(&mut stmt, ", ");
        for expr in col_exprs {
            values.item().write_param(&expr);
        }
        stmt.write(")");
    }
    stmt.write(" RETURNING ").write(T::ID);

    let inserted = TXN.with_borrow(|slot| {
        let txn = slot.as_ref().unwrap().get();
        track_stmt(txn, &stmt.sql, &stmt.params);

        let mut prepared = txn.prepare_cached(&stmt.sql).unwrap();
        let mut returned = prepared
            .query_map(rusqlite::params_from_iter(stmt.params), |row| {
                Ok(TableRow::<T>::from_sql(row.get_ref(T::ID)?)?)
            })
            .unwrap();

        returned.next().unwrap()
    });

    // Everything except a constraint violation is settled right here.
    let msg = match inserted {
        Ok(id) => {
            if let Some(forced_id) = idx {
                assert_eq!(forced_id, id.inner.idx);
            }
            return Ok(id);
        }
        Err(rusqlite::Error::SqliteFailure(kind, Some(msg)))
            if kind.code == ErrorCode::ConstraintViolation =>
        {
            msg
        }
        Err(err) => panic!("{err:?}"),
    };

    // `msg` looks like "UNIQUE constraint failed: playlist_track.playlist, playlist_track.track"
    let conflict = TXN.with_borrow(|slot| {
        let txn = slot.as_ref().unwrap().get();
        <T::Conflict as FromConflict>::from_conflict(txn, table, reader.builder, msg)
    });
    Err(conflict)
}