#![allow(dead_code)]
#[cfg(test)]
mod tests;
use anyhow::Context as _;
use sqlx::{
SqlitePool,
query::Query,
sqlite::{
SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode,
SqliteSynchronous,
},
};
use std::{cmp::Ordering, path::Path, time::Instant};
pub type SqliteQuery<'q> = Query<'q, sqlx::Sqlite, SqliteArguments<'q>>;
pub async fn open_file(file: &Path) -> anyhow::Result<SqlitePool> {
if let Some(dir) = file.parent()
&& !dir.is_dir()
{
std::fs::create_dir_all(dir)?;
}
let options = SqliteConnectOptions::new().filename(file);
Ok(open(options).await?)
}
pub async fn open_memory() -> sqlx::Result<SqlitePool> {
open(
SqliteConnectOptions::new()
.in_memory(true)
.shared_cache(true),
)
.await
}
pub async fn open(options: SqliteConnectOptions) -> sqlx::Result<SqlitePool> {
let options = options
.synchronous(SqliteSynchronous::Normal)
.pragma("temp_store", "memory")
.pragma("mmap_size", "30000000000")
.auto_vacuum(SqliteAutoVacuum::None)
.journal_mode(SqliteJournalMode::Wal)
.pragma("journal_size_limit", "0") .foreign_keys(true)
.read_only(false);
SqlitePool::connect_with(options).await
}
pub async fn init_db<'q>(
db: &SqlitePool,
name: &str,
ddls: impl IntoIterator<Item = SqliteQuery<'q>>,
version_migrations: Vec<SqliteQuery<'q>>,
) -> anyhow::Result<()> {
let schema_version = version_migrations.len() + 1;
let init = async |db: &SqlitePool, schema_version| {
let mut tx = db.begin().await?;
sqlx::query("CREATE TABLE IF NOT EXISTS _meta (version UINT64 NOT NULL UNIQUE)")
.execute(tx.as_mut())
.await?;
for i in 1..=schema_version {
sqlx::query("INSERT OR IGNORE INTO _meta (version) VALUES (?)")
.bind(i as i64)
.execute(tx.as_mut())
.await?;
}
for ddl in ddls.into_iter() {
ddl.execute(tx.as_mut()).await?;
}
tx.commit().await
};
if sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
.fetch_optional(db)
.await
.with_context(|| format!("error looking for {name} database _meta table"))?
.is_none()
{
init(db, schema_version).await?;
}
let found_version: u64 = sqlx::query_scalar("SELECT max(version) FROM _meta")
.fetch_optional(db)
.await?
.with_context(|| format!("invalid {name} database version: no version found"))?;
anyhow::ensure!(found_version > 0, "schema version should be 1 based");
let run_vacuum = match found_version.cmp(&(schema_version as _)) {
Ordering::Greater => {
anyhow::bail!(
"invalid {name} database version: version {found_version} is greater than the number of migrations {schema_version}"
);
}
Ordering::Equal => false,
Ordering::Less => true,
};
for (from_version, to_version, migration) in version_migrations
.into_iter()
.enumerate()
.map(|(i, m)| (i + 1, i + 2, m))
.skip(found_version as usize - 1)
{
tracing::info!("Migrating {name} database to version {to_version}");
let now = Instant::now();
let mut tx = db.begin().await?;
migration.execute(tx.as_mut()).await?;
sqlx::query("INSERT OR IGNORE INTO _meta (version) VALUES (?)")
.bind(to_version as i64)
.execute(tx.as_mut())
.await?;
tx.commit().await?;
tracing::info!(
"Successfully migrated {name} database from version {from_version} to {to_version} in {}",
humantime::format_duration(now.elapsed())
);
}
if run_vacuum {
tracing::info!(
"Performing {name} database vacuum and wal checkpointing to free up space after the migration"
);
if let Err(e) = sqlx::query("VACUUM").execute(db).await {
tracing::warn!("error vacuuming {name} database: {e}")
}
if let Err(e) = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.execute(db)
.await
{
tracing::warn!("error checkpointing {name} database wal: {e}")
}
}
Ok(())
}