Skip to main content

p2panda_store/
sqlite.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! SQLite database implementation with associated utility functions.
4use std::sync::Arc;
5
6use p2panda_core::cbor::EncodeError;
7use sqlx::migrate::{MigrateDatabase, Migrator};
8use sqlx::sqlite::SqlitePoolOptions;
9use sqlx::{Sqlite, migrate};
10use thiserror::Error;
11use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
12
13/// Creates the SQLite database if it doesn't already exist.
14pub async fn create_database(url: &str) -> Result<(), SqliteError> {
15    if !Sqlite::database_exists(url).await? {
16        Sqlite::create_database(url).await?
17    }
18    Ok(())
19}
20
21/// Drops the SQLite database if it exists.
22pub async fn drop_database(url: &str) -> Result<(), SqliteError> {
23    if Sqlite::database_exists(url).await? {
24        Sqlite::drop_database(url).await?
25    }
26    Ok(())
27}
28
29/// Creates the SQLite connection pool.
30pub async fn connection_pool(
31    url: &str,
32    max_connections: u32,
33) -> Result<sqlx::SqlitePool, SqliteError> {
34    let pool: sqlx::SqlitePool = SqlitePoolOptions::new()
35        .max_connections(max_connections)
36        .connect(url)
37        .await?;
38    Ok(pool)
39}
40
/// Returns the embedded migrator without running any of its migrations.
///
/// `migrate!()` is called without a path argument, so sqlx embeds the migration files from its
/// default location at compile time (NOTE(review): presumably the `migrations` directory next to
/// this crate's `Cargo.toml` — confirm). Use `run_pending_migrations` to actually apply them.
pub fn migrations() -> Migrator {
    migrate!()
}
45
46/// Runs any pending database migrations from inside the application.
47pub async fn run_pending_migrations(pool: &sqlx::SqlitePool) -> Result<(), SqliteError> {
48    migrations().run(pool).await?;
49    Ok(())
50}
51
/// Builder for `SqliteStore`.
///
/// To create the store, call `SqliteStoreBuilder::build()`.
///
/// By default, the builder configures an in-memory database with a maximum number of 16
/// connections. The database is created if it doesn't already exist and migrations are
/// automatically run on start-up.
#[derive(Clone, Debug)]
pub struct SqliteStoreBuilder {
    /// Database URL passed to sqlx.
    url: String,
    /// Upper bound on the number of pooled connections.
    max_connections: u32,
    /// Whether `build` applies pending migrations.
    run_migrations: bool,
    /// Whether `build` creates the database when it does not exist yet.
    create_database: bool,
}
65
66impl Default for SqliteStoreBuilder {
67    fn default() -> Self {
68        Self {
69            url: "sqlite::memory:".into(),
70            max_connections: 16,
71            create_database: true,
72            run_migrations: true,
73        }
74    }
75}
76
77impl SqliteStoreBuilder {
78    /// Creates a new `SqliteStoreBuilder` using default configuration values.
79    pub fn new() -> Self {
80        Self::default()
81    }
82
83    /// Assigns a randomly-generated in-memory database URL with private cache.
84    #[cfg(any(test, feature = "test_utils"))]
85    pub fn random_memory_url(mut self) -> Self {
86        // Combining Rust tests with in-memory databases can lead to unsound behaviour, this
87        // "workaround" assigns every temporary database a different, random name and keeps them
88        // isolated from other tests.
89        //
90        // See related issue: https://github.com/launchbadge/sqlx/issues/2510
91        self.url = format!(
92            "sqlite://dbmem{}?mode=memory&cache=private",
93            rand::random::<u32>()
94        );
95        self
96    }
97
98    /// Sets the database URL.
99    ///
100    /// If left unset, the database will use an ephemeral in-memory URL.
101    pub fn database_url(mut self, url: &str) -> Self {
102        self.url = url.to_string();
103        self
104    }
105
106    /// Sets the maximum number of connections to be maintained by the database pool.
107    ///
108    /// If left unset, a maximum of 16 connections will be maintained.
109    pub fn max_connections(mut self, max_connections: u32) -> Self {
110        self.max_connections = max_connections;
111        self
112    }
113
114    /// Creates the database if it doesn't already exist.
115    ///
116    /// If left unset, the database will be created by default.
117    pub fn create_database(mut self, create_database: bool) -> Self {
118        self.create_database = create_database;
119        self
120    }
121
122    /// Sets whether pending migrations should be applied when the database is built.
123    ///
124    /// If left unset, the database will apply any pending migrations.
125    pub fn run_default_migrations(mut self, run_migrations: bool) -> Self {
126        self.run_migrations = run_migrations;
127        self
128    }
129
130    /// Builds the `SqliteStore`.
131    pub async fn build(self) -> Result<SqliteStore, SqliteError> {
132        if self.create_database {
133            create_database(&self.url).await?;
134        }
135
136        let pool: sqlx::SqlitePool = SqlitePoolOptions::new()
137            .max_connections(self.max_connections)
138            .connect(&self.url)
139            .await?;
140
141        if self.run_migrations {
142            run_pending_migrations(&pool).await?;
143        }
144
145        Ok(SqliteStore::new(pool))
146    }
147}
148
/// An in-progress database transaction.
///
/// Alias for `sqlx::Transaction` specialised to the `Sqlite` driver.
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;

/// SQLite connection pool (alias for `sqlx::SqlitePool`).
pub type SqlitePool = sqlx::SqlitePool;
154
/// SQLite database with connection pool and transaction provider.
///
/// This struct can be cloned and used in multiple places in the application. Every cloned instance
/// will re-use the same connection pool and have access to the same transaction instance if one
/// was started. To guard against sharing transactions unknowingly across unrelated database
/// queries, a concept of a `TransactionPermit` was introduced which does not protect from misuse
/// but helps to make "holding" a transaction explicit.
///
/// Please note that SQLite strictly serializes transactions with _writes_ and will block any
/// parallel attempt to begin another one. Processes starting a transaction will acquire a
/// `TransactionPermit` and keep it until the transaction is committed or rolled back. If a
/// query only involves _reads_, it is recommended not to use a transaction and to call the
/// `execute` method directly instead, as acquiring a transaction can block other processes from
/// doing work.
///
/// ## Design decisions
///
/// This storage API design was chosen to make the dynamics of the underlying SQLite database
/// explicit to avoid potentially introducing subtle bugs. Internally any process can access the
/// transaction object to do writes and (uncommitted) reads (see "Transaction I" in diagram). Care
/// is required when designing systems like that as it's still possible to allow concurrent
/// processes to read and write within the same transaction (for example one process could roll
/// back the transaction while the other one assumed it will be committed). Usually developers want
/// to design _writes_ to the database within a transaction if they need consistency and atomicity
/// guarantees. "Unrelated" queries _can_ be "pooled" in one transaction (for performance reasons
/// for example) if consistency is guaranteed by all involved processes and the underlying
/// data-model (see "Transaction II" in diagram).
///
/// ```text
/// Transaction I:
/// begin ---------------------> commit
///
/// Process I:
///       --> write --> read -->
///
///                                             Transaction II:
///                                             begin ----------------------> commit
///
///                                             Process II:
///                                                   --> write --> write -->
///
///                                             Process III:
///                                                   --> read --> write --->
/// ```
///
/// Another design decision is to not expose transactions to the high-level storage APIs (similar
/// to the "Repository Pattern"). Users of the storage methods like `get_operation` (in
/// `OperationStore`) etc. do _not_ need to explicitly deal with transaction objects, as this is
/// handled internally. This makes it possible to separate the "logic" from the "storage" layer
/// and keep the code clean.
#[derive(Clone, Debug)]
pub struct SqliteStore {
    // Slot holding the currently active transaction (`None` when no transaction was begun);
    // shared between all clones of this store.
    tx: Arc<Mutex<Option<Transaction<'static>>>>,
    // Connection pool used for both transactions and direct `execute` queries.
    pub(crate) pool: sqlx::SqlitePool,
    // Single-permit semaphore serializing transactions across all clones.
    semaphore: Arc<Semaphore>,
}
210
211impl SqliteStore {
212    /// Creates a new `SqliteStore` using the provided connection pool.
213    pub(crate) fn new(pool: sqlx::SqlitePool) -> Self {
214        Self {
215            tx: Arc::default(),
216            pool,
217            // SQLite only ever allows _one_ transaction at a time. This might be a repetition of
218            // what sqlx and SQLite do under the hood, but we want to make this behaviour explicit
219            // right from the beginning with this semaphore.
220            semaphore: Arc::new(Semaphore::new(1)),
221        }
222    }
223
224    /// Creates a new `SqliteStore` using the provided connection pool.
225    pub fn from_pool(pool: sqlx::SqlitePool) -> Self {
226        Self::new(pool)
227    }
228
229    /// Returns a reference to the connection pool.
230    pub fn pool(&self) -> &sqlx::SqlitePool {
231        &self.pool
232    }
233
234    /// Builds an in-memory SQLite database with a randomised name for testing purposes.
235    #[cfg(any(test, feature = "test_utils"))]
236    pub async fn temporary() -> Self {
237        SqliteStoreBuilder::new()
238            .random_memory_url()
239            .max_connections(1)
240            .build()
241            .await
242            .expect("migrations succeeded")
243    }
244
245    /// Executes a SQL query within a transaction.
246    ///
247    /// This method will return an error when no transaction is currently given. Make sure to call
248    /// `begin` before.
249    ///
250    /// If the query fails the user probably wants to roll back the transaction and free the
251    /// permit. This is _not_ handled automatically.
252    pub async fn tx<F, R>(&self, f: F) -> Result<R, SqliteError>
253    where
254        F: AsyncFnOnce(&mut Transaction) -> Result<R, SqliteError>,
255    {
256        let mut tx_ref = self.tx.lock().await;
257        let tx = tx_ref.as_mut().ok_or(SqliteError::TransactionMissing)?;
258
259        f(tx).await
260    }
261
262    /// Executes a SQL query directly.
263    pub async fn execute<F, R>(&self, f: F) -> Result<R, SqliteError>
264    where
265        F: AsyncFnOnce(&sqlx::SqlitePool) -> Result<R, SqliteError>,
266    {
267        f(&self.pool).await
268    }
269}
270
271impl crate::traits::Transaction for SqliteStore {
272    type Error = SqliteError;
273
274    type Permit = TransactionPermit;
275
276    /// Begins a transaction.
277    ///
278    /// Transactions are strictly serialized, this is expressed in form of a `TransactionPermit`
279    /// processes need to hold when acquiring access to a new transaction. Any concurrent process
280    /// calling it will await here if there's already another process holding a permit, this will
281    /// potentially "slow down" work and should be carefully used.
282    ///
283    /// Any process with a transaction can now start using the `tx` method to execute writes within
284    /// this transaction or perform uncommitted "dirty" reads on it.
285    ///
286    /// It is usually not necessary to acquire a transaction when the logic only requires committed
287    /// _reads_ to the database. Use `execute` instead.
288    async fn begin(&self) -> Result<TransactionPermit, SqliteError> {
289        // Acquire a permit from the semaphore, it will await if currently another process has the
290        // permit. Here we enforce strict serialization of transactions (similar to what SQLite
291        // does under the hood).
292        let permit = self
293            .semaphore
294            .clone()
295            .acquire_owned()
296            .await
297            .expect("if semaphore is closed then the whole struct is gone as well");
298
299        // Access the transaction object which we've placed behind a Mutex. This lock follows a
300        // different logic and only makes sure that mutable access to it is exclusive _within_ a
301        // process "holding" the transaction permit.
302        let mut tx_ref = self.tx.lock().await;
303        assert!(
304            tx_ref.is_none(),
305            "can't have an already existing transaction after an just-acquired permit"
306        );
307        let tx = self.pool.begin().await?;
308        tx_ref.replace(tx);
309
310        Ok(TransactionPermit::new(permit, self.tx.clone()))
311    }
312
313    /// Rolls back the transaction and with that all uncommitted changes.
314    ///
315    /// This takes the permit and frees it after the rollback has finished. Other processes can now
316    /// begin new transactions.
317    async fn rollback(&self, permit: TransactionPermit) -> Result<(), SqliteError> {
318        let Some(tx) = self.tx.lock().await.take() else {
319            panic!("can't have no transaction without dropping permit first")
320        };
321
322        let result = tx.rollback().await.map_err(SqliteError::Sqlite);
323
324        // Always drop the permit, both on successful rollback and error. This will allow other
325        // processes now to begin a new transaction and acquire the permit.
326        permit.mark_committed_and_drop();
327
328        result
329    }
330
331    /// Commits the transaction.
332    ///
333    /// This takes the permit and frees it after the commit has finished. Other processes can now
334    /// begin new transactions.
335    async fn commit(&self, permit: TransactionPermit) -> Result<(), SqliteError> {
336        let Some(tx) = self.tx.lock().await.take() else {
337            panic!("can't have no transaction without dropping permit first")
338        };
339
340        let result = tx.commit().await.map_err(SqliteError::Sqlite);
341
342        // Always drop the permit, both on successful commit and error. This will allow other
343        // processes now to begin a new transaction and acquire the permit.
344        permit.mark_committed_and_drop();
345
346        result
347    }
348}
349
350/// Locked context marking the lifetime of a single transaction.
351pub struct TransactionPermit {
352    permit: Arc<OwnedSemaphorePermit>,
353    tx: Arc<Mutex<Option<Transaction<'static>>>>,
354    committed: bool,
355}
356
357impl TransactionPermit {
358    /// Creates a new `TransactionPermit` using the given permit and transaction.
359    pub(super) fn new(
360        permit: OwnedSemaphorePermit,
361        tx: Arc<Mutex<Option<Transaction<'static>>>>,
362    ) -> Self {
363        Self {
364            permit: Arc::new(permit),
365            tx,
366            committed: false,
367        }
368    }
369
370    /// Marks the transaction as committed and drops the permit.
371    ///
372    /// In the case that the permit was never used, whether due to an early return or error, the
373    /// transaction is automatically rolled-back to prevent corrupted state.
374    pub(super) fn mark_committed_and_drop(mut self) {
375        self.committed = true;
376        drop(self)
377    }
378}
379
380impl Drop for TransactionPermit {
381    fn drop(&mut self) {
382        // If the permit was never used (due to an early return / error / etc.) we automatically
383        // roll-back the transaction.
384        if !self.committed {
385            let permit = self.permit.clone();
386            let tx = self.tx.clone();
387
388            tokio::spawn(async move {
389                if let Some(tx) = tx.lock().await.take() {
390                    let _ = tx.rollback().await;
391                }
392
393                drop(permit); // Semaphore released only after rollback completes.
394            });
395        }
396    }
397}
398
/// Error when interacting with a SQLite store implementation.
#[derive(Debug, Error)]
pub enum SqliteError {
    /// A transaction-scoped query was attempted while no transaction was active.
    ///
    /// This is a critical error as it indicates that something is wrong with the usage of this
    /// API: queries using transactions can only ever occur if a transaction was started _before_
    /// (`SqliteStore::tx` returns this error when no transaction is present).
    #[error("tried to interact with inexistant transaction")]
    TransactionMissing,

    /// SQLite database and connection error (wraps `sqlx::Error`).
    #[error(transparent)]
    Sqlite(#[from] sqlx::Error),

    /// SQL table schema migration error (wraps `sqlx::migrate::MigrateError`).
    #[error(transparent)]
    Migrate(#[from] sqlx::migrate::MigrateError),

    /// Encoding a value before storing it into the database failed. The `String` names the
    /// value that was being encoded. This is a critical error.
    #[error("failed encoding '{0}' value before storing to database: {1}")]
    Encode(String, EncodeError),

    /// Invalid, corrupted data was found in the database. The `String` names the value that
    /// could not be decoded. This is a critical error.
    #[error("could not decode corrupted '{0}' value from database: {1}")]
    Decode(String, DecodeError),
}
424
/// Error decoding value retrieved from a store.
#[derive(Debug, Error)]
pub enum DecodeError {
    /// Decoding CBOR bytes failed (wraps `p2panda_core::cbor::DecodeError`).
    #[error(transparent)]
    DecodeCbor(#[from] p2panda_core::cbor::DecodeError),

    /// Converting stored data into a hash failed (wraps `p2panda_core::hash::HashError`).
    #[error(transparent)]
    Hash(#[from] p2panda_core::hash::HashError),

    /// Converting stored data into a topic failed (wraps `p2panda_core::topic::TopicError`).
    #[error(transparent)]
    Topic(#[from] p2panda_core::topic::TopicError),

    /// Parsing a value from its string representation failed.
    #[error("parsing from string failed")]
    FromStr,
}
440
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use futures_test::task::noop_context;
    use sqlx::{Executor, query, query_as, query_scalar};
    use tokio::pin;

    use crate::sqlite::{SqliteError, SqliteStore};
    use crate::traits::Transaction;

    // Walks through the basic permit lifecycle: no transaction, begin, blocked second begin,
    // usage, commit, and no transaction again afterwards.
    #[tokio::test]
    async fn transaction_provider() {
        let pool = SqliteStore::temporary().await;

        // Executing without an existing transaction should return an error.
        assert!(matches!(
            pool.tx(async |_| Ok(())).await,
            Err(SqliteError::TransactionMissing)
        ));

        // Starting a new transaction should work.
        let permit = pool.begin().await.expect("no error");

        // .. attempting to start a second one should make us wait. The future is polled once
        // with a no-op waker and must be pending while `permit` is still held.
        assert!(matches!(
            {
                let fut = pool.begin();
                let mut cx = noop_context();
                pin!(fut);
                fut.poll(&mut cx)
            },
            Poll::Pending
        ));

        // Using the transaction should work without failure.
        assert!(pool.tx(async |_| Ok(())).await.is_ok());

        // Committing should work as well.
        assert!(pool.commit(permit).await.is_ok());

        // .. and now running a transaction should fail.
        assert!(matches!(
            pool.tx(async |_| Ok(())).await,
            Err(SqliteError::TransactionMissing)
        ));
    }

    // Dropping a permit without commit/rollback must roll the transaction back.
    #[tokio::test]
    async fn early_permit_drop_causing_rollback() {
        let pool = SqliteStore::temporary().await;

        // Create test-table schema.
        pool.execute(async |pool| {
            pool.execute("CREATE TABLE test(x INTEGER)").await?;
            Ok(())
        })
        .await
        .unwrap();

        let permit = pool.begin().await.unwrap();

        pool.tx(async |tx| {
            query("INSERT INTO test (x) VALUES (10)")
                .execute(&mut **tx)
                .await?;
            Ok(())
        })
        .await
        .unwrap();

        // Permit was dropped prematurely without committing.
        drop(permit);

        // It is okay to start another permit. `begin` waits until the rollback task spawned by
        // the permit's `Drop` implementation has released the semaphore.
        assert!(pool.begin().await.is_ok());

        // The data was not written as the transaction got rolled back.
        let count: i64 = pool
            .execute(async |pool| {
                query_scalar("SELECT COUNT(*) FROM test")
                    .fetch_one(pool)
                    .await
                    .map_err(SqliteError::Sqlite)
            })
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    // Two clones of the same store must take turns; numbered comments give the expected order.
    #[tokio::test]
    async fn serialized_transactions() {
        let pool_1 = SqliteStore::temporary().await;

        let pool_2 = pool_1.clone();

        // Create test-table schema.
        pool_1
            .execute(async |pool| {
                pool.execute("CREATE TABLE test(x INTEGER)").await?;
                Ok(())
            })
            .await
            .unwrap();

        // 1. Pool 1 acquires the permit to run a transaction.
        let permit_1 = pool_1.begin().await.unwrap();

        // .. in parallel, pool 2 also tries to do some work.
        let handle = tokio::spawn(async move {
            // Try to acquire a permit, this will "block" for now as pool 1 already is doing
            // something and we need to wait.
            let permit_2 = pool_2.begin().await.unwrap();

            // 5. We should now see the change previously made by pool 1.
            let result = pool_2
                .tx(async |tx| {
                    let row: (i64,) = query_as("SELECT x FROM test").fetch_one(&mut **tx).await?;
                    Ok(row.0)
                })
                .await
                .unwrap();
            assert_eq!(result, 5);

            // 6. Change the value to something else.
            pool_2
                .tx(async |tx| {
                    query("INSERT INTO test (x) VALUES (10)")
                        .execute(&mut **tx)
                        .await?;
                    Ok(())
                })
                .await
                .unwrap();

            // 7. .. but abort the transaction and roll back.
            pool_2.rollback(permit_2).await.unwrap();

            // The value should still be the same as before.
            let result = pool_2
                .execute(async |pool| {
                    let row: (i64,) = query_as("SELECT x FROM test").fetch_one(pool).await?;
                    Ok(row.0)
                })
                .await
                .unwrap();
            assert_eq!(result, 5);
        });

        // 2. Pool 1 changes the value.
        pool_1
            .tx(async |tx| {
                query("INSERT INTO test (x) VALUES (5)")
                    .execute(&mut **tx)
                    .await?;
                Ok(())
            })
            .await
            .unwrap();

        // 3. Result is already 5 during "dirty read".
        let result = pool_1
            .tx(async |tx| {
                let row: (i64,) = query_as("SELECT x FROM test").fetch_one(&mut **tx).await?;
                Ok(row.0)
            })
            .await
            .unwrap();
        assert_eq!(result, 5);

        // 4. Commit the change to database and free permit. This will allow now pool_2 to read the
        //    changed value.
        pool_1.commit(permit_1).await.unwrap();

        // Result is still 5 after commit.
        let result = pool_1
            .execute(async |pool| {
                let row: (i64,) = query_as("SELECT x FROM test").fetch_one(pool).await?;
                Ok(row.0)
            })
            .await
            .unwrap();
        assert_eq!(result, 5);

        // Make sure we give pool 2 the time it needs to finish.
        handle.await.unwrap();
    }
}