1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
use std::path::Path;
use async_rusqlite::{Connection};
use crate::migrations::Migrations;
use crate::error::ConnectionBuilderError;
/// An opinionated connection builder which ultimately hands back
/// an [`async_rusqlite::Connection`] after checking the app ID and
/// performing any necessary migrations.
pub struct ConnectionBuilder<E = rusqlite::Error> {
// PRAGMA application_id = INTEGER; default 0
app_id: i32,
// Migrations to apply
migrations: Migrations<E>,
// Function to call when the db thread shuts down
on_close: Option<Box<dyn FnOnce(Option<rusqlite::Connection>) + Send + 'static>>
}
impl <E: Send + 'static> Default for ConnectionBuilder<E> {
fn default() -> Self {
Self::new()
}
}
impl <E: Send + 'static> ConnectionBuilder<E> {
/// Construct a new connection builder.
pub fn new() -> Self {
Self {
app_id: 0,
migrations: Default::default(),
on_close: None,
}
}
/// Configure a function to be called exactly once when the connection is closed.
/// If the database has already been closed then it will be given `None`, else it
/// will be handed the database connection.
pub fn on_close<F: FnOnce(Option<rusqlite::Connection>) + Send + 'static>(mut self, f: F) -> Self {
self.on_close = Some(Box::new(f));
self
}
/// Set the "app ID" for this database. If opening an existing file,
/// this Id must match else an error will be generated. This helps to
/// ensure that the database we're trying to open is meant for the app
/// we're running. Default to 0 if not set ("SQLite Database").
pub fn app_id(mut self, app_id: i32) -> Self {
self.app_id = app_id;
self
}
/// Add a single migration to the list, which will be responsible for
/// upgrading the database to the version given.
///
/// # Panics
///
/// Panics if the migration version given is not greater than 0.
pub fn add_migration<F>(mut self, version: i32, migration: F) -> Self
where
F: Send + 'static + Fn(&rusqlite::Connection) -> Result<(), E>
{
self.migrations = self.migrations.add(version, migration);
self
}
/// **Warning: using this could lead to database state being invalid.**
///
/// Add a single migration to the list. The migration will not be performed
/// inside a transaction. Use [`Self::add_migration`] unless you know what
/// you are doing.
pub fn add_migration_non_transactionally<F>(mut self, version: i32, migration: F) -> Self
where
F: Send + 'static + Fn(&rusqlite::Connection) -> Result<(), E>
{
self.migrations = self.migrations.add_non_transactionally(version, migration);
self
}
/// Use the provided set of migrations to ensure that the database we connect
/// to is uptodate. This uses the `user_version` PRAGMA to know which migrations
/// to apply.
pub fn set_migrations(mut self, migrations: Migrations<E>) -> Self {
self.migrations = migrations;
self
}
/// Open a connection to an in-memory database.
pub async fn open_in_memory(mut self) -> Result<Connection, ConnectionBuilderError<E>> {
let conn = self.connection_builder().open_in_memory().await?;
self.setup(&conn, true).await?;
Ok(conn)
}
/// Open a connection to a database at some file.
pub async fn open<P: AsRef<Path>>(mut self, path: P) -> Result<Connection, ConnectionBuilderError<E>> {
use async_rusqlite::rusqlite::{
OpenFlags, Error::SqliteFailure, ffi::ErrorCode::CannotOpen, ffi
};
// The default flags rusqlite's open fn uses. First we try opening
// and disallow creating a new DB. Then we allow creating a new DB.
// This allows us to know when a new DB was created and act accordingly.
let flags
= OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX;
let (conn, is_new) = match self.connection_builder().open_with_flags(path.as_ref(), flags).await {
// All good:
Ok(conn) => (conn, false),
// Can't open the file; try again but allow creating it:
Err(SqliteFailure(ffi::Error { code, .. }, _)) if code == CannotOpen => {
let flags = flags | OpenFlags::SQLITE_OPEN_CREATE;
let conn = self.connection_builder().open_with_flags(path, flags).await?;
(conn, true)
},
// Something else went wrong; just return the error.
Err(e) => return Err(e.into()),
};
self.setup(&conn, is_new).await?;
Ok(conn)
}
// A connection builder.
fn connection_builder(&mut self) -> async_rusqlite::ConnectionBuilder {
let mut builder = Connection::builder();
if let Some(on_close) = self.on_close.take() {
builder = builder.on_close(on_close);
}
builder
}
// Perform any setup on the opened connection.
async fn setup(self, conn: &Connection, is_new: bool) -> Result<(), ConnectionBuilderError<E>> {
conn.call(move |conn| {
if is_new {
// Set up the app ID if this is a new DB.
conn.pragma_update(None, "application_id", self.app_id)?;
} else {
// Check the app ID if this is not a new DB.
let val: i32 = conn.query_row(
"SELECT * from pragma_application_id",
[],
|row| row.get(0)
)?;
if val != self.app_id {
return Err(ConnectionBuilderError::WrongApplicationId(val))
}
}
// Set foreign key constraint checking.
conn.pragma_update(None, "foreign_keys", true)?;
// Which version is the DB at (ie do we need to run any migrations)
let user_version: i32 = conn.query_row(
"SELECT * FROM pragma_user_version",
[],
|row| row.get(0)
)?;
// Attempt each migration atomically. If a migration fails, we don't
// want the DB to have been altered.
let mut latest_migration_version = 0;
for (version, perform_in_transaction, migration) in self.migrations.iter() {
latest_migration_version = version;
if version > user_version {
if perform_in_transaction {
// in one transaction, apply a migration and update the db version
// to reflect this. nothing happens on failure; transaction rolled back.
let transaction = conn.transaction()?;
migration(&transaction).map_err(ConnectionBuilderError::Migration)?;
transaction.pragma_update(None, "user_version", version)?;
transaction.commit()?;
} else {
// This is less safe, since any failure inside the migration can lead to
// the database being in an invalid state. Sometimes though, we need to
// control the transaction behaviour inside the migration, so this is
// the best we can do.
migration(conn).map_err(ConnectionBuilderError::Migration)?;
conn.pragma_update(None, "user_version", version)?;
}
}
}
if latest_migration_version < user_version {
// We don't have migrations up to the version that the db is at already.
// This probably means that this app is out of date. Complain, to prevent
// an out of date app from trying to use the newer database.
return Err(ConnectionBuilderError::OutOfDate {
db_version: user_version,
latest_migration: latest_migration_version
})
}
Ok(())
}).await
}
}