fts_demo/
db.rs

1use crate::config::Config;
2use r2d2::{Pool, PooledConnection};
3use r2d2_sqlite::SqliteConnectionManager;
4use refinery::Runner;
5use rusqlite::OpenFlags;
6use std::{ops::DerefMut, path::PathBuf};
7use thiserror::Error;
8
9/// Database operations generate errors for multiple reasons, this is a unified
10/// error type that our functions can return.
11#[derive(Debug, Error)]
12pub enum Error {
13    /// Error from the connection pool
14    #[error("pool error: {0}")]
15    ConnectionPool(#[from] r2d2::Error),
16
17    /// Error in JSON serialization or deserialization
18    #[error("deserialization error: {0}")]
19    Deserialization(#[from] serde_json::Error),
20
21    /// Error during database migrations
22    #[error("migration error: {0}")]
23    Migration(#[from] refinery::Error),
24
25    /// Error from SQLite operations
26    #[error("sql error: {0}")]
27    Sql(#[from] rusqlite::Error),
28
29    /// Failure to insert data
30    #[error("insertion failed")]
31    InsertionFailure,
32
33    /// Conflicting configuration detected
34    #[error("inconsistent configuration")]
35    InconsistentConfig,
36
37    /// Generic failure with message
38    #[error("failure: {0}")]
39    Failure(String),
40}
41
/// Where the database contents live.
///
/// The type derives `Debug`, `Clone`, `PartialEq` and `Eq` so that it can be
/// logged, duplicated and compared like any other plain configuration value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Storage {
    /// Persist the data in a file at the given path.
    File(PathBuf),

    /// Keep the data in memory; the string names the in-memory database.
    Memory(String),
}
50
51/// Main database connection manager.
52///
53/// Sqlite does not have parallel writes, so we create two separate connection
54/// pools. The reader has unlimited connections, while the writer is capped to
55/// one. Sqlite has its own mutex shenanigans to make that work out.
56#[derive(Clone, Debug)]
57pub struct Database {
58    reader: Pool<SqliteConnectionManager>,
59    writer: Pool<SqliteConnectionManager>,
60}
61
62impl Database {
63    /// Opens a database connection with the specified configuration.
64    ///
65    /// Creates a new database if one doesn't exist, and applies migrations.
66    /// Validates that the provided configuration matches any existing configuration.
67    pub fn open(db: Option<&PathBuf>, config: Option<&Config>) -> Result<Self, Error> {
68        let storage = db
69            .map(|path| Storage::File(path.clone()))
70            .unwrap_or(Storage::Memory("orderbook".to_owned()));
71
72        let database = open_rw(storage, Some(crate::embedded::migrations::runner()))?;
73
74        let conn = database.connect(true)?;
75        let stored_config = Config::get(&conn)?;
76
77        if let Some(stored_config) = stored_config {
78            if let Some(config) = config {
79                if stored_config != *config {
80                    return Err(Error::InconsistentConfig);
81                }
82            }
83        } else if let Some(config) = config {
84            // TODO: can we move this to the arg parsing and surface the message more cleanly?
85            assert_ne!(config.trade_rate.as_secs(), 0, "time unit must be non-zero");
86            config.set(&conn)?;
87        } else {
88            panic!("no configuration specified")
89        };
90
91        Ok(database)
92    }
93
94    /// Obtains a connection from the pool.
95    pub fn connect(&self, write: bool) -> Result<PooledConnection<SqliteConnectionManager>, Error> {
96        let conn = if write {
97            self.writer.get()
98        } else {
99            self.reader.get()
100        };
101        Ok(conn?)
102    }
103}
104
105/// Constructs the connection pools.
106fn pool(
107    storage: &Storage,
108    max_size: Option<u32>,
109    readonly: bool,
110    migration: Option<Runner>,
111) -> Result<Pool<SqliteConnectionManager>, Error> {
112    let mut flags = OpenFlags::default();
113    if readonly {
114        flags.set(OpenFlags::SQLITE_OPEN_READ_WRITE, false);
115        flags.set(OpenFlags::SQLITE_OPEN_READ_ONLY, true);
116        flags.set(OpenFlags::SQLITE_OPEN_CREATE, false);
117    }
118
119    // Open the database
120    let db = match storage {
121        Storage::File(path) => SqliteConnectionManager::file(path),
122        Storage::Memory(name) => {
123            // for in-memory databases, SQLITE_OPEN_CREATE seems to create errors
124            SqliteConnectionManager::file(format!("file:/{}?vfs=memdb", name))
125        }
126    }
127    .with_flags(flags)
128    .with_init(|c| {
129        // TODO: validate these settings and possibly add to them. Some helpful resources:
130        // * https://lobste.rs/s/fxkk7v/why_does_sqlite_production_have_such_bad
131        // * https://kerkour.com/sqlite-for-servers
132        // * https://gcollazo.com/optimal-sqlite-settings-for-django/
133        // * https://lobste.rs/s/rvsgqy/gotchas_with_sqlite_production
134        // * https://blog.pecar.me/sqlite-prod
135        c.execute_batch(
136            r#"
137            PRAGMA journal_mode = WAL;
138            PRAGMA busy_timeout = 5000;
139            PRAGMA synchronous = NORMAL;
140            PRAGMA foreign_keys = true;
141            PRAGMA mmap_size = 134217728;
142            PRAGMA journal_size_limit = 27103364;
143            PRAGMA cache_size=2000;
144            "#,
145        )
146    });
147
148    let pool = if let Some(n) = max_size {
149        r2d2::Pool::builder().max_size(n)
150    } else {
151        r2d2::Pool::builder()
152    }
153    .build(db)?;
154
155    if let Some(runner) = migration {
156        let mut conn = pool.get()?;
157        runner.run(conn.deref_mut())?;
158    }
159
160    Ok(pool)
161}
162
163/// Creates an instance of Database with read and write connection pools.
164fn open_rw(storage: Storage, migration: Option<Runner>) -> Result<Database, Error> {
165    let writer = pool(&storage, Some(1), false, migration)?;
166    let reader = pool(&storage, None, true, None)?;
167    Ok(Database { reader, writer })
168}