p2panda_store/sqlite.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! SQLite database implementation with associated utility functions.
4use std::sync::Arc;
5
6use p2panda_core::cbor::EncodeError;
7use sqlx::migrate::{MigrateDatabase, Migrator};
8use sqlx::sqlite::SqlitePoolOptions;
9use sqlx::{Sqlite, migrate};
10use thiserror::Error;
11use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
12
13/// Creates the SQLite database if it doesn't already exist.
14pub async fn create_database(url: &str) -> Result<(), SqliteError> {
15 if !Sqlite::database_exists(url).await? {
16 Sqlite::create_database(url).await?
17 }
18 Ok(())
19}
20
21/// Drops the SQLite database if it exists.
22pub async fn drop_database(url: &str) -> Result<(), SqliteError> {
23 if Sqlite::database_exists(url).await? {
24 Sqlite::drop_database(url).await?
25 }
26 Ok(())
27}
28
29/// Creates the SQLite connection pool.
30pub async fn connection_pool(
31 url: &str,
32 max_connections: u32,
33) -> Result<sqlx::SqlitePool, SqliteError> {
34 let pool: sqlx::SqlitePool = SqlitePoolOptions::new()
35 .max_connections(max_connections)
36 .connect(url)
37 .await?;
38 Ok(pool)
39}
40
41/// Gets migrations from folder without running them.
42pub fn migrations() -> Migrator {
43 migrate!()
44}
45
46/// Runs any pending database migrations from inside the application.
47pub async fn run_pending_migrations(pool: &sqlx::SqlitePool) -> Result<(), SqliteError> {
48 migrations().run(pool).await?;
49 Ok(())
50}
51
/// Builder for `SqliteStore`; finish it with `SqliteStoreBuilder::build()`.
///
/// Out of the box the builder targets an in-memory database served by a pool of at most 16
/// connections. The database is created when missing, and pending migrations are applied on
/// start-up.
pub struct SqliteStoreBuilder {
    /// Database URL passed to sqlx.
    url: String,
    /// Upper bound for the number of pooled connections.
    max_connections: u32,
    /// Whether `build` creates the database if it does not exist yet.
    create_database: bool,
    /// Whether `build` applies pending migrations.
    run_migrations: bool,
}
65
66impl Default for SqliteStoreBuilder {
67 fn default() -> Self {
68 Self {
69 url: "sqlite::memory:".into(),
70 max_connections: 16,
71 create_database: true,
72 run_migrations: true,
73 }
74 }
75}
76
77impl SqliteStoreBuilder {
78 /// Creates a new `SqliteStoreBuilder` using default configuration values.
79 pub fn new() -> Self {
80 Self::default()
81 }
82
83 /// Assigns a randomly-generated in-memory database URL with private cache.
84 #[cfg(any(test, feature = "test_utils"))]
85 pub fn random_memory_url(mut self) -> Self {
86 // Combining Rust tests with in-memory databases can lead to unsound behaviour, this
87 // "workaround" assigns every temporary database a different, random name and keeps them
88 // isolated from other tests.
89 //
90 // See related issue: https://github.com/launchbadge/sqlx/issues/2510
91 self.url = format!(
92 "sqlite://dbmem{}?mode=memory&cache=private",
93 rand::random::<u32>()
94 );
95 self
96 }
97
98 /// Sets the database URL.
99 ///
100 /// If left unset, the database will use an ephemeral in-memory URL.
101 pub fn database_url(mut self, url: &str) -> Self {
102 self.url = url.to_string();
103 self
104 }
105
106 /// Sets the maximum number of connections to be maintained by the database pool.
107 ///
108 /// If left unset, a maximum of 16 connections will be maintained.
109 pub fn max_connections(mut self, max_connections: u32) -> Self {
110 self.max_connections = max_connections;
111 self
112 }
113
114 /// Creates the database if it doesn't already exist.
115 ///
116 /// If left unset, the database will be created by default.
117 pub fn create_database(mut self, create_database: bool) -> Self {
118 self.create_database = create_database;
119 self
120 }
121
122 /// Sets whether pending migrations should be applied when the database is built.
123 ///
124 /// If left unset, the database will apply any pending migrations.
125 pub fn run_default_migrations(mut self, run_migrations: bool) -> Self {
126 self.run_migrations = run_migrations;
127 self
128 }
129
130 /// Builds the `SqliteStore`.
131 pub async fn build(self) -> Result<SqliteStore, SqliteError> {
132 if self.create_database {
133 create_database(&self.url).await?;
134 }
135
136 let pool: sqlx::SqlitePool = SqlitePoolOptions::new()
137 .max_connections(self.max_connections)
138 .connect(&self.url)
139 .await?;
140
141 if self.run_migrations {
142 run_pending_migrations(&pool).await?;
143 }
144
145 Ok(SqliteStore::new(pool))
146 }
147}
148
149/// An in-progress database transaction.
150pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
151
152/// Sqlite connection pool.
153pub type SqlitePool = sqlx::SqlitePool;
154
155/// SQLite database with connection pool and transaction provider.
156///
157/// This struct can be cloned and used in multiple places in the application. Every cloned instance
158/// will re-use the same connection pool and have access to the same transaction instance if one
159/// was started. To guard against sharing transactions unknowingly across unrelated database
160/// queries, a concept of a `TransactionPermit` was introduced which does not protect from misuse
161/// but helps to make "holding" a transaction explicit.
162///
163/// Please note that SQLite strictly serializes transactions with _writes_ and will block any
164/// parallel attempt to begin another one. Processes starting a transaction will acquire a
165/// `TransactionPermit` and keep it until the transaction was committed or rolled back. If the
166/// query only involves _reads_ it is recommended to not use transactions and use the `execute`
167/// method directly as acquiring transactions will potentially block other processes to do work.
168///
169/// ## Design decisions
170///
171/// This storage API design was chosen to make the dynamics of the underlying SQLite database
172/// explicit to avoid potentially introducing subtle bugs. Internally any process can access the
173/// transaction object to do writes and (uncommitted) reads (see "Transaction I" in diagram). Care
174/// is required when designing systems like that as it's still possible to allow concurrent
175/// processes to read and write within the same transaction (for example one process could roll
176/// back the transaction while the other one assumed it will be committed). Usually developers want
177/// to design _writes_ to the database within a transaction if they need consistency and atomicity
178/// guarantees. "Unrelated" queries _can_ be "pooled" in one transaction (for performance reasons
179/// for example) if consistency is guaranteed by all involved processes and the underlying
180/// data-model (see "Transaction II" in diagram).
181///
182/// ```text
183/// Transaction I:
184/// begin ---------------------> commit
185///
186/// Process I:
187/// --> write --> read -->
188///
189/// Transaction II:
190/// begin ----------------------> commit
191///
192/// Process II:
193/// --> write --> write -->
194///
195/// Process III:
196/// --> read --> write --->
197/// ```
198///
199/// Another design decision is to not expose transactions to the high-level storage APIs (similar
200/// to the "Repository Pattern"). Users of the storage methods like `get_operation` (in
201/// `OperationStore`) etc. do _not_ need to explicity deal with transaction objects, as this is
202/// handled internally now. Like this it is possible to separate the "logic" from the "storage"
203/// layer and keep the code clean.
204#[derive(Clone, Debug)]
205pub struct SqliteStore {
206 tx: Arc<Mutex<Option<Transaction<'static>>>>,
207 pub(crate) pool: sqlx::SqlitePool,
208 semaphore: Arc<Semaphore>,
209}
210
211impl SqliteStore {
212 /// Creates a new `SqliteStore` using the provided connection pool.
213 pub(crate) fn new(pool: sqlx::SqlitePool) -> Self {
214 Self {
215 tx: Arc::default(),
216 pool,
217 // SQLite only ever allows _one_ transaction at a time. This might be a repetition of
218 // what sqlx and SQLite do under the hood, but we want to make this behaviour explicit
219 // right from the beginning with this semaphore.
220 semaphore: Arc::new(Semaphore::new(1)),
221 }
222 }
223
224 /// Creates a new `SqliteStore` using the provided connection pool.
225 pub fn from_pool(pool: sqlx::SqlitePool) -> Self {
226 Self::new(pool)
227 }
228
229 /// Returns a reference to the connection pool.
230 pub fn pool(&self) -> &sqlx::SqlitePool {
231 &self.pool
232 }
233
234 /// Builds an in-memory SQLite database with a randomised name for testing purposes.
235 #[cfg(any(test, feature = "test_utils"))]
236 pub async fn temporary() -> Self {
237 SqliteStoreBuilder::new()
238 .random_memory_url()
239 .max_connections(1)
240 .build()
241 .await
242 .expect("migrations succeeded")
243 }
244
245 /// Executes a SQL query within a transaction.
246 ///
247 /// This method will return an error when no transaction is currently given. Make sure to call
248 /// `begin` before.
249 ///
250 /// If the query fails the user probably wants to roll back the transaction and free the
251 /// permit. This is _not_ handled automatically.
252 pub async fn tx<F, R>(&self, f: F) -> Result<R, SqliteError>
253 where
254 F: AsyncFnOnce(&mut Transaction) -> Result<R, SqliteError>,
255 {
256 let mut tx_ref = self.tx.lock().await;
257 let tx = tx_ref.as_mut().ok_or(SqliteError::TransactionMissing)?;
258
259 f(tx).await
260 }
261
262 /// Executes a SQL query directly.
263 pub async fn execute<F, R>(&self, f: F) -> Result<R, SqliteError>
264 where
265 F: AsyncFnOnce(&sqlx::SqlitePool) -> Result<R, SqliteError>,
266 {
267 f(&self.pool).await
268 }
269}
270
271impl crate::traits::Transaction for SqliteStore {
272 type Error = SqliteError;
273
274 type Permit = TransactionPermit;
275
276 /// Begins a transaction.
277 ///
278 /// Transactions are strictly serialized, this is expressed in form of a `TransactionPermit`
279 /// processes need to hold when acquiring access to a new transaction. Any concurrent process
280 /// calling it will await here if there's already another process holding a permit, this will
281 /// potentially "slow down" work and should be carefully used.
282 ///
283 /// Any process with a transaction can now start using the `tx` method to execute writes within
284 /// this transaction or perform uncommitted "dirty" reads on it.
285 ///
286 /// It is usually not necessary to acquire a transaction when the logic only requires committed
287 /// _reads_ to the database. Use `execute` instead.
288 async fn begin(&self) -> Result<TransactionPermit, SqliteError> {
289 // Acquire a permit from the semaphore, it will await if currently another process has the
290 // permit. Here we enforce strict serialization of transactions (similar to what SQLite
291 // does under the hood).
292 let permit = self
293 .semaphore
294 .clone()
295 .acquire_owned()
296 .await
297 .expect("if semaphore is closed then the whole struct is gone as well");
298
299 // Access the transaction object which we've placed behind a Mutex. This lock follows a
300 // different logic and only makes sure that mutable access to it is exclusive _within_ a
301 // process "holding" the transaction permit.
302 let mut tx_ref = self.tx.lock().await;
303 assert!(
304 tx_ref.is_none(),
305 "can't have an already existing transaction after an just-acquired permit"
306 );
307 let tx = self.pool.begin().await?;
308 tx_ref.replace(tx);
309
310 Ok(TransactionPermit::new(permit, self.tx.clone()))
311 }
312
313 /// Rolls back the transaction and with that all uncommitted changes.
314 ///
315 /// This takes the permit and frees it after the rollback has finished. Other processes can now
316 /// begin new transactions.
317 async fn rollback(&self, permit: TransactionPermit) -> Result<(), SqliteError> {
318 let Some(tx) = self.tx.lock().await.take() else {
319 panic!("can't have no transaction without dropping permit first")
320 };
321
322 let result = tx.rollback().await.map_err(SqliteError::Sqlite);
323
324 // Always drop the permit, both on successful rollback and error. This will allow other
325 // processes now to begin a new transaction and acquire the permit.
326 permit.mark_committed_and_drop();
327
328 result
329 }
330
331 /// Commits the transaction.
332 ///
333 /// This takes the permit and frees it after the commit has finished. Other processes can now
334 /// begin new transactions.
335 async fn commit(&self, permit: TransactionPermit) -> Result<(), SqliteError> {
336 let Some(tx) = self.tx.lock().await.take() else {
337 panic!("can't have no transaction without dropping permit first")
338 };
339
340 let result = tx.commit().await.map_err(SqliteError::Sqlite);
341
342 // Always drop the permit, both on successful commit and error. This will allow other
343 // processes now to begin a new transaction and acquire the permit.
344 permit.mark_committed_and_drop();
345
346 result
347 }
348}
349
350/// Locked context marking the lifetime of a single transaction.
351pub struct TransactionPermit {
352 permit: Arc<OwnedSemaphorePermit>,
353 tx: Arc<Mutex<Option<Transaction<'static>>>>,
354 committed: bool,
355}
356
357impl TransactionPermit {
358 /// Creates a new `TransactionPermit` using the given permit and transaction.
359 pub(super) fn new(
360 permit: OwnedSemaphorePermit,
361 tx: Arc<Mutex<Option<Transaction<'static>>>>,
362 ) -> Self {
363 Self {
364 permit: Arc::new(permit),
365 tx,
366 committed: false,
367 }
368 }
369
370 /// Marks the transaction as committed and drops the permit.
371 ///
372 /// In the case that the permit was never used, whether due to an early return or error, the
373 /// transaction is automatically rolled-back to prevent corrupted state.
374 pub(super) fn mark_committed_and_drop(mut self) {
375 self.committed = true;
376 drop(self)
377 }
378}
379
380impl Drop for TransactionPermit {
381 fn drop(&mut self) {
382 // If the permit was never used (due to an early return / error / etc.) we automatically
383 // roll-back the transaction.
384 if !self.committed {
385 let permit = self.permit.clone();
386 let tx = self.tx.clone();
387
388 tokio::spawn(async move {
389 if let Some(tx) = tx.lock().await.take() {
390 let _ = tx.rollback().await;
391 }
392
393 drop(permit); // Semaphore released only after rollback completes.
394 });
395 }
396 }
397}
398
399/// Error when interacting with a SQLite store implementation.
400#[derive(Debug, Error)]
401pub enum SqliteError {
402 /// This is a critical error as it indicates that something is wrong with the usage of this
403 /// API: Queries using transactions can only ever occur if a transaction was started _before_.
404 #[error("tried to interact with inexistant transaction")]
405 TransactionMissing,
406
407 /// SQLite database and connection error.
408 #[error(transparent)]
409 Sqlite(#[from] sqlx::Error),
410
411 /// SQL table schema migration error.
412 #[error(transparent)]
413 Migrate(#[from] sqlx::migrate::MigrateError),
414
415 /// An I/O error occurred while encoding bytes before storing them into the database. This is a
416 /// critical error.
417 #[error("failed encoding '{0}' value before storing to database: {1}")]
418 Encode(String, EncodeError),
419
420 /// Invalid, corrupted data was found in the database. This is a critical error.
421 #[error("could not decode corrupted '{0}' value from database: {1}")]
422 Decode(String, DecodeError),
423}
424
425/// Error decoding value retrieved from a store.
426#[derive(Debug, Error)]
427pub enum DecodeError {
428 #[error(transparent)]
429 DecodeCbor(#[from] p2panda_core::cbor::DecodeError),
430
431 #[error(transparent)]
432 Hash(#[from] p2panda_core::hash::HashError),
433
434 #[error(transparent)]
435 Topic(#[from] p2panda_core::topic::TopicError),
436
437 #[error("parsing from string failed")]
438 FromStr,
439}
440
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use futures_test::task::noop_context;
    use sqlx::{Executor, query, query_as, query_scalar};
    use tokio::pin;

    use crate::sqlite::{SqliteError, SqliteStore};
    use crate::traits::Transaction;

    #[tokio::test]
    async fn transaction_provider() {
        let store = SqliteStore::temporary().await;

        // Running a query without a started transaction is an error.
        let outcome = store.tx(async |_| Ok(())).await;
        assert!(matches!(outcome, Err(SqliteError::TransactionMissing)));

        // Beginning a transaction succeeds ...
        let permit = store.begin().await.expect("no error");

        // ... and while its permit is held, a second `begin` stays pending.
        let second_attempt = {
            let fut = store.begin();
            let mut cx = noop_context();
            pin!(fut);
            fut.poll(&mut cx)
        };
        assert!(matches!(second_attempt, Poll::Pending));

        // The permit holder can run queries inside the transaction ...
        assert!(store.tx(async |_| Ok(())).await.is_ok());

        // ... and commit it.
        assert!(store.commit(permit).await.is_ok());

        // After committing there is no transaction to run queries against anymore.
        let outcome = store.tx(async |_| Ok(())).await;
        assert!(matches!(outcome, Err(SqliteError::TransactionMissing)));
    }

    #[tokio::test]
    async fn early_permit_drop_causing_rollback() {
        let store = SqliteStore::temporary().await;

        // Set up the table used by this test.
        store
            .execute(async |db| {
                db.execute("CREATE TABLE test(x INTEGER)").await?;
                Ok(())
            })
            .await
            .unwrap();

        let permit = store.begin().await.unwrap();

        // Write a row inside the transaction.
        store
            .tx(async |tx| {
                query("INSERT INTO test (x) VALUES (10)")
                    .execute(&mut **tx)
                    .await?;
                Ok(())
            })
            .await
            .unwrap();

        // Drop the permit prematurely, without committing.
        drop(permit);

        // A new permit can be acquired again. It is dropped immediately on purpose: `temporary()`
        // only has a single connection, which the count query below needs as well.
        assert!(store.begin().await.is_ok());

        // Nothing was persisted, as the first transaction got rolled back.
        let rows: i64 = store
            .execute(async |db| {
                query_scalar("SELECT COUNT(*) FROM test")
                    .fetch_one(db)
                    .await
                    .map_err(SqliteError::Sqlite)
            })
            .await
            .unwrap();
        assert_eq!(rows, 0);
    }

    #[tokio::test]
    async fn serialized_transactions() {
        let store_1 = SqliteStore::temporary().await;

        let store_2 = store_1.clone();

        // Set up the table used by this test.
        store_1
            .execute(async |db| {
                db.execute("CREATE TABLE test(x INTEGER)").await?;
                Ok(())
            })
            .await
            .unwrap();

        // 1. Store 1 acquires the permit to run a transaction.
        let permit_1 = store_1.begin().await.unwrap();

        // .. meanwhile store 2 wants to do some work in parallel.
        let handle = tokio::spawn(async move {
            // Acquiring a permit waits for now, as store 1 is still holding its own.
            let permit_2 = store_2.begin().await.unwrap();

            // 5. The change made previously by store 1 is visible now.
            let seen = store_2
                .tx(async |tx| {
                    let row: (i64,) = query_as("SELECT x FROM test").fetch_one(&mut **tx).await?;
                    Ok(row.0)
                })
                .await
                .unwrap();
            assert_eq!(seen, 5);

            // 6. Write another value ...
            store_2
                .tx(async |tx| {
                    query("INSERT INTO test (x) VALUES (10)")
                        .execute(&mut **tx)
                        .await?;
                    Ok(())
                })
                .await
                .unwrap();

            // 7. ... but abort the transaction and roll back.
            store_2.rollback(permit_2).await.unwrap();

            // The value is still the same as before.
            let seen = store_2
                .execute(async |db| {
                    let row: (i64,) = query_as("SELECT x FROM test").fetch_one(db).await?;
                    Ok(row.0)
                })
                .await
                .unwrap();
            assert_eq!(seen, 5);
        });

        // 2. Store 1 writes a value.
        store_1
            .tx(async |tx| {
                query("INSERT INTO test (x) VALUES (5)")
                    .execute(&mut **tx)
                    .await?;
                Ok(())
            })
            .await
            .unwrap();

        // 3. The value is already visible to a "dirty read" inside the transaction.
        let seen = store_1
            .tx(async |tx| {
                let row: (i64,) = query_as("SELECT x FROM test").fetch_one(&mut **tx).await?;
                Ok(row.0)
            })
            .await
            .unwrap();
        assert_eq!(seen, 5);

        // 4. Commit the change and free the permit, which lets store 2 proceed and read the
        //    changed value.
        store_1.commit(permit_1).await.unwrap();

        // The value is still 5 after the commit.
        let seen = store_1
            .execute(async |db| {
                let row: (i64,) = query_as("SELECT x FROM test").fetch_one(db).await?;
                Ok(row.0)
            })
            .await
            .unwrap();
        assert_eq!(seen, 5);

        // Give store 2 the time it needs to finish.
        handle.await.unwrap();
    }
}