1use crate::config::Config;
2use r2d2::{Pool, PooledConnection};
3use r2d2_sqlite::SqliteConnectionManager;
4use refinery::Runner;
5use rusqlite::OpenFlags;
6use std::{ops::DerefMut, path::PathBuf};
7use thiserror::Error;
8
9#[derive(Debug, Error)]
12pub enum Error {
13 #[error("pool error: {0}")]
15 ConnectionPool(#[from] r2d2::Error),
16
17 #[error("deserialization error: {0}")]
19 Deserialization(#[from] serde_json::Error),
20
21 #[error("migration error: {0}")]
23 Migration(#[from] refinery::Error),
24
25 #[error("sql error: {0}")]
27 Sql(#[from] rusqlite::Error),
28
29 #[error("insertion failed")]
31 InsertionFailure,
32
33 #[error("inconsistent configuration")]
35 InconsistentConfig,
36
37 #[error("failure: {0}")]
39 Failure(String),
40}
41
42pub enum Storage {
44 File(PathBuf),
46
47 Memory(String),
49}
50
51#[derive(Clone, Debug)]
57pub struct Database {
58 reader: Pool<SqliteConnectionManager>,
59 writer: Pool<SqliteConnectionManager>,
60}
61
62impl Database {
63 pub fn open(db: Option<&PathBuf>, config: Option<&Config>) -> Result<Self, Error> {
68 let storage = db
69 .map(|path| Storage::File(path.clone()))
70 .unwrap_or(Storage::Memory("orderbook".to_owned()));
71
72 let database = open_rw(storage, Some(crate::embedded::migrations::runner()))?;
73
74 let conn = database.connect(true)?;
75 let stored_config = Config::get(&conn)?;
76
77 if let Some(stored_config) = stored_config {
78 if let Some(config) = config {
79 if stored_config != *config {
80 return Err(Error::InconsistentConfig);
81 }
82 }
83 } else if let Some(config) = config {
84 assert_ne!(config.trade_rate.as_secs(), 0, "time unit must be non-zero");
86 config.set(&conn)?;
87 } else {
88 panic!("no configuration specified")
89 };
90
91 Ok(database)
92 }
93
94 pub fn connect(&self, write: bool) -> Result<PooledConnection<SqliteConnectionManager>, Error> {
96 let conn = if write {
97 self.writer.get()
98 } else {
99 self.reader.get()
100 };
101 Ok(conn?)
102 }
103}
104
105fn pool(
107 storage: &Storage,
108 max_size: Option<u32>,
109 readonly: bool,
110 migration: Option<Runner>,
111) -> Result<Pool<SqliteConnectionManager>, Error> {
112 let mut flags = OpenFlags::default();
113 if readonly {
114 flags.set(OpenFlags::SQLITE_OPEN_READ_WRITE, false);
115 flags.set(OpenFlags::SQLITE_OPEN_READ_ONLY, true);
116 flags.set(OpenFlags::SQLITE_OPEN_CREATE, false);
117 }
118
119 let db = match storage {
121 Storage::File(path) => SqliteConnectionManager::file(path),
122 Storage::Memory(name) => {
123 SqliteConnectionManager::file(format!("file:/{}?vfs=memdb", name))
125 }
126 }
127 .with_flags(flags)
128 .with_init(|c| {
129 c.execute_batch(
136 r#"
137 PRAGMA journal_mode = WAL;
138 PRAGMA busy_timeout = 5000;
139 PRAGMA synchronous = NORMAL;
140 PRAGMA foreign_keys = true;
141 PRAGMA mmap_size = 134217728;
142 PRAGMA journal_size_limit = 27103364;
143 PRAGMA cache_size=2000;
144 "#,
145 )
146 });
147
148 let pool = if let Some(n) = max_size {
149 r2d2::Pool::builder().max_size(n)
150 } else {
151 r2d2::Pool::builder()
152 }
153 .build(db)?;
154
155 if let Some(runner) = migration {
156 let mut conn = pool.get()?;
157 runner.run(conn.deref_mut())?;
158 }
159
160 Ok(pool)
161}
162
163fn open_rw(storage: Storage, migration: Option<Runner>) -> Result<Database, Error> {
165 let writer = pool(&storage, Some(1), false, migration)?;
166 let reader = pool(&storage, None, true, None)?;
167 Ok(Database { reader, writer })
168}