use crate::{
core::{AppliedMigrationRow, GenericMigrator, MigrationBackend, MigrationType},
error::Error,
AppliedMigration, Migration, MigrationReport, Precondition,
};
#[cfg(test)]
use crate::MigrationFailure;
use chrono::Utc;
use rusqlite::{params, Connection};
/// Marker type that implements `MigrationBackend` on top of a `rusqlite::Connection`.
pub(crate) struct SqliteBackend;
impl MigrationBackend for SqliteBackend {
type Conn = Connection;
fn version_table_exists(conn: &mut Connection, table_name: &str) -> Result<bool, Error> {
let mut stmt =
conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name=?1")?;
let exists = stmt.query([table_name])?.next()?.is_some();
Ok(exists)
}
fn create_version_table(conn: &mut Connection, table_name: &str) -> Result<(), Error> {
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (version integer primary key not null, name text not null, applied_at text not null, checksum text not null, migration_type text not null default 'migration')",
table_name
),
[],
)?;
Ok(())
}
fn column_exists(
conn: &mut Connection,
table_name: &str,
column_name: &str,
) -> Result<bool, Error> {
let mut stmt = conn.prepare(&format!("PRAGMA table_info({})", table_name))?;
let columns: Vec<String> = stmt
.query_map([], |row| row.get::<_, String>(1))?
.collect::<Result<Vec<_>, _>>()?;
Ok(columns.contains(&column_name.to_string()))
}
fn add_column(
conn: &mut Connection,
table_name: &str,
column_name: &str,
column_def: &str,
) -> Result<(), Error> {
conn.execute(
&format!(
"ALTER TABLE {} ADD COLUMN {} {}",
table_name, column_name, column_def
),
[],
)?;
Ok(())
}
fn get_applied_migration_rows(
conn: &mut Connection,
table_name: &str,
) -> Result<Vec<AppliedMigrationRow>, Error> {
let mut stmt =
conn.prepare(&format!("SELECT version, name, checksum FROM {}", table_name))?;
let rows = stmt
.query_map([], |row| {
Ok(AppliedMigrationRow {
version: row.get(0)?,
name: row.get(1)?,
checksum: row.get(2)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(rows)
}
fn get_max_version(conn: &mut Connection, table_name: &str) -> Result<u32, Error> {
let mut stmt =
conn.prepare(&format!("SELECT MAX(version) from {}", table_name))?;
let version: Option<u32> = stmt.query_row([], |row| row.get(0))?;
Ok(version.unwrap_or(0))
}
fn get_migration_history_rows(
conn: &mut Connection,
table_name: &str,
) -> Result<Vec<AppliedMigration>, Error> {
let has_migration_type = {
let mut stmt =
conn.prepare(&format!("PRAGMA table_info({})", table_name))?;
let columns: Vec<String> = stmt
.query_map([], |row| row.get::<_, String>(1))?
.collect::<Result<Vec<_>, _>>()?;
columns.contains(&"migration_type".to_string())
};
let query = if has_migration_type {
format!(
"SELECT version, name, applied_at, checksum, migration_type FROM {} ORDER BY version",
table_name
)
} else {
format!(
"SELECT version, name, applied_at, checksum FROM {} ORDER BY version",
table_name
)
};
let mut stmt = conn.prepare(&query)?;
let migrations = if has_migration_type {
stmt.query_map([], |row| {
let applied_at_str: String = row.get(2)?;
let applied_at = chrono::DateTime::parse_from_rfc3339(&applied_at_str)
.map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Text,
Box::new(e),
)
})?
.with_timezone(&Utc);
let migration_type_str: String = row.get(4)?;
let migration_type = if migration_type_str == "baseline" {
MigrationType::Baseline
} else {
MigrationType::Migration
};
Ok(AppliedMigration {
version: row.get(0)?,
name: row.get(1)?,
applied_at,
checksum: row.get(3)?,
migration_type,
})
})?
.collect::<Result<Vec<_>, _>>()?
} else {
stmt.query_map([], |row| {
let applied_at_str: String = row.get(2)?;
let applied_at = chrono::DateTime::parse_from_rfc3339(&applied_at_str)
.map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Text,
Box::new(e),
)
})?
.with_timezone(&Utc);
Ok(AppliedMigration {
version: row.get(0)?,
name: row.get(1)?,
applied_at,
checksum: row.get(3)?,
migration_type: MigrationType::Migration,
})
})?
.collect::<Result<Vec<_>, _>>()?
};
Ok(migrations)
}
fn execute_migration_up(
conn: &mut Connection,
migration: &Box<dyn Migration>,
table_name: &str,
applied_at: &str,
checksum: &str,
migration_type: MigrationType,
) -> Result<bool, Error> {
let tx = conn.transaction()?;
let precondition = migration.sqlite_precondition(&tx)?;
match precondition {
Precondition::AlreadySatisfied => {
tx.execute(
&format!(
"INSERT INTO {} (version, name, applied_at, checksum, migration_type) VALUES(?1, ?2, ?3, ?4, ?5)",
table_name
),
params![migration.version(), migration.name(), applied_at, checksum, migration_type.to_string()],
)?;
tx.commit()?;
Ok(false)
}
Precondition::NeedsApply => {
migration.sqlite_up(&tx)?;
tx.execute(
&format!(
"INSERT INTO {} (version, name, applied_at, checksum, migration_type) VALUES(?1, ?2, ?3, ?4, ?5)",
table_name
),
params![migration.version(), migration.name(), applied_at, checksum, migration_type.to_string()],
)?;
tx.commit()?;
Ok(true)
}
}
}
fn execute_migration_down(
conn: &mut Connection,
migration: &Box<dyn Migration>,
table_name: &str,
) -> Result<(), Error> {
let tx = conn.transaction()?;
migration.sqlite_down(&tx)?;
tx.execute(
&format!("DELETE FROM {} WHERE version = ?1", table_name),
params![migration.version()],
)?;
tx.commit()?;
Ok(())
}
}
/// Migrator for SQLite databases accessed through `rusqlite`.
///
/// Wraps a `GenericMigrator` and adds a SQLite busy timeout that is applied
/// to the connection before upgrades and downgrades.
#[derive(Debug)]
pub struct SqliteMigrator {
// Backend-agnostic migrator that holds the migration list and configuration.
migrator: GenericMigrator,
// Value handed to `Connection::busy_timeout` before upgrade/downgrade runs.
busy_timeout: std::time::Duration,
}
impl SqliteMigrator {
    /// Applies the configured busy timeout to `conn`.
    fn setup_concurrency_protection(&self, conn: &Connection) -> Result<(), Error> {
        Ok(conn.busy_timeout(self.busy_timeout)?)
    }

    /// Builds a migrator, returning the validation error from
    /// `GenericMigrator::try_new` as a `String` on failure.
    ///
    /// The busy timeout starts at 30 seconds.
    pub fn try_new(migrations: Vec<Box<dyn Migration>>) -> Result<Self, String> {
        let migrator = GenericMigrator::try_new(migrations)?;
        Ok(Self {
            migrator,
            busy_timeout: std::time::Duration::from_secs(30),
        })
    }

    /// Builds a migrator like [`SqliteMigrator::try_new`].
    ///
    /// # Panics
    ///
    /// Panics with the validation message when `try_new` returns an error.
    pub fn new(migrations: Vec<Box<dyn Migration>>) -> Self {
        Self::try_new(migrations).unwrap_or_else(|err| panic!("{}", err))
    }

    /// Overrides the name of the table used to track applied migrations.
    pub fn with_schema_version_table_name(mut self, name: impl Into<String>) -> Self {
        self.migrator.set_schema_version_table_name(name);
        self
    }

    /// Overrides the busy timeout applied before upgrades and downgrades.
    pub fn with_busy_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.busy_timeout = timeout;
        self
    }

    /// Registers the migration-start callback on the wrapped migrator.
    ///
    /// NOTE(review): the arguments are presumably the migration version and
    /// name; confirm against `GenericMigrator`.
    pub fn on_migration_start<F>(mut self, callback: F) -> Self
    where
        F: Fn(u32, &str) + Send + Sync + 'static,
    {
        self.migrator.set_on_migration_start(callback);
        self
    }

    /// Registers the migration-complete callback on the wrapped migrator.
    ///
    /// NOTE(review): the `Duration` argument is presumably the time the
    /// migration took; confirm against `GenericMigrator`.
    pub fn on_migration_complete<F>(mut self, callback: F) -> Self
    where
        F: Fn(u32, &str, std::time::Duration) + Send + Sync + 'static,
    {
        self.migrator.set_on_migration_complete(callback);
        self
    }

    /// Registers the migration-skipped callback on the wrapped migrator.
    pub fn on_migration_skipped<F>(mut self, callback: F) -> Self
    where
        F: Fn(u32, &str) + Send + Sync + 'static,
    {
        self.migrator.set_on_migration_skipped(callback);
        self
    }

    /// Registers the migration-error callback on the wrapped migrator.
    pub fn on_migration_error<F>(mut self, callback: F) -> Self
    where
        F: Fn(u32, &str, &Error) + Send + Sync + 'static,
    {
        self.migrator.set_on_migration_error(callback);
        self
    }

    /// The migrations held by the wrapped migrator.
    pub fn migrations(&self) -> &[Box<dyn Migration>] {
        &self.migrator.migrations
    }

    /// Name of the table used to track applied migrations.
    pub fn schema_version_table_name(&self) -> &str {
        &self.migrator.schema_version_table_name
    }

    /// Current schema version, as computed by the generic migrator for the SQLite backend.
    pub fn get_current_version(&self, conn: &mut Connection) -> Result<u32, Error> {
        self.migrator
            .generic_get_current_version::<SqliteBackend>(conn)
    }

    /// History of applied migrations, as loaded by the generic migrator.
    pub fn get_migration_history(
        &self,
        conn: &mut Connection,
    ) -> Result<Vec<AppliedMigration>, Error> {
        self.migrator
            .generic_get_migration_history::<SqliteBackend>(conn)
    }

    /// Migrations that an upgrade would select, as reported by the generic migrator.
    pub fn preview_upgrade(
        &self,
        conn: &mut Connection,
    ) -> Result<Vec<&Box<dyn Migration>>, Error> {
        self.migrator.generic_preview_upgrade::<SqliteBackend>(conn)
    }

    /// Migrations that a downgrade to `target_version` would select, as
    /// reported by the generic migrator.
    pub fn preview_downgrade(
        &self,
        conn: &mut Connection,
        target_version: u32,
    ) -> Result<Vec<&Box<dyn Migration>>, Error> {
        self.migrator
            .generic_preview_downgrade::<SqliteBackend>(conn, target_version)
    }

    /// Upgrades the database up to `target_version`.
    ///
    /// # Errors
    ///
    /// A non-zero `target_version` that matches no known migration is
    /// rejected up front, before the connection is touched. The error is
    /// wrapped in `rusqlite::Error::InvalidParameterName`. Other errors come
    /// from setting the busy timeout or from the generic upgrade.
    pub fn upgrade_to(
        &self,
        conn: &mut Connection,
        target_version: u32,
    ) -> Result<MigrationReport<'_>, Error> {
        // Version 0 skips the lookup entirely.
        let unknown_target = target_version > 0
            && self
                .migrations()
                .iter()
                .all(|m| m.version() != target_version);
        if unknown_target {
            let message = format!(
                "Target version {} does not exist in migration list",
                target_version
            );
            return Err(Error::Rusqlite(rusqlite::Error::InvalidParameterName(
                message,
            )));
        }
        self.setup_concurrency_protection(conn)?;
        self.migrator
            .generic_upgrade::<SqliteBackend>(conn, Some(target_version))
    }

    /// Upgrades the database with no target version (`None` is passed to the
    /// generic upgrade), after applying the busy timeout.
    pub fn upgrade(&self, conn: &mut Connection) -> Result<MigrationReport<'_>, Error> {
        self.setup_concurrency_protection(conn)?;
        self.migrator.generic_upgrade::<SqliteBackend>(conn, None)
    }

    /// Downgrades the database to `target_version`, after applying the busy timeout.
    pub fn downgrade(
        &self,
        conn: &mut Connection,
        target_version: u32,
    ) -> Result<MigrationReport<'_>, Error> {
        self.setup_concurrency_protection(conn)?;
        self.migrator
            .generic_downgrade::<SqliteBackend>(conn, target_version)
    }
}
#[cfg(test)]
mod tests {
use rusqlite::Transaction;
use super::*;
#[test]
fn single_successful_from_clean() {
    use chrono::{DateTime, FixedOffset};

    // Version-1 migration that creates an empty `test` table.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let report = migrator.upgrade(&mut conn).unwrap();
    let expected = MigrationReport {
        schema_version_table_existed: false,
        schema_version_table_created: true,
        migrations_run: vec![1],
        failing_migration: None,
    };
    assert_eq!(report, expected);

    // Read back the bookkeeping row written for the migration.
    let mut select = conn.prepare("SELECT * FROM _migratio_version_").unwrap();
    let recorded = select
        .query_map([], |row| {
            let version: u32 = row.get("version").unwrap();
            let name: String = row.get("name").unwrap();
            let applied_at: String = row.get("applied_at").unwrap();
            Ok((version, name, applied_at))
        })
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(recorded.len(), 1);

    let (version, name, applied_at_raw) = &recorded[0];
    assert_eq!(*version, 1);
    assert_eq!(name.as_str(), "Migration 1");

    // The timestamp must be RFC 3339, carry a zero offset, and be recent.
    let applied_at = DateTime::parse_from_rfc3339(applied_at_raw).unwrap();
    assert_eq!(applied_at.timezone(), FixedOffset::east_opt(0).unwrap());
    let elapsed_secs = Utc::now().timestamp() - applied_at.timestamp();
    assert!(elapsed_secs < 5);
}
#[test]
fn single_unsuccessful_from_clean() {
    // Migration that copies `test` into `test2`, drops `test`, then fails on
    // invalid SQL so that all of its work has to be rolled back.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test2 (id) SELECT id FROM test", [])?;
            tx.execute("DROP TABLE test", [])?;
            tx.execute("bleep blorp", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Seed a pre-existing table with two rows.
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (1)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (2)", [])
        .unwrap();

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(
        report,
        MigrationReport {
            schema_version_table_existed: false,
            // The version table did not exist beforehand and is present after
            // the run (asserted below), so it was created even though the
            // only migration failed.
            schema_version_table_created: true,
            migrations_run: vec![],
            failing_migration: Some(MigrationFailure {
                migration: &(Box::new(Migration1) as Box<dyn Migration>),
                error: Error::Rusqlite(rusqlite::Error::SqliteFailure(
                    rusqlite::ffi::Error {
                        code: rusqlite::ffi::ErrorCode::Unknown,
                        extended_code: 1
                    },
                    Some("near \"bleep\": syntax error".to_string())
                ))
            })
        }
    );

    // Only the version table and the untouched `test` table may exist.
    let mut stmt = conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let mut tables = stmt
        .query_map([], |x| {
            let name: String = x.get(0)?;
            Ok(name)
        })
        .unwrap()
        .collect::<Result<Vec<String>, rusqlite::Error>>()
        .unwrap();
    tables.sort();
    assert_eq!(
        tables,
        vec!["_migratio_version_".to_string(), "test".to_string()]
    );

    // The seeded rows must have survived the rollback.
    let mut stmt = conn.prepare("SELECT * FROM test").unwrap();
    let rows = stmt
        .query_map([], |x| {
            let x: i64 = x.get(0)?;
            Ok(x)
        })
        .unwrap()
        .collect::<Result<Vec<i64>, rusqlite::Error>>()
        .unwrap();
    assert_eq!(rows, vec![1, 2]);
}
#[test]
fn upgrade_to_specific_version() {
    // Three migrations, each creating one table (`test1`..`test3`).
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);

    // First stop at version 2: only the first two tables may exist.
    let partial = migrator.upgrade_to(&mut conn, 2).unwrap();
    assert_eq!(partial.migrations_run, vec![1, 2]);
    let current = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(current, 2);
    let created_tables: i32 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('test1', 'test2', 'test3')",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(created_tables, 2);

    // Then continue to version 3: only the remaining migration runs.
    let remainder = migrator.upgrade_to(&mut conn, 3).unwrap();
    assert_eq!(remainder.migrations_run, vec![3]);
    let current = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(current, 3);
}
#[test]
fn upgrade_to_nonexistent_version() {
    // The only known migration has version 1.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    // Version 5 is not in the list, so the call must fail with a clear message.
    let outcome = migrator.upgrade_to(&mut conn, 5);
    assert!(outcome.is_err());
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Target version 5 does not exist"));
}
#[test]
fn success_then_failure_from_clean() {
    // Migration 1 moves the data from `test` to `test2` and succeeds.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test2 (id) SELECT id FROM test", [])?;
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Migration 2 moves the data on to `test3`, then fails on invalid SQL.
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test3 (id) SELECT id FROM test2", [])?;
            tx.execute("DROP TABLE test2", [])?;
            tx.execute("bleep blorp", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Seed the starting table with two rows.
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (1)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (2)", [])
        .unwrap();

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    let report = migrator.upgrade(&mut conn).unwrap();
    let expected = MigrationReport {
        schema_version_table_existed: false,
        schema_version_table_created: true,
        migrations_run: vec![1],
        failing_migration: Some(MigrationFailure {
            migration: &(Box::new(Migration2) as Box<dyn Migration>),
            error: Error::Rusqlite(rusqlite::Error::SqliteFailure(
                rusqlite::ffi::Error {
                    code: rusqlite::ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some("near \"bleep\": syntax error".to_string()),
            )),
        }),
    };
    assert_eq!(report, expected);

    // Migration 1's result must persist; migration 2 must leave no trace.
    let mut list_tables = conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let mut table_names = list_tables
        .query_map([], |row| row.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<String>, rusqlite::Error>>()
        .unwrap();
    table_names.sort();
    assert_eq!(
        table_names,
        vec!["_migratio_version_".to_string(), "test2".to_string()]
    );

    let mut select_ids = conn.prepare("SELECT * FROM test2").unwrap();
    let ids = select_ids
        .query_map([], |row| row.get::<_, i64>(0))
        .unwrap()
        .collect::<Result<Vec<i64>, rusqlite::Error>>()
        .unwrap();
    assert_eq!(ids, vec![1, 2]);
}
#[test]
fn panic_in_migration_verify_state() {
    // NOTE(review): despite the name, nothing panics here. Migration 2 fails
    // with a SQL syntax error, exactly as in `success_then_failure_from_clean`.

    // Migration 1 moves the data from `test` to `test2` and succeeds.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test2 (id) SELECT id FROM test", [])?;
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Migration 2 moves the data on to `test3`, then fails on invalid SQL.
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test3 (id) SELECT id FROM test2", [])?;
            tx.execute("DROP TABLE test2", [])?;
            tx.execute("bleep blorp", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Seed the starting table with two rows.
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (1)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (2)", [])
        .unwrap();

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    let report = migrator.upgrade(&mut conn).unwrap();
    let expected = MigrationReport {
        schema_version_table_existed: false,
        schema_version_table_created: true,
        migrations_run: vec![1],
        failing_migration: Some(MigrationFailure {
            migration: &(Box::new(Migration2) as Box<dyn Migration>),
            error: Error::Rusqlite(rusqlite::Error::SqliteFailure(
                rusqlite::ffi::Error {
                    code: rusqlite::ffi::ErrorCode::Unknown,
                    extended_code: 1,
                },
                Some("near \"bleep\": syntax error".to_string()),
            )),
        }),
    };
    assert_eq!(report, expected);

    // The database must reflect migration 1 only.
    let mut list_tables = conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let mut table_names = list_tables
        .query_map([], |row| row.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<String>, rusqlite::Error>>()
        .unwrap();
    table_names.sort();
    assert_eq!(
        table_names,
        vec!["_migratio_version_".to_string(), "test2".to_string()]
    );

    let mut select_ids = conn.prepare("SELECT * FROM test2").unwrap();
    let ids = select_ids
        .query_map([], |row| row.get::<_, i64>(0))
        .unwrap()
        .collect::<Result<Vec<i64>, rusqlite::Error>>()
        .unwrap();
    assert_eq!(ids, vec![1, 2]);
}
#[test]
fn panic_with_successful_prior_migration() {
    use std::panic;

    // Migration 1 moves the data from `test` to `test2` and succeeds.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test2 (id) SELECT id FROM test", [])?;
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Migration 2 moves the data on to `test3`, then panics mid-transaction.
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("INSERT INTO test3 (id) SELECT id FROM test2", [])?;
            tx.execute("DROP TABLE test2", [])?;
            panic!("Migration panic!");
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Seed the starting table with two rows.
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (1)", [])
        .unwrap();
    conn.execute("INSERT INTO test (id) VALUES (2)", [])
        .unwrap();

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);

    // The panic must propagate out of `upgrade`; catch it to inspect the database.
    let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| migrator.upgrade(&mut conn)));
    assert!(outcome.is_err());

    // Migration 1's tables must persist; migration 2's work must be gone.
    let mut list_tables = conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let mut table_names = list_tables
        .query_map([], |row| row.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<String>, rusqlite::Error>>()
        .unwrap();
    table_names.sort();
    assert_eq!(
        table_names,
        vec!["_migratio_version_".to_string(), "test2".to_string()]
    );

    let mut select_ids = conn.prepare("SELECT * FROM test2").unwrap();
    let ids = select_ids
        .query_map([], |row| row.get::<_, i64>(0))
        .unwrap()
        .collect::<Result<Vec<i64>, rusqlite::Error>>()
        .unwrap();
    assert_eq!(ids, vec![1, 2]);

    // Only version 1 may be recorded as applied.
    let recorded_version: u32 = conn
        .query_row("SELECT version FROM _migratio_version_", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(recorded_version, 1);
}
#[test]
fn incremental_migrations_different_applied_at() {
    use std::thread;
    use std::time::Duration;

    // Three named migrations, each creating one table.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test1_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test2_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test3_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Reads (version, name, applied_at) for every recorded migration, ordered by version.
    fn recorded_rows(conn: &Connection) -> Vec<(u32, String, String)> {
        let mut select = conn
            .prepare("SELECT version, name, applied_at FROM _migratio_version_ ORDER BY version")
            .unwrap();
        let rows: Vec<(u32, String, String)> = select
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        rows
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // First run: migrations 1 and 2 are applied together.
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1, 2]);

    let first_batch = recorded_rows(&conn);
    assert_eq!(first_batch.len(), 2);
    assert_eq!(first_batch[0].0, 1);
    assert_eq!(first_batch[0].1, "create_test1_table");
    assert_eq!(first_batch[1].0, 2);
    assert_eq!(first_batch[1].1, "create_test2_table");
    // Migrations applied in the same run share one timestamp.
    assert_eq!(first_batch[0].2, first_batch[1].2);
    let first_batch_timestamp = first_batch[0].2.clone();

    // Let the clock advance so the second run gets a different timestamp.
    thread::sleep(Duration::from_millis(2));

    // Second run: only the newly added migration 3 is applied.
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![3]);

    let all_migrations = recorded_rows(&conn);
    assert_eq!(all_migrations.len(), 3);
    assert_eq!(all_migrations[0].0, 1);
    assert_eq!(all_migrations[1].0, 2);
    assert_eq!(all_migrations[2].0, 3);
    assert_eq!(all_migrations[2].1, "create_test3_table");
    assert_eq!(all_migrations[0].2, all_migrations[1].2);
    assert_ne!(all_migrations[2].2, first_batch_timestamp);
}
#[test]
#[should_panic(expected = "Migration version must be greater than 0")]
fn new_rejects_zero_version() {
    // A migration reporting version 0, which `new` must refuse.
    struct Migration0;
    impl Migration for Migration0 {
        fn version(&self) -> u32 {
            0
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let migrations: Vec<Box<dyn Migration>> = vec![Box::new(Migration0)];
    let _ = SqliteMigrator::new(migrations);
}
#[test]
#[should_panic(expected = "Duplicate migration version found: 2")]
fn new_rejects_duplicate_versions() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Two distinct migrations that both claim version 2.
    struct Migration2a;
    impl Migration for Migration2a {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2b;
    impl Migration for Migration2b {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let migrations: Vec<Box<dyn Migration>> = vec![
        Box::new(Migration1),
        Box::new(Migration2a),
        Box::new(Migration2b),
    ];
    let _ = SqliteMigrator::new(migrations);
}
#[test]
fn new_accepts_non_starting_at_one() {
    // A contiguous list that begins at version 2 instead of 1.
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let migrator = SqliteMigrator::new(vec![Box::new(Migration2), Box::new(Migration3)]);
    let held = migrator.migrations();
    assert_eq!(held.len(), 2);
    assert_eq!(held[0].version(), 2);
    assert_eq!(held[1].version(), 3);
}
#[test]
#[should_panic(expected = "Migration versions must be contiguous. Found gap between version 1 and 3")]
fn new_rejects_non_contiguous() {
    // Versions 1 and 3 with no version 2 in between.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let migrations: Vec<Box<dyn Migration>> = vec![Box::new(Migration1), Box::new(Migration3)];
    let _ = SqliteMigrator::new(migrations);
}
#[test]
fn try_new_returns_ok_for_non_starting_at_one() {
    // A single migration whose version is 2 rather than 1.
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let outcome = SqliteMigrator::try_new(vec![Box::new(Migration2)]);
    assert!(outcome.is_ok());

    let migrator = outcome.unwrap();
    let held = migrator.migrations();
    assert_eq!(held.len(), 1);
    assert_eq!(held[0].version(), 2);
}
#[test]
fn try_new_returns_err_for_duplicate_versions() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Two distinct migrations that both claim version 2.
    struct Migration2a;
    impl Migration for Migration2a {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2b;
    impl Migration for Migration2b {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let outcome = SqliteMigrator::try_new(vec![
        Box::new(Migration1),
        Box::new(Migration2a),
        Box::new(Migration2b),
    ]);

    // The fallible constructor reports the problem instead of panicking.
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert_eq!(message, "Duplicate migration version found: 2");
}
#[test]
fn try_new_returns_ok_for_valid_migrations() {
    // Two no-op migrations with contiguous versions 1 and 2.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let outcome = SqliteMigrator::try_new(vec![Box::new(Migration1), Box::new(Migration2)]);
    assert!(outcome.is_ok());
}
#[test]
fn checksum_validation_detects_modified_migration() {
    // Original form of migration 1.
    struct Migration1V1;
    impl Migration for Migration1V1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    // Same version and SQL, but with a different name.
    struct Migration1V2;
    impl Migration for Migration1V2 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test_table_modified".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // Apply the original migration first.
    let original = SqliteMigrator::new(vec![Box::new(Migration1V1)]);
    let report = original.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);

    // Upgrading with the altered migration must be rejected.
    let altered = SqliteMigrator::new(vec![Box::new(Migration1V2)]);
    let outcome = altered.upgrade(&mut conn);
    assert!(outcome.is_err());
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("checksum mismatch"));
    assert!(rendered.contains("Migration 1"));
}
#[test]
fn checksum_validation_passes_for_unmodified_migrations() {
    // Two named migrations, each creating one table.
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test1_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }
    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn name(&self) -> String {
            "create_test2_table".to_string()
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // Run 1: only migration 1 is known.
    let only_first = SqliteMigrator::new(vec![Box::new(Migration1)]);
    let report = only_first.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);

    // Run 2: migration 2 is added; the unchanged migration 1 validates and is not rerun.
    let both = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    let report = both.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![2]);

    // Run 3: nothing new, so nothing is run.
    let both_again = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    let report = both_again.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, Vec::<u32>::new());
}
/// Checks that the checksum persisted for version 1 is a 64-character hex
/// string equal to what `GenericMigrator::calculate_checksum` produces.
#[test]
fn checksums_stored_in_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn name(&self) -> String {
            "my_migration".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    // Read the stored checksum straight from the version table.
    let stored: String = db
        .query_row(
            "SELECT checksum FROM _migratio_version_ WHERE version = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored.len(), 64);
    assert!(stored.chars().all(|ch| ch.is_ascii_hexdigit()));

    // The stored value must match a freshly computed checksum.
    let boxed: Box<dyn Migration> = Box::new(Migration1);
    let recomputed = GenericMigrator::calculate_checksum(&boxed);
    assert_eq!(stored, recomputed);
}
/// Applies one migration, downgrades to version 0, and checks that the table
/// it created is gone and the version table has no rows left.
#[test]
fn downgrade_single_migration() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Asserts whether the `test` table is listed in sqlite_master. The
    // statement is dropped on return, so the connection can be mutably
    // borrowed again afterwards.
    fn assert_test_table_present(conn: &Connection, expected: bool) {
        let mut lookup = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='test'")
            .unwrap();
        let found = lookup.query([]).unwrap().next().unwrap().is_some();
        assert_eq!(found, expected);
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let up_report = migrator.upgrade(&mut db).unwrap();
    assert_eq!(up_report.migrations_run, vec![1]);
    assert_test_table_present(&db, true);

    let down_report = migrator.downgrade(&mut db, 0).unwrap();
    assert_eq!(down_report.migrations_run, vec![1]);
    assert_test_table_present(&db, false);

    let remaining: i64 = db
        .query_row("SELECT COUNT(*) FROM _migratio_version_", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(remaining, 0);
}
/// Applies three migrations, downgrades to version 1, and checks that
/// versions 3 and 2 were rolled back (in that order) while version 1 remains.
#[test]
fn downgrade_multiple_migrations() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test1", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test2", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test3", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);

    let up_report = migrator.upgrade(&mut db).unwrap();
    assert_eq!(up_report.migrations_run, vec![1, 2, 3]);

    // Rolling back to version 1 undoes the newest migration first.
    let down_report = migrator.downgrade(&mut db, 1).unwrap();
    assert_eq!(down_report.migrations_run, vec![3, 2]);

    // Only the version table and the first migration's table should remain.
    let mut table_query = db
        .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        .unwrap();
    let table_names: Vec<String> = table_query
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|name| name.unwrap())
        .collect();
    assert_eq!(table_names, vec!["_migratio_version_", "test1"]);

    // The version table should record only version 1.
    let mut version_query = db
        .prepare("SELECT version FROM _migratio_version_ ORDER BY version")
        .unwrap();
    let recorded: Vec<u32> = version_query
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|version| version.unwrap())
        .collect();
    assert_eq!(recorded, vec![1]);
}
/// Applies two migrations, downgrades to version 0, and checks that only the
/// (now empty) version table is left in the database.
#[test]
fn downgrade_all_migrations() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test1", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test2", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);

    let up_report = migrator.upgrade(&mut db).unwrap();
    assert_eq!(up_report.migrations_run, vec![1, 2]);

    let down_report = migrator.downgrade(&mut db, 0).unwrap();
    assert_eq!(down_report.migrations_run, vec![2, 1]);

    // Every migration-created table is gone; the version table itself stays.
    let mut table_query = db
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let table_names: Vec<String> = table_query
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|name| name.unwrap())
        .collect();
    assert_eq!(table_names, vec!["_migratio_version_"]);

    let remaining: i64 = db
        .query_row("SELECT COUNT(*) FROM _migratio_version_", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(remaining, 0);
}
/// Calls `downgrade` on a database that was never migrated and checks that
/// nothing is run and the report says the version table did not exist.
#[test]
fn downgrade_on_clean_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let outcome = migrator.downgrade(&mut db, 0).unwrap();
    assert_eq!(outcome.migrations_run, Vec::<u32>::new());
    assert!(!outcome.schema_version_table_existed);
}
/// Checks that asking `downgrade` for a target above the current version
/// returns an error describing both versions.
#[test]
fn downgrade_with_invalid_target_version() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    // Current version is 1, so a "downgrade" to 5 must be rejected.
    let outcome = migrator.downgrade(&mut db, 5);
    assert!(outcome.is_err());
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Cannot downgrade to version 5 when current version is 1"));
}
/// Expects a panic mentioning "does not support downgrade" when rolling back
/// a migration that does not override `sqlite_down`.
#[test]
#[should_panic(expected = "does not support downgrade")]
fn downgrade_panics_when_down_not_implemented() {
    // Deliberately has no `sqlite_down` implementation.
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    // The result is irrelevant: the call itself is expected to panic.
    let _ = migrator.downgrade(&mut db, 0);
}
/// Applies version 1 under one name, then attempts a downgrade with a
/// same-version migration under a different name and expects a
/// "checksum mismatch" error.
#[test]
fn downgrade_validates_checksums() {
    // Migration as originally applied.
    struct Migration1V1;
    impl Migration for Migration1V1 {
        fn name(&self) -> String {
            "create_test_table".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Same version and SQL, but a different name than what was applied.
    struct Migration1V2;
    impl Migration for Migration1V2 {
        fn name(&self) -> String {
            "create_test_table_modified".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();

    let original = SqliteMigrator::new(vec![Box::new(Migration1V1)]);
    original.upgrade(&mut db).unwrap();

    let modified = SqliteMigrator::new(vec![Box::new(Migration1V2)]);
    let outcome = modified.downgrade(&mut db, 0);
    assert!(outcome.is_err());
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("checksum mismatch"));
}
/// Checks that `get_current_version` reports 0 before any migration has run.
#[test]
fn get_current_version_on_clean_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let reported = migrator.get_current_version(&mut db).unwrap();
    assert_eq!(reported, 0);
}
/// Checks that `get_current_version` goes from 0 to 2 once two migrations
/// have been applied.
#[test]
fn get_current_version_after_migrations() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            2
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);

    let before = migrator.get_current_version(&mut db).unwrap();
    assert_eq!(before, 0);

    migrator.upgrade(&mut db).unwrap();

    let after = migrator.get_current_version(&mut db).unwrap();
    assert_eq!(after, 2);
}
/// Checks that `preview_upgrade` on an unmigrated database lists every
/// registered migration, in version order.
#[test]
fn preview_upgrade_on_clean_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn version(&self) -> u32 {
            2
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);

    let pending = migrator.preview_upgrade(&mut db).unwrap();
    let pending_versions: Vec<u32> = pending.iter().map(|m| m.version()).collect();
    assert_eq!(pending_versions, vec![1, 2]);
}
/// Applies version 1 only, then checks that `preview_upgrade` with three
/// registered migrations lists versions 2 and 3 as pending.
#[test]
fn preview_upgrade_with_partial_migrations() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            2
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3;
    impl Migration for Migration3 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            3
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();

    // Bring the database to version 1 with a migrator that knows only that one.
    let initial = SqliteMigrator::new(vec![Box::new(Migration1)]);
    initial.upgrade(&mut db).unwrap();

    let full = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);
    let pending = full.preview_upgrade(&mut db).unwrap();
    let pending_versions: Vec<u32> = pending.iter().map(|m| m.version()).collect();
    assert_eq!(pending_versions, vec![2, 3]);
}
/// Checks that `preview_upgrade` returns nothing once every registered
/// migration has been applied.
#[test]
fn preview_upgrade_when_up_to_date() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    let still_pending = migrator.preview_upgrade(&mut db).unwrap();
    assert_eq!(still_pending.len(), 0);
}
/// Checks that `preview_downgrade` to version 0 lists both applied
/// migrations, newest first.
#[test]
fn preview_downgrade_to_zero() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test1", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test2", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    migrator.upgrade(&mut db).unwrap();

    let to_rollback = migrator.preview_downgrade(&mut db, 0).unwrap();
    let rollback_versions: Vec<u32> = to_rollback.iter().map(|m| m.version()).collect();
    assert_eq!(rollback_versions, vec![2, 1]);
}
/// Checks that `preview_downgrade` to version 1 (from 3) lists versions 3
/// and 2, newest first, and leaves version 1 out.
#[test]
fn preview_downgrade_to_specific_version() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test1", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test2", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test3", [])?;
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);
    migrator.upgrade(&mut db).unwrap();

    let to_rollback = migrator.preview_downgrade(&mut db, 1).unwrap();
    let rollback_versions: Vec<u32> = to_rollback.iter().map(|m| m.version()).collect();
    assert_eq!(rollback_versions, vec![3, 2]);
}
/// Checks that `preview_downgrade` on an unmigrated database returns an
/// empty list.
#[test]
fn preview_downgrade_on_clean_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let rollback_plan = migrator.preview_downgrade(&mut db, 0).unwrap();
    assert_eq!(rollback_plan.len(), 0);
}
/// Checks that `preview_downgrade` rejects a target above the current
/// version with an error describing both versions.
#[test]
fn preview_downgrade_with_invalid_target() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_down(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    // Current version is 1, so previewing a downgrade to 5 must fail.
    let outcome = migrator.preview_downgrade(&mut db, 5);
    assert!(outcome.is_err());
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Cannot downgrade to version 5 when current version is 1"));
}
/// Runs a good migration followed by one with invalid SQL and checks that the
/// report's failure exposes the failing migration and a `Error::Rusqlite`.
#[test]
fn migration_failure_accessors() {
    struct Migration1;
    impl Migration for Migration1 {
        fn name(&self) -> String {
            "create_test_table".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Executes a statement that is not valid SQL, so `sqlite_up` returns Err.
    struct Migration2;
    impl Migration for Migration2 {
        fn name(&self) -> String {
            "failing_migration".to_string()
        }
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("INVALID SQL HERE", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);

    // `upgrade` itself returns Ok; the failure is carried inside the report.
    let report = migrator.upgrade(&mut db).unwrap();
    assert!(report.failing_migration.is_some());
    let failure = report.failing_migration.as_ref().unwrap();

    let failed = failure.migration();
    assert_eq!(failed.version(), 2);
    assert_eq!(failed.name(), "failing_migration");

    assert!(matches!(failure.error(), Error::Rusqlite(_)));
}
/// Checks that `report.failing_migration` can be destructured by reference
/// and that the failure refers to version 1.
#[test]
fn migration_failure_can_be_pattern_matched() {
    // Executes a statement that is not valid SQL, so `sqlite_up` returns Err.
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("INVALID SQL", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    let report = migrator.upgrade(&mut db).unwrap();

    if let Some(ref failure) = report.failing_migration {
        println!(
            "Migration {} failed: {:?}",
            failure.migration().version(),
            failure.error()
        );
        assert_eq!(failure.migration().version(), 1);
    } else {
        panic!("Expected a failure");
    }
}
/// Checks that `get_migration_history` is empty before any migration has run.
#[test]
fn get_migration_history_on_clean_database() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    let entries = migrator.get_migration_history(&mut db).unwrap();
    assert_eq!(entries.len(), 0);
}
/// Applies two migrations in one `upgrade` call and checks each history
/// entry's version, name, timestamp and checksum length, and that both
/// entries carry the same `applied_at`.
#[test]
fn get_migration_history_after_migrations() {
    struct Migration1;
    impl Migration for Migration1 {
        fn name(&self) -> String {
            "create_test1_table".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn name(&self) -> String {
            "create_test2_table".to_string()
        }
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    migrator.upgrade(&mut db).unwrap();

    let entries = migrator.get_migration_history(&mut db).unwrap();
    assert_eq!(entries.len(), 2);
    let (first, second) = (&entries[0], &entries[1]);

    assert_eq!(first.version, 1);
    assert_eq!(first.name, "create_test1_table");
    assert!(first.applied_at.timestamp() > 0);
    assert_eq!(first.checksum.len(), 64);

    assert_eq!(second.version, 2);
    assert_eq!(second.name, "create_test2_table");
    assert!(second.applied_at.timestamp() > 0);
    assert_eq!(second.checksum.len(), 64);

    // Both were applied by the same `upgrade` call and must share a timestamp.
    assert_eq!(first.applied_at, second.applied_at);
}
/// Applies two migrations in separate `upgrade` calls and checks that the
/// first entry keeps its timestamp while the second gets a different one.
#[test]
fn get_migration_history_shows_incremental_batches() {
    struct Migration1;
    impl Migration for Migration1 {
        fn name(&self) -> String {
            "migration_one".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn name(&self) -> String {
            "migration_two".to_string()
        }
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();

    // Batch 1: version 1 only.
    let first_batch = SqliteMigrator::new(vec![Box::new(Migration1)]);
    first_batch.upgrade(&mut db).unwrap();
    let after_first = first_batch.get_migration_history(&mut db).unwrap();
    assert_eq!(after_first.len(), 1);
    let first_timestamp = after_first[0].applied_at;

    // Let the clock advance so the second batch gets a distinct timestamp.
    std::thread::sleep(std::time::Duration::from_millis(2));

    // Batch 2: version 2 is added on top.
    let second_batch = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    second_batch.upgrade(&mut db).unwrap();
    let after_second = second_batch.get_migration_history(&mut db).unwrap();
    assert_eq!(after_second.len(), 2);
    assert_eq!(after_second[0].applied_at, first_timestamp);
    assert_ne!(after_second[1].applied_at, first_timestamp);
}
/// Checks that the checksum reported by `get_migration_history` equals the
/// one computed by `GenericMigrator::calculate_checksum`.
#[test]
fn get_migration_history_includes_checksums() {
    struct Migration1;
    impl Migration for Migration1 {
        fn name(&self) -> String {
            "test_migration".to_string()
        }
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut db = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
    migrator.upgrade(&mut db).unwrap();

    let entries = migrator.get_migration_history(&mut db).unwrap();
    assert_eq!(entries.len(), 1);

    let boxed: Box<dyn Migration> = Box::new(Migration1);
    let recomputed = GenericMigrator::calculate_checksum(&boxed);
    assert_eq!(entries[0].checksum, recomputed);
}
// Three threads open their own connection to the same temp-file database and
// call `upgrade` after being released together by a barrier. The test expects
// exactly one thread to report two migrations run and the other two to report
// none, then checks the final version and that each table holds one row.
#[test]
fn concurrent_migrations_are_safe() {
use std::sync::{Arc, Barrier};
use std::thread;
use tempfile::NamedTempFile;
// A file-backed database is used so that separate connections share state.
let temp_file = NamedTempFile::new().unwrap();
let db_path = temp_file.path().to_str().unwrap().to_string();
struct Migration1;
impl Migration for Migration1 {
fn version(&self) -> u32 {
1
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
// NOTE(review): the pause presumably widens the window in which other
// threads can contend with this migration — confirm intent.
std::thread::sleep(std::time::Duration::from_millis(10));
tx.execute("INSERT INTO test1 (id) VALUES (1)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test1", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
struct Migration2;
impl Migration for Migration2 {
fn version(&self) -> u32 {
2
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
// Same pause as in Migration1.
std::thread::sleep(std::time::Duration::from_millis(10));
tx.execute("INSERT INTO test2 (id) VALUES (1)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test2", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
// The barrier count matches the number of spawned threads (3), so all of
// them are released at the same moment.
let barrier = Arc::new(Barrier::new(3));
let db_path_arc = Arc::new(db_path);
let handles: Vec<_> = (0..3)
.map(|i| {
let barrier = Arc::clone(&barrier);
let db_path = Arc::clone(&db_path_arc);
thread::spawn(move || {
barrier.wait();
// Each thread uses its own connection and its own migrator.
let mut conn = Connection::open(db_path.as_str()).unwrap();
let migrator =
SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
let result = migrator.upgrade(&mut conn);
assert!(result.is_ok(), "Thread {} failed: {:?}", i, result);
// The thread's result is how many migrations this call applied.
result.unwrap().migrations_run.len()
})
})
.collect();
// `join().unwrap()` re-raises any panic (including failed asserts) from a worker.
let migrations_run_counts: Vec<_> =
handles.into_iter().map(|h| h.join().unwrap()).collect();
let two_migrations = migrations_run_counts.iter().filter(|&&c| c == 2).count();
let zero_migrations = migrations_run_counts.iter().filter(|&&c| c == 0).count();
assert_eq!(
two_migrations, 1,
"Exactly one thread should have run 2 migrations"
);
assert_eq!(
zero_migrations, 2,
"Two threads should have found db already migrated"
);
// Verify the end state through a fresh connection.
let mut conn = Connection::open(db_path_arc.as_str()).unwrap();
let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
let current_version = migrator.get_current_version(&mut conn).unwrap();
assert_eq!(current_version, 2);
// One row per table means each migration body ran exactly once.
let count1: i64 = conn
.query_row("SELECT COUNT(*) FROM test1", [], |row| row.get(0))
.unwrap();
let count2: i64 = conn
.query_row("SELECT COUNT(*) FROM test2", [], |row| row.get(0))
.unwrap();
assert_eq!(count1, 1);
assert_eq!(count2, 1);
}
// Starts from a database already at version 2, then lets four threads run
// concurrently: even-numbered threads call `upgrade`, odd-numbered threads
// call `downgrade` to version 1. Every call must return Ok, and the final
// version must be either 1 or 2.
#[test]
fn concurrent_upgrade_and_downgrade() {
use std::sync::{Arc, Barrier};
use std::thread;
use tempfile::NamedTempFile;
// A file-backed database is used so that separate connections share state.
let temp_file = NamedTempFile::new().unwrap();
let db_path = temp_file.path().to_str().unwrap().to_string();
// Setup: migrate the database to version 2. The connection and the
// block-local migration types go out of scope at the closing brace.
{
let mut conn = Connection::open(&db_path).unwrap();
struct Migration1;
impl Migration for Migration1 {
fn version(&self) -> u32 {
1
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test1", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
struct Migration2;
impl Migration for Migration2 {
fn version(&self) -> u32 {
2
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test2", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
migrator.upgrade(&mut conn).unwrap();
}
// NOTE(review): the two types below repeat the block-local definitions above
// with identical bodies; they are the ones used by the worker threads and the
// final check.
struct Migration1;
impl Migration for Migration1 {
fn version(&self) -> u32 {
1
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test1", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
struct Migration2;
impl Migration for Migration2 {
fn version(&self) -> u32 {
2
}
fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
Ok(())
}
fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
tx.execute("DROP TABLE test2", [])?;
Ok(())
}
#[cfg(feature = "mysql")]
fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
Ok(())
}
}
// The barrier count matches the number of spawned threads (4), so all of
// them are released at the same moment.
let barrier = Arc::new(Barrier::new(4));
let db_path_arc = Arc::new(db_path);
let handles: Vec<_> = (0..4)
.map(|i| {
let barrier = Arc::clone(&barrier);
let db_path = Arc::clone(&db_path_arc);
thread::spawn(move || {
barrier.wait();
// Each thread uses its own connection and its own migrator.
let mut conn = Connection::open(db_path.as_str()).unwrap();
let migrator =
SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
// Threads 0 and 2 upgrade; threads 1 and 3 downgrade to version 1.
let result = if i % 2 == 0 {
migrator.upgrade(&mut conn)
} else {
migrator.downgrade(&mut conn, 1)
};
assert!(result.is_ok(), "Thread {} got error: {:?}", i, result);
// Always true when reached; a failed assert above panics instead.
true
})
})
.collect();
// A worker panic surfaces here as an Err from `join`.
for (i, handle) in handles.into_iter().enumerate() {
let result = handle.join();
assert!(result.is_ok(), "Thread {} panicked", i);
assert!(result.unwrap(), "Thread {} failed", i);
}
// The interleaving is not fixed, so either version 1 or 2 is accepted.
let mut conn = Connection::open(db_path_arc.as_str()).unwrap();
let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
let current_version = migrator.get_current_version(&mut conn).unwrap();
assert!(
current_version == 1 || current_version == 2,
"Version should be 1 or 2, got {}",
current_version
);
}
/// Checks that a migrator built with `with_busy_timeout` still applies its
/// migration and reports the resulting version.
#[test]
fn custom_busy_timeout() {
    struct Migration1;
    impl Migration for Migration1 {
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn version(&self) -> u32 {
            1
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Keep the temp file handle alive for the whole test; only its path is used.
    let scratch = tempfile::NamedTempFile::new().unwrap();
    let path = String::from(scratch.path().to_str().unwrap());

    let timeout = std::time::Duration::from_secs(5);
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]).with_busy_timeout(timeout);

    let mut db = Connection::open(&path).unwrap();
    let report = migrator.upgrade(&mut db).unwrap();
    assert_eq!(report.migrations_run, vec![1]);

    let version_now = migrator.get_current_version(&mut db).unwrap();
    assert_eq!(version_now, 1);
}
/// After tampering with the version table (row 2 removed, an unknown row 4
/// inserted), `upgrade` must fail with an error naming the unknown row.
#[test]
fn detects_missing_migration_in_code() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "create_test1".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn name(&self) -> String {
            "create_test2".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn name(&self) -> String {
            "create_test3".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);
    migrator.upgrade(&mut conn).unwrap();

    // Tamper with the bookkeeping table: drop row 2 and add a row for a
    // version (4) that the migrator has no code for.
    conn.execute("DELETE FROM _migratio_version_ WHERE version = 2", [])
        .unwrap();
    conn.execute(
        "INSERT INTO _migratio_version_ (version, name, applied_at, checksum) VALUES (4, 'deleted_migration', datetime('now'), 'fakechecksum')",
        [],
    )
    .unwrap();

    let outcome = migrator.upgrade(&mut conn);
    assert!(outcome.is_err());

    // The debug rendering of the error must mention the unknown row.
    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Migration 4"));
    assert!(rendered.contains("deleted_migration"));
    assert!(rendered.contains("was previously applied but is no longer present"));
}
/// With versions 1 and 3 recorded as applied, a migrator that also knows
/// version 2 must refuse to upgrade and name the skipped migration.
#[test]
fn detects_orphaned_migration_added_late() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "create_test1".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn name(&self) -> String {
            "create_test2".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3;
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn name(&self) -> String {
            "create_test3".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // Apply only version 1 through the migrator.
    let initial = SqliteMigrator::new(vec![Box::new(Migration1)]);
    initial.upgrade(&mut conn).unwrap();

    // Hand-insert a row for version 3 using the checksum the library computes
    // for Migration3, leaving version 2 unrecorded.
    let boxed: Box<dyn Migration> = Box::new(Migration3);
    let checksum = GenericMigrator::calculate_checksum(&boxed);
    conn.execute(
        "INSERT INTO _migratio_version_ (version, name, applied_at, checksum) VALUES (3, 'create_test3', datetime('now'), ?1)",
        [&checksum],
    )
    .unwrap();

    let migrator_all = SqliteMigrator::new(vec![
        Box::new(Migration1),
        Box::new(Migration2),
        Box::new(Migration3),
    ]);
    let outcome = migrator_all.upgrade(&mut conn);
    assert!(outcome.is_err());

    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Migration 2"));
    assert!(rendered.contains("create_test2"));
    assert!(rendered.contains("exists in code but was not applied"));
    assert!(rendered.contains("later migrations are already applied"));
}
/// `downgrade` must fail when the version table holds a row (version 3) for
/// which the migrator has no migration.
#[test]
fn detects_missing_migration_during_downgrade() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "create_test1".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test1", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2;
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn name(&self) -> String {
            "create_test2".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test2", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1), Box::new(Migration2)]);
    migrator.upgrade(&mut conn).unwrap();

    // Record a version the migrator knows nothing about.
    conn.execute(
        "INSERT INTO _migratio_version_ (version, name, applied_at, checksum) VALUES (3, 'orphaned_migration', datetime('now'), 'fakechecksum')",
        [],
    )
    .unwrap();

    let outcome = migrator.downgrade(&mut conn, 0);
    assert!(outcome.is_err());

    let rendered = format!("{:?}", outcome.unwrap_err());
    assert!(rendered.contains("Migration 3"));
    assert!(rendered.contains("orphaned_migration"));
    assert!(rendered.contains("was previously applied but is no longer present"));
}
/// A successful upgrade invokes the start and complete hooks once each and
/// leaves the error hook untouched.
#[test]
fn hooks_are_called_on_successful_migration() {
    use std::sync::{Arc, Mutex};

    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "test_migration".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // One recorder per hook; the closures get their own handles.
    let start_calls = Arc::new(Mutex::new(Vec::new()));
    let complete_calls = Arc::new(Mutex::new(Vec::new()));
    let error_calls = Arc::new(Mutex::new(Vec::new()));
    let start_sink = Arc::clone(&start_calls);
    let complete_sink = Arc::clone(&complete_calls);
    let error_sink = Arc::clone(&error_calls);

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)])
        .on_migration_start(move |version, name| {
            let mut recorded = start_sink.lock().unwrap();
            recorded.push((version, name.to_string()));
        })
        .on_migration_complete(move |version, name, duration| {
            let mut recorded = complete_sink.lock().unwrap();
            recorded.push((version, name.to_string(), duration));
        })
        .on_migration_error(move |version, name, error| {
            let mut recorded = error_sink.lock().unwrap();
            recorded.push((version, name.to_string(), format!("{:?}", error)));
        });
    migrator.upgrade(&mut conn).unwrap();

    let starts = start_calls.lock().unwrap();
    assert_eq!(*starts, vec![(1, "test_migration".to_string())]);

    let completes = complete_calls.lock().unwrap();
    assert_eq!(completes.len(), 1);
    // The recorded duration is not checked; only version and name are.
    let (version, name, _elapsed) = &completes[0];
    assert_eq!(*version, 1);
    assert_eq!(name.as_str(), "test_migration");

    let errors = error_calls.lock().unwrap();
    assert!(errors.is_empty());
}
/// When the migration's SQL is invalid, the start and error hooks fire once
/// each and the complete hook never fires.
#[test]
fn hooks_are_called_on_failed_migration() {
    use std::sync::{Arc, Mutex};

    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "failing_migration".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("INVALID SQL", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    let start_calls = Arc::new(Mutex::new(Vec::new()));
    let complete_calls = Arc::new(Mutex::new(Vec::new()));
    let error_calls = Arc::new(Mutex::new(Vec::new()));
    let start_sink = Arc::clone(&start_calls);
    let complete_sink = Arc::clone(&complete_calls);
    let error_sink = Arc::clone(&error_calls);

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)])
        .on_migration_start(move |version, name| {
            let mut recorded = start_sink.lock().unwrap();
            recorded.push((version, name.to_string()));
        })
        .on_migration_complete(move |version, name, duration| {
            let mut recorded = complete_sink.lock().unwrap();
            recorded.push((version, name.to_string(), duration));
        })
        .on_migration_error(move |version, name, _error| {
            let mut recorded = error_sink.lock().unwrap();
            recorded.push((version, name.to_string()));
        });

    // The outcome itself is deliberately ignored; only the hooks are checked.
    let _ = migrator.upgrade(&mut conn);

    let starts = start_calls.lock().unwrap();
    assert_eq!(*starts, vec![(1, "failing_migration".to_string())]);

    let completes = complete_calls.lock().unwrap();
    assert!(completes.is_empty());

    let errors = error_calls.lock().unwrap();
    assert_eq!(*errors, vec![(1, "failing_migration".to_string())]);
}
/// Downgrading to version 0 invokes the start and complete hooks once for
/// the rolled-back migration.
#[test]
fn hooks_are_called_on_downgrade() {
    use std::sync::{Arc, Mutex};

    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "test_migration".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE test", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // Bring the database to version 1 with a hook-free migrator first, so the
    // recorders below only observe the downgrade.
    let plain = SqliteMigrator::new(vec![Box::new(Migration1)]);
    plain.upgrade(&mut conn).unwrap();

    let start_calls = Arc::new(Mutex::new(Vec::new()));
    let complete_calls = Arc::new(Mutex::new(Vec::new()));
    let start_sink = Arc::clone(&start_calls);
    let complete_sink = Arc::clone(&complete_calls);

    let hooked = SqliteMigrator::new(vec![Box::new(Migration1)])
        .on_migration_start(move |version, name| {
            let mut recorded = start_sink.lock().unwrap();
            recorded.push((version, name.to_string()));
        })
        .on_migration_complete(move |version, name, duration| {
            let mut recorded = complete_sink.lock().unwrap();
            recorded.push((version, name.to_string(), duration));
        });
    hooked.downgrade(&mut conn, 0).unwrap();

    let starts = start_calls.lock().unwrap();
    assert_eq!(*starts, vec![(1, "test_migration".to_string())]);

    let completes = complete_calls.lock().unwrap();
    assert_eq!(completes.len(), 1);
    let (version, name, _elapsed) = &completes[0];
    assert_eq!(*version, 1);
    assert_eq!(name.as_str(), "test_migration");
}
/// With the `tracing` feature enabled, a successful upgrade emits the start
/// and completion log lines, including a `duration_ms` field.
#[test]
#[cfg(feature = "tracing")]
fn tracing_logs_successful_migration() {
    use tracing_test::traced_test;

    // `logs_contain` is not defined in this file; it is presumably injected
    // into the function body by `#[traced_test]`.
    #[traced_test]
    fn run_test() {
        struct Migration1;
        impl Migration for Migration1 {
            fn version(&self) -> u32 {
                1
            }
            fn name(&self) -> String {
                "test_migration".to_string()
            }
            fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
                tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
                Ok(())
            }
            #[cfg(feature = "mysql")]
            fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
                Ok(())
            }
        }

        let mut conn = Connection::open_in_memory().unwrap();
        let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
        migrator.upgrade(&mut conn).unwrap();

        assert!(logs_contain("Starting migration"));
        assert!(logs_contain("Migration completed successfully"));
        assert!(logs_contain("duration_ms"));
    }

    run_test();
}
/// With the `tracing` feature enabled, a migration whose SQL is invalid
/// emits both the start and the failure log lines.
#[test]
#[cfg(feature = "tracing")]
fn tracing_logs_failed_migration() {
    use tracing_test::traced_test;

    // `logs_contain` is not defined in this file; it is presumably injected
    // into the function body by `#[traced_test]`.
    #[traced_test]
    fn run_test() {
        struct Migration1;
        impl Migration for Migration1 {
            fn version(&self) -> u32 {
                1
            }
            fn name(&self) -> String {
                "failing_migration".to_string()
            }
            fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
                tx.execute("INVALID SQL", [])?;
                Ok(())
            }
            #[cfg(feature = "mysql")]
            fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
                Ok(())
            }
        }

        let mut conn = Connection::open_in_memory().unwrap();
        let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
        // Only the emitted logs matter here, so the outcome is discarded.
        let _ = migrator.upgrade(&mut conn);

        assert!(logs_contain("Starting migration"));
        assert!(logs_contain("Migration failed"));
    }

    run_test();
}
/// With the `tracing` feature enabled, an upgrade followed by a downgrade to
/// version 0 emits the rollback log lines.
#[test]
#[cfg(feature = "tracing")]
fn tracing_logs_downgrade() {
    use tracing_test::traced_test;

    // `logs_contain` is not defined in this file; it is presumably injected
    // into the function body by `#[traced_test]`.
    #[traced_test]
    fn run_test() {
        struct Migration1;
        impl Migration for Migration1 {
            fn version(&self) -> u32 {
                1
            }
            fn name(&self) -> String {
                "test_migration".to_string()
            }
            fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
                tx.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", [])?;
                Ok(())
            }
            fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
                tx.execute("DROP TABLE test", [])?;
                Ok(())
            }
            #[cfg(feature = "mysql")]
            fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
                Ok(())
            }
        }

        let mut conn = Connection::open_in_memory().unwrap();
        let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);
        migrator.upgrade(&mut conn).unwrap();
        migrator.downgrade(&mut conn, 0).unwrap();

        assert!(logs_contain("Rolling back migration"));
        assert!(logs_contain("Migration rolled back successfully"));
    }

    run_test();
}
/// When the precondition reports `AlreadySatisfied`, the migration is listed
/// as run and the version advances, but `sqlite_up` is never invoked.
#[test]
fn precondition_already_satisfied() {
    use std::sync::{Arc, Mutex};

    struct Migration1 {
        up_called: Arc<Mutex<bool>>,
    }
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            // Flag the call first so the test can observe that it happened.
            *self.up_called.lock().unwrap() = true;
            tx.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            let verdict = if found > 0 {
                Precondition::AlreadySatisfied
            } else {
                Precondition::NeedsApply
            };
            Ok(verdict)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    // Pre-create the table the migration would otherwise create.
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])
        .unwrap();

    let up_called = Arc::new(Mutex::new(false));
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1 {
        up_called: Arc::clone(&up_called),
    })]);

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);
    assert!(report.failing_migration.is_none());

    let was_called = *up_called.lock().unwrap();
    assert!(!was_called);

    let version = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(version, 1);

    let users_tables: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(users_tables, 1);
}
/// When the precondition reports `NeedsApply`, `sqlite_up` runs, the table is
/// created and the version advances to 1.
#[test]
fn precondition_needs_apply() {
    use std::sync::{Arc, Mutex};

    struct Migration1 {
        up_called: Arc<Mutex<bool>>,
    }
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            // Flag the call first so the test can observe that it happened.
            *self.up_called.lock().unwrap() = true;
            tx.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            let verdict = if found > 0 {
                Precondition::AlreadySatisfied
            } else {
                Precondition::NeedsApply
            };
            Ok(verdict)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Fresh database: the `users` table does not exist yet.
    let mut conn = Connection::open_in_memory().unwrap();

    let up_called = Arc::new(Mutex::new(false));
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1 {
        up_called: Arc::clone(&up_called),
    })]);

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);
    assert!(report.failing_migration.is_none());

    let was_called = *up_called.lock().unwrap();
    assert!(was_called);

    let version = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(version, 1);

    let users_tables: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(users_tables, 1);
}
/// An error returned from the precondition check is reported through
/// `failing_migration`; nothing is run and the version stays at 0.
#[test]
fn precondition_error() {
    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        fn sqlite_precondition(&self, _tx: &Transaction) -> Result<Precondition, Error> {
            // Always fail the precondition check.
            Err(rusqlite::Error::InvalidQuery.into())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)]);

    // `upgrade` itself returns Ok; the failure is carried inside the report.
    let report = migrator.upgrade(&mut conn).unwrap();
    let nothing_run: Vec<u32> = Vec::new();
    assert_eq!(report.migrations_run, nothing_run);
    assert!(report.failing_migration.is_some());

    let failure = report.failing_migration.unwrap();
    assert_eq!(failure.migration().version(), 1);

    let version = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(version, 0);
}
/// When the precondition is already satisfied, the hook sequence is exactly
/// `start` followed by `skipped`; neither `complete` nor `error` fires.
#[test]
fn precondition_hooks_already_satisfied() {
    use std::sync::{Arc, Mutex};

    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            panic!("up() should not be called when precondition is AlreadySatisfied");
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            let verdict = if found > 0 {
                Precondition::AlreadySatisfied
            } else {
                Precondition::NeedsApply
            };
            Ok(verdict)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    // Pre-create the table so the precondition reports AlreadySatisfied.
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])
        .unwrap();

    // A single ordered log shared by all four hooks.
    let events = Arc::new(Mutex::new(Vec::new()));
    let on_start = Arc::clone(&events);
    let on_skipped = Arc::clone(&events);
    let on_complete = Arc::clone(&events);
    let on_error = Arc::clone(&events);

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)])
        .on_migration_start(move |version, name| {
            let entry = format!("start:{}:{}", version, name);
            on_start.lock().unwrap().push(entry);
        })
        .on_migration_skipped(move |version, name| {
            let entry = format!("skipped:{}:{}", version, name);
            on_skipped.lock().unwrap().push(entry);
        })
        .on_migration_complete(move |version, name, _duration| {
            let entry = format!("complete:{}:{}", version, name);
            on_complete.lock().unwrap().push(entry);
        })
        .on_migration_error(move |version, name, _error| {
            let entry = format!("error:{}:{}", version, name);
            on_error.lock().unwrap().push(entry);
        });

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);
    assert!(report.failing_migration.is_none());

    let recorded = events.lock().unwrap();
    assert_eq!(
        *recorded,
        vec!["start:1:Migration 1", "skipped:1:Migration 1"]
    );
}
/// When the precondition requires the migration to be applied, the hook
/// sequence is exactly `start` followed by `complete`.
#[test]
fn precondition_hooks_needs_apply() {
    use std::sync::{Arc, Mutex};

    struct Migration1;
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            let verdict = if found > 0 {
                Precondition::AlreadySatisfied
            } else {
                Precondition::NeedsApply
            };
            Ok(verdict)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Fresh database: the `users` table does not exist yet.
    let mut conn = Connection::open_in_memory().unwrap();

    // A single ordered log shared by all four hooks.
    let events = Arc::new(Mutex::new(Vec::new()));
    let on_start = Arc::clone(&events);
    let on_skipped = Arc::clone(&events);
    let on_complete = Arc::clone(&events);
    let on_error = Arc::clone(&events);

    let migrator = SqliteMigrator::new(vec![Box::new(Migration1)])
        .on_migration_start(move |version, name| {
            let entry = format!("start:{}:{}", version, name);
            on_start.lock().unwrap().push(entry);
        })
        .on_migration_skipped(move |version, name| {
            let entry = format!("skipped:{}:{}", version, name);
            on_skipped.lock().unwrap().push(entry);
        })
        .on_migration_complete(move |version, name, _duration| {
            let entry = format!("complete:{}:{}", version, name);
            on_complete.lock().unwrap().push(entry);
        })
        .on_migration_error(move |version, name, _error| {
            let entry = format!("error:{}:{}", version, name);
            on_error.lock().unwrap().push(entry);
        });

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1]);
    assert!(report.failing_migration.is_none());

    let recorded = events.lock().unwrap();
    assert_eq!(
        *recorded,
        vec!["start:1:Migration 1", "complete:1:Migration 1"]
    );
}
/// With three migrations where only the first one's table already exists,
/// all three are reported as run but only `sqlite_up` for 2 and 3 executes.
#[test]
fn precondition_mixed_migrations() {
    use std::sync::{Arc, Mutex};

    struct Migration1 {
        up_calls: Arc<Mutex<Vec<u32>>>,
    }
    impl Migration for Migration1 {
        fn version(&self) -> u32 {
            1
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            self.up_calls.lock().unwrap().push(1);
            tx.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='table1'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            if found > 0 {
                return Ok(Precondition::AlreadySatisfied);
            }
            Ok(Precondition::NeedsApply)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration2 {
        up_calls: Arc<Mutex<Vec<u32>>>,
    }
    impl Migration for Migration2 {
        fn version(&self) -> u32 {
            2
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            self.up_calls.lock().unwrap().push(2);
            tx.execute("CREATE TABLE table2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='table2'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            if found > 0 {
                return Ok(Precondition::AlreadySatisfied);
            }
            Ok(Precondition::NeedsApply)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct Migration3 {
        up_calls: Arc<Mutex<Vec<u32>>>,
    }
    impl Migration for Migration3 {
        fn version(&self) -> u32 {
            3
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            self.up_calls.lock().unwrap().push(3);
            tx.execute("CREATE TABLE table3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            let mut stmt = tx.prepare(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='table3'",
            )?;
            let found: i64 = stmt.query_row([], |row| row.get(0))?;
            if found > 0 {
                return Ok(Precondition::AlreadySatisfied);
            }
            Ok(Precondition::NeedsApply)
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    // Only table1 exists up front, so only migration 1 should be skipped.
    conn.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY)", [])
        .unwrap();

    // Shared record of which `sqlite_up` bodies actually ran, in order.
    let up_calls = Arc::new(Mutex::new(Vec::new()));
    let migrator = SqliteMigrator::new(vec![
        Box::new(Migration1 {
            up_calls: Arc::clone(&up_calls),
        }),
        Box::new(Migration2 {
            up_calls: Arc::clone(&up_calls),
        }),
        Box::new(Migration3 {
            up_calls: Arc::clone(&up_calls),
        }),
    ]);

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![1, 2, 3]);
    assert!(report.failing_migration.is_none());

    let executed = up_calls.lock().unwrap();
    assert_eq!(*executed, vec![2, 3]);

    let tables_present: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('table1', 'table2', 'table3')",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(tables_present, 3);

    let version = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(version, 3);
}
/// Returns `true` when `sqlite_master` lists a table named `table_name`.
///
/// Panics if the lookup query fails.
fn table_exists(conn: &Connection, table_name: &str) -> bool {
    let lookup = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
        [table_name],
        |row| row.get::<_, i64>(0),
    );
    lookup.unwrap() > 0
}
/// Inserts a version-table row without a `migration_type` value, using a
/// fixed timestamp and the placeholder checksum `'old_checksum'`.
///
/// Panics if the insert fails.
fn insert_version_row_old(conn: &Connection, version: u32, name: &str) {
    let sql = "INSERT INTO _migratio_version_ (version, name, applied_at, checksum) VALUES (?1, ?2, '2024-01-01T00:00:00+00:00', 'old_checksum')";
    conn.execute(sql, rusqlite::params![version, name]).unwrap();
}
/// Inserts a version-table row with an explicit checksum and
/// `migration_type`, using a fixed timestamp.
///
/// Panics if the insert fails.
fn insert_version_row(
    conn: &Connection,
    version: u32,
    name: &str,
    checksum: &str,
    migration_type: &str,
) {
    let sql = "INSERT INTO _migratio_version_ (version, name, applied_at, checksum, migration_type) VALUES (?1, ?2, '2024-01-01T00:00:00+00:00', ?3, ?4)";
    conn.execute(
        sql,
        rusqlite::params![version, name, checksum, migration_type],
    )
    .unwrap();
}
/// Creates the `_migratio_version_` table including the `migration_type`
/// column.
///
/// Panics if the statement fails (for example when the table already exists).
fn create_version_table(conn: &Connection) {
    let ddl = "CREATE TABLE _migratio_version_ (version integer primary key not null, name text not null, applied_at text not null, checksum text not null, migration_type text not null default 'migration')";
    conn.execute(ddl, []).unwrap();
}
/// Creates the `_migratio_version_` table without the `migration_type`
/// column.
///
/// Panics if the statement fails (for example when the table already exists).
fn create_version_table_old(conn: &Connection) {
    let ddl = "CREATE TABLE _migratio_version_ (version integer primary key not null, name text not null, applied_at text not null, checksum text not null)";
    conn.execute(ddl, []).unwrap();
}
/// Test migration at version 5 that creates (and on rollback drops) table `t5`.
struct BaselineV5;

impl Migration for BaselineV5 {
    fn version(&self) -> u32 {
        5
    }

    fn name(&self) -> String {
        String::from("baseline_v5")
    }

    fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("CREATE TABLE t5 (id INTEGER PRIMARY KEY)", [])?;
        Ok(())
    }

    fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("DROP TABLE t5", [])?;
        Ok(())
    }

    #[cfg(feature = "mysql")]
    fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
        Ok(())
    }
}
/// Test migration at version 6 that creates (and on rollback drops) table `t6`.
struct BaselineV6;

impl Migration for BaselineV6 {
    fn version(&self) -> u32 {
        6
    }

    fn name(&self) -> String {
        String::from("baseline_v6")
    }

    fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("CREATE TABLE t6 (id INTEGER PRIMARY KEY)", [])?;
        Ok(())
    }

    fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("DROP TABLE t6", [])?;
        Ok(())
    }

    #[cfg(feature = "mysql")]
    fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
        Ok(())
    }
}
/// Test migration at version 7 that creates (and on rollback drops) table `t7`.
struct BaselineV7;

impl Migration for BaselineV7 {
    fn version(&self) -> u32 {
        7
    }

    fn name(&self) -> String {
        String::from("baseline_v7")
    }

    fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("CREATE TABLE t7 (id INTEGER PRIMARY KEY)", [])?;
        Ok(())
    }

    fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
        tx.execute("DROP TABLE t7", [])?;
        Ok(())
    }

    #[cfg(feature = "mysql")]
    fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
        Ok(())
    }
}
/// A migration list starting at version 5 runs fully on a fresh database;
/// the history marks the first entry as the baseline and the rest as
/// ordinary migrations.
#[test]
fn test_baseline_versions_starting_at_5() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();

    let report = migrator.upgrade(&mut conn).unwrap();
    assert!(report.failing_migration.is_none());
    assert_eq!(report.migrations_run, vec![5, 6, 7]);

    let history = migrator.get_migration_history(&mut conn).unwrap();
    assert_eq!(history.len(), 3);

    // Lowest version is recorded as the baseline.
    let first = &history[0];
    assert_eq!(first.version, 5);
    assert_eq!(first.migration_type, MigrationType::Baseline);

    let second = &history[1];
    assert_eq!(second.version, 6);
    assert_eq!(second.migration_type, MigrationType::Migration);

    let third = &history[2];
    assert_eq!(third.version, 7);
    assert_eq!(third.migration_type, MigrationType::Migration);
}
/// `try_new` rejects a migration list whose versions skip a number
/// (5, 6, 8) with an error mentioning "gap".
#[test]
fn test_versions_with_gap_rejected() {
    struct M5;
    impl Migration for M5 {
        fn version(&self) -> u32 {
            5
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct M6;
    impl Migration for M6 {
        fn version(&self) -> u32 {
            6
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    // Version 7 is intentionally absent.
    struct M8;
    impl Migration for M8 {
        fn version(&self) -> u32 {
            8
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let attempt = SqliteMigrator::try_new(vec![Box::new(M5), Box::new(M6), Box::new(M8)]);
    assert!(attempt.is_err());

    let err = attempt.unwrap_err();
    assert!(err.contains("gap"), "expected gap error, got: {}", err);
}
/// A lone migration at version 10 is accepted by `try_new` and exposed
/// unchanged through `migrations()`.
#[test]
fn test_single_migration_at_version_10() {
    struct M10;
    impl Migration for M10 {
        fn version(&self) -> u32 {
            10
        }
        fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let attempt = SqliteMigrator::try_new(vec![Box::new(M10)]);
    assert!(attempt.is_ok());

    let migrator = attempt.unwrap();
    let first_version = migrator.migrations()[0].version();
    assert_eq!(first_version, 10);
}
/// On a fresh database every migration from the baseline onward runs, the
/// history types are recorded, and all three tables exist afterwards.
#[test]
fn test_baseline_fresh_db_upgrade() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();

    let report = migrator.upgrade(&mut conn).unwrap();
    assert!(report.failing_migration.is_none());
    assert_eq!(report.migrations_run, vec![5, 6, 7]);

    let history = migrator.get_migration_history(&mut conn).unwrap();
    assert_eq!(history[0].migration_type, MigrationType::Baseline);
    assert_eq!(history[1].migration_type, MigrationType::Migration);
    assert_eq!(history[2].migration_type, MigrationType::Migration);

    // Each migration's `sqlite_up` created its own table.
    let t5_present = table_exists(&conn, "t5");
    let t6_present = table_exists(&conn, "t6");
    let t7_present = table_exists(&conn, "t7");
    assert!(t5_present);
    assert!(t6_present);
    assert!(t7_present);
}
/// A single baseline migration that creates several tables produces all of
/// them when applied to a fresh database.
#[test]
fn test_baseline_fresh_db_creates_correct_schema() {
    struct FullSchemaBaseline;
    impl Migration for FullSchemaBaseline {
        fn version(&self) -> u32 {
            5
        }
        fn name(&self) -> String {
            "full_schema_baseline".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)", [])?;
            tx.execute("CREATE TABLE products (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE users", [])?;
            tx.execute("DROP TABLE orders", [])?;
            tx.execute("DROP TABLE products", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let migrator = SqliteMigrator::new(vec![Box::new(FullSchemaBaseline)]);
    let mut conn = Connection::open_in_memory().unwrap();

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![5]);

    let users_present = table_exists(&conn, "users");
    let orders_present = table_exists(&conn, "orders");
    let products_present = table_exists(&conn, "products");
    assert!(users_present);
    assert!(orders_present);
    assert!(products_present);
}
/// A database already recorded at version 5 (in the table layout without
/// `migration_type`) only runs versions 6 and 7; the baseline's own
/// `sqlite_up` is skipped and the pre-baseline rows are left in place.
#[test]
fn test_baseline_db_at_baseline_version() {
    let mut conn = Connection::open_in_memory().unwrap();

    // Simulate history 1..=5 written by hand into the older table layout.
    create_version_table_old(&conn);
    (1..=5u32).for_each(|v| insert_version_row_old(&conn, v, &format!("migration_{}", v)));

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert!(report.failing_migration.is_none());
    assert_eq!(report.migrations_run, vec![6, 7]);

    // Rows for versions 1-4 must still be present.
    let pre_baseline_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM _migratio_version_ WHERE version < 5",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(pre_baseline_rows, 4);

    // t5 is absent because the baseline migration itself was not executed.
    let t5_present = table_exists(&conn, "t5");
    assert!(table_exists(&conn, "t6"));
    assert!(table_exists(&conn, "t7"));
    assert!(!t5_present);
}
/// A stored row for the baseline version with a different name and checksum
/// does not block the upgrade; versions 6 and 7 still run.
#[test]
fn test_baseline_checksum_mismatch_ignored_for_baseline_version() {
    let mut conn = Connection::open_in_memory().unwrap();

    // Record version 5 with a name and checksum that cannot match BaselineV5.
    create_version_table(&conn);
    insert_version_row(
        &conn,
        5,
        "old_baseline_name",
        "totally_wrong_checksum",
        "migration",
    );

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert!(report.failing_migration.is_none());
    assert_eq!(report.migrations_run, vec![6, 7]);
}
/// A database first migrated through versions 1..=6 is upgraded again with a
/// migrator that only knows versions 5, 6 and 7: only version 7 runs.
#[test]
fn test_baseline_db_above_baseline() {
    // Generates a unit-struct migration with the given version whose
    // `sqlite_up` does nothing.
    macro_rules! noop_migration {
        ($ty:ident, $version:literal) => {
            struct $ty;
            impl Migration for $ty {
                fn version(&self) -> u32 {
                    $version
                }
                fn sqlite_up(&self, _tx: &Transaction) -> Result<(), Error> {
                    Ok(())
                }
                #[cfg(feature = "mysql")]
                fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
                    Ok(())
                }
            }
        };
    }

    noop_migration!(M1, 1);
    noop_migration!(M2, 2);
    noop_migration!(M3, 3);
    noop_migration!(M4, 4);

    let mut conn = Connection::open_in_memory().unwrap();

    // Bring the database to version 6 with the full, pre-baseline chain.
    let setup_migrator = SqliteMigrator::new(vec![
        Box::new(M1),
        Box::new(M2),
        Box::new(M3),
        Box::new(M4),
        Box::new(BaselineV5),
        Box::new(BaselineV6),
    ]);
    setup_migrator.upgrade(&mut conn).unwrap();

    // Now upgrade with the truncated chain that begins at version 5.
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![7]);
    assert!(report.failing_migration.is_none());
}
/// Running `upgrade` a second time on an already fully upgraded database
/// must run nothing and report no failure.
#[test]
fn test_baseline_db_fully_migrated() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();

    // First pass applies everything; its report is not inspected.
    migrator.upgrade(&mut conn).unwrap();

    let second_report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(second_report.migrations_run, Vec::<u32>::new());
    assert!(second_report.failing_migration.is_none());
}
/// A database whose recorded history stops at version 3 cannot be upgraded
/// by a migrator that starts at version 5; the error's debug output must
/// mention "below baseline".
#[test]
fn test_baseline_db_below_baseline_errors() {
    let mut conn = Connection::open_in_memory().unwrap();
    create_version_table(&conn);
    (1..=3u32).for_each(|v| {
        insert_version_row(
            &conn,
            v,
            &format!("migration_{}", v),
            "some_checksum",
            "migration",
        );
    });

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);

    let outcome = migrator.upgrade(&mut conn);
    assert!(outcome.is_err());

    let err_msg = format!("{:?}", outcome.unwrap_err());
    assert!(
        err_msg.contains("below baseline"),
        "expected 'below baseline', got: {}",
        err_msg
    );
}
/// Version-table rows 1..=5 that the migrator has no matching entries for
/// (it starts at 5) must not make `upgrade` fail; versions 6 and 7 still run.
#[test]
fn test_baseline_old_rows_not_flagged_as_orphans() {
    let mut conn = Connection::open_in_memory().unwrap();
    create_version_table(&conn);
    (1..=5u32).for_each(|v| {
        insert_version_row(
            &conn,
            v,
            &format!("migration_{}", v),
            "old_checksum",
            "migration",
        );
    });

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);

    let outcome = migrator.upgrade(&mut conn);
    assert!(outcome.is_ok(), "expected Ok, got: {:?}", outcome);

    let report = outcome.unwrap();
    assert_eq!(report.migrations_run, vec![6, 7]);
}
/// A wrong stored checksum for version 6 (above the migrator's first
/// version, 5) must still be rejected with a "checksum mismatch" error.
#[test]
fn test_baseline_checksum_mismatch_above_baseline_still_errors() {
    let mut conn = Connection::open_in_memory().unwrap();
    create_version_table(&conn);

    // (version, name, checksum, migration_type) rows seeded before upgrading.
    let seeded_rows = [
        (5u32, "baseline_v5", "any_checksum", "baseline"),
        (6u32, "baseline_v6", "totally_wrong_checksum_for_v6", "migration"),
    ];
    for &(version, name, checksum, kind) in seeded_rows.iter() {
        insert_version_row(&conn, version, name, checksum, kind);
    }

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);

    let outcome = migrator.upgrade(&mut conn);
    assert!(outcome.is_err());

    let err_msg = format!("{:?}", outcome.unwrap_err());
    assert!(
        err_msg.contains("checksum mismatch"),
        "expected checksum mismatch, got: {}",
        err_msg
    );
    // NOTE(review): the seeded checksum string itself contains "6", so if the
    // error echoes the stored checksum this check passes regardless of whether
    // the version number is reported — confirm against the error format.
    assert!(
        err_msg.contains("6"),
        "expected version 6 in error, got: {}",
        err_msg
    );
}
/// After a full upgrade, downgrading to version 5 rolls back 7 then 6,
/// leaves the current version at 5, and keeps `t5` while removing `t6`/`t7`.
#[test]
fn test_baseline_downgrade_to_baseline() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    let rollback = migrator.downgrade(&mut conn, 5).unwrap();
    assert_eq!(rollback.migrations_run, vec![7, 6]);
    assert!(rollback.failing_migration.is_none());

    let current = migrator.get_current_version(&mut conn).unwrap();
    assert_eq!(current, 5);

    assert!(table_exists(&conn, "t5"));
    assert!(!table_exists(&conn, "t6"));
    assert!(!table_exists(&conn, "t7"));
}
/// Downgrading to version 3 when the migrator's first version is 5 must
/// fail with an error mentioning "Cannot downgrade below baseline".
#[test]
fn test_baseline_downgrade_below_baseline_errors() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    let outcome = migrator.downgrade(&mut conn, 3);
    assert!(outcome.is_err());

    let err_msg = format!("{:?}", outcome.unwrap_err());
    assert!(
        err_msg.contains("Cannot downgrade below baseline"),
        "expected 'Cannot downgrade below baseline', got: {}",
        err_msg
    );
}
/// Downgrading all the way to version 0 when the migrator's first version
/// is 5 must fail with "Cannot downgrade below baseline".
#[test]
fn test_baseline_downgrade_to_zero_errors() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    let outcome = migrator.downgrade(&mut conn, 0);
    assert!(outcome.is_err());

    let err_msg = format!("{:?}", outcome.unwrap_err());
    assert!(
        err_msg.contains("Cannot downgrade below baseline"),
        "expected 'Cannot downgrade below baseline', got: {}",
        err_msg
    );
}
/// On an empty database `preview_upgrade` lists all three migrations,
/// in ascending version order 5, 6, 7.
#[test]
fn test_baseline_preview_upgrade_fresh_db() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();

    let pending = migrator.preview_upgrade(&mut conn).unwrap();
    assert_eq!(pending.len(), 3);

    let expected_versions = [5u32, 6, 7];
    for (idx, expected) in expected_versions.iter().enumerate() {
        assert_eq!(pending[idx].version(), *expected);
    }
}
/// Once the database has been upgraded to version 5 via `upgrade_to`,
/// `preview_upgrade` lists only versions 6 and 7.
#[test]
fn test_baseline_preview_upgrade_at_baseline() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade_to(&mut conn, 5).unwrap();

    let pending = migrator.preview_upgrade(&mut conn).unwrap();
    assert_eq!(pending.len(), 2);

    let expected_versions = [6u32, 7];
    for (idx, expected) in expected_versions.iter().enumerate() {
        assert_eq!(pending[idx].version(), *expected);
    }
}
/// After a full upgrade, `preview_downgrade` to version 5 lists the two
/// migrations to roll back, newest first (7, then 6).
#[test]
fn test_baseline_preview_downgrade() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    let to_rollback = migrator.preview_downgrade(&mut conn, 5).unwrap();
    assert_eq!(to_rollback.len(), 2);

    let expected_versions = [7u32, 6];
    for (idx, expected) in expected_versions.iter().enumerate() {
        assert_eq!(to_rollback[idx].version(), *expected);
    }
}
/// `preview_downgrade` to version 3, below the migrator's first version 5,
/// must fail with "Cannot downgrade below baseline".
#[test]
fn test_baseline_preview_downgrade_below_baseline_errors() {
    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    let outcome = migrator.preview_downgrade(&mut conn, 3);
    assert!(outcome.is_err());

    let err_msg = format!("{:?}", outcome.unwrap_err());
    assert!(
        err_msg.contains("Cannot downgrade below baseline"),
        "expected 'Cannot downgrade below baseline', got: {}",
        err_msg
    );
}
/// Exercises three successive migrators against one database, each starting
/// at a higher first version than the last:
///
/// 1. versions 3, 4, 5  -> runs 3, 4, 5 (version 3 is recorded as `Baseline`)
/// 2. versions 5, 6, 7  -> runs 6, 7
/// 3. versions 7, 8, 9  -> runs 8, 9
///
/// The version table ends with seven rows and the current version is 9.
#[test]
fn test_multi_rebaseline() {
    // Generates a unit-struct migration whose `sqlite_up` creates each listed
    // table and whose `sqlite_down` drops them, both in the order given.
    macro_rules! table_migration {
        ($ty:ident, $version:literal, $name:literal, [$($table:literal),+ $(,)?]) => {
            struct $ty;
            impl Migration for $ty {
                fn version(&self) -> u32 {
                    $version
                }
                fn name(&self) -> String {
                    $name.to_string()
                }
                fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
                    $(
                        tx.execute(
                            concat!("CREATE TABLE ", $table, " (id INTEGER PRIMARY KEY)"),
                            [],
                        )?;
                    )+
                    Ok(())
                }
                fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
                    $(
                        tx.execute(concat!("DROP TABLE ", $table), [])?;
                    )+
                    Ok(())
                }
                #[cfg(feature = "mysql")]
                fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
                    Ok(())
                }
            }
        };
    }

    // First generation.
    table_migration!(M3, 3, "m3", ["t3"]);
    table_migration!(M4, 4, "m4", ["t4"]);
    table_migration!(M5Orig, 5, "m5_orig", ["t5_orig"]);
    // Second generation, starting at version 5.
    table_migration!(M5Baseline, 5, "m5_baseline", ["t3_b", "t4_b", "t5_b"]);
    table_migration!(M6, 6, "m6", ["t6_r"]);
    table_migration!(M7, 7, "m7", ["t7_r"]);
    // Third generation, starting at version 7.
    table_migration!(M7Baseline, 7, "m7_baseline", ["full_schema_7"]);
    table_migration!(M8, 8, "m8", ["t8_r"]);
    table_migration!(M9, 9, "m9", ["t9_r"]);

    let mut conn = Connection::open_in_memory().unwrap();

    // Generation 1: everything runs on the empty database.
    let migrator1 = SqliteMigrator::new(vec![
        Box::new(M3),
        Box::new(M4),
        Box::new(M5Orig),
    ]);
    let report1 = migrator1.upgrade(&mut conn).unwrap();
    assert_eq!(report1.migrations_run, vec![3, 4, 5]);

    let history1 = migrator1.get_migration_history(&mut conn).unwrap();
    assert_eq!(history1[0].version, 3);
    assert_eq!(history1[0].migration_type, MigrationType::Baseline);
    assert_eq!(history1[1].migration_type, MigrationType::Migration);
    assert_eq!(history1[2].migration_type, MigrationType::Migration);

    // Generation 2: version 5 is already recorded, so only 6 and 7 run.
    let migrator2 = SqliteMigrator::new(vec![
        Box::new(M5Baseline),
        Box::new(M6),
        Box::new(M7),
    ]);
    let report2 = migrator2.upgrade(&mut conn).unwrap();
    assert_eq!(report2.migrations_run, vec![6, 7]);
    assert!(report2.failing_migration.is_none());

    // Generation 3: version 7 is already recorded, so only 8 and 9 run.
    let migrator3 = SqliteMigrator::new(vec![
        Box::new(M7Baseline),
        Box::new(M8),
        Box::new(M9),
    ]);
    let report3 = migrator3.upgrade(&mut conn).unwrap();
    assert_eq!(report3.migrations_run, vec![8, 9]);
    assert!(report3.failing_migration.is_none());

    let final_version = migrator3.get_current_version(&mut conn).unwrap();
    assert_eq!(final_version, 9);

    // One row per version 3..=9.
    let row_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM _migratio_version_", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(row_count, 7);
}
/// Starting from an old-layout version table that has no `migration_type`
/// column and one recorded row (version 1 with a matching checksum), an
/// upgrade must run only version 2, leave the table with a `migration_type`
/// column, and report "migration" as the type of the pre-existing row.
#[test]
fn test_migration_type_column_added_to_existing_table() {
    struct M1;
    impl Migration for M1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "original_migration".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE legacy_stuff (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct M2;
    impl Migration for M2 {
        fn version(&self) -> u32 {
            2
        }
        fn name(&self) -> String {
            "new_migration".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE new_stuff (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Counts columns named `migration_type` on the version table (0 or 1).
    fn migration_type_column_count(conn: &Connection) -> i64 {
        conn.query_row(
            "SELECT COUNT(*) FROM pragma_table_info('_migratio_version_') WHERE name='migration_type'",
            [],
            |row| row.get(0),
        )
        .unwrap()
    }

    let mut conn = Connection::open_in_memory().unwrap();
    create_version_table_old(&conn);

    // Record version 1 with the checksum the migrator computes for M1.
    let boxed_m1: Box<dyn Migration> = Box::new(M1);
    let m1_checksum = GenericMigrator::calculate_checksum(&boxed_m1);
    conn.execute(
        "INSERT INTO _migratio_version_ (version, name, applied_at, checksum) VALUES (1, 'original_migration', '2024-01-01T00:00:00+00:00', ?1)",
        rusqlite::params![m1_checksum],
    )
    .unwrap();

    // Precondition: the old layout really lacks the column.
    assert_eq!(migration_type_column_count(&conn), 0);

    let migrator = SqliteMigrator::new(vec![Box::new(M1), Box::new(M2)]);
    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![2]);

    assert_eq!(migration_type_column_count(&conn), 1);

    let recorded_type: String = conn
        .query_row(
            "SELECT migration_type FROM _migratio_version_ WHERE version = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(recorded_type, "migration");
}
/// A migrator whose chain starts at version 1 upgrades through 1, 2, 3,
/// downgrades back to 0, upgrades again, records every history entry as
/// `MigrationType::Migration`, and is a no-op on a further upgrade.
#[test]
fn test_no_baseline_backward_compatible() {
    struct M1;
    impl Migration for M1 {
        fn version(&self) -> u32 {
            1
        }
        fn name(&self) -> String {
            "m1".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE bc_t1 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE bc_t1", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct M2;
    impl Migration for M2 {
        fn version(&self) -> u32 {
            2
        }
        fn name(&self) -> String {
            "m2".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE bc_t2 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE bc_t2", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct M3;
    impl Migration for M3 {
        fn version(&self) -> u32 {
            3
        }
        fn name(&self) -> String {
            "m3".to_string()
        }
        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE bc_t3 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }
        fn sqlite_down(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("DROP TABLE bc_t3", [])?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();

    // Round trip: up through 3, then all the way back down to 0.
    let migrator = SqliteMigrator::new(vec![Box::new(M1), Box::new(M2), Box::new(M3)]);
    let up_report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(up_report.migrations_run, vec![1, 2, 3]);
    let down_report = migrator.downgrade(&mut conn, 0).unwrap();
    assert_eq!(down_report.migrations_run, vec![3, 2, 1]);

    // A fresh migrator over the same chain re-applies everything.
    let migrator2 = SqliteMigrator::new(vec![Box::new(M1), Box::new(M2), Box::new(M3)]);
    migrator2.upgrade(&mut conn).unwrap();

    let history = migrator2.get_migration_history(&mut conn).unwrap();
    for entry in history.iter() {
        assert_eq!(entry.migration_type, MigrationType::Migration);
    }

    let repeat_report = migrator2.upgrade(&mut conn).unwrap();
    assert_eq!(repeat_report.migrations_run, Vec::<u32>::new());
}
/// The start and complete callbacks must each be invoked once per migration,
/// in order 5, 6, 7, when upgrading an empty database.
#[test]
fn test_baseline_callbacks_fire() {
    use std::sync::{Arc, Mutex};

    // Shared logs that the callbacks append to and the test reads afterwards.
    let started = Arc::new(Mutex::new(Vec::<u32>::new()));
    let completed = Arc::new(Mutex::new(Vec::<u32>::new()));

    let start_log = Arc::clone(&started);
    let complete_log = Arc::clone(&completed);

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ])
    .on_migration_start(move |version, _name| {
        start_log.lock().unwrap().push(version);
    })
    .on_migration_complete(move |version, _name, _duration| {
        complete_log.lock().unwrap().push(version);
    });

    let mut conn = Connection::open_in_memory().unwrap();
    migrator.upgrade(&mut conn).unwrap();

    assert_eq!(*started.lock().unwrap(), vec![5, 6, 7]);
    assert_eq!(*completed.lock().unwrap(), vec![5, 6, 7]);
}
/// With the test harness, `migrate_to` reaches versions 5 and then 7, and
/// rejects version 3, which is below the migrator's first version.
#[test]
#[cfg(feature = "testing")]
fn test_harness_baseline_migrate_to() {
    use crate::testing::sqlite::SqliteTestHarness;

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut harness = SqliteTestHarness::new(migrator);

    harness.migrate_to(5).unwrap();
    assert_eq!(harness.current_version().unwrap(), 5);

    harness.migrate_to(7).unwrap();
    assert_eq!(harness.current_version().unwrap(), 7);

    let below_baseline = harness.migrate_to(3);
    assert!(
        below_baseline.is_err(),
        "expected error for version 3 below baseline"
    );
}
/// With the test harness, the first `migrate_up_one` lands on version 5
/// (the migrator's first version) and the second on version 6.
#[test]
#[cfg(feature = "testing")]
fn test_harness_baseline_migrate_up_one() {
    use crate::testing::sqlite::SqliteTestHarness;

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineV5),
        Box::new(BaselineV6),
        Box::new(BaselineV7),
    ]);
    let mut harness = SqliteTestHarness::new(migrator);

    harness.migrate_up_one().unwrap();
    assert_eq!(harness.current_version().unwrap(), 5);

    harness.migrate_up_one().unwrap();
    assert_eq!(harness.current_version().unwrap(), 6);
}
/// When the first migration (version 5) reports `Precondition::AlreadySatisfied`
/// because its table already exists, it must still appear in `migrations_run`
/// and be recorded in history with `MigrationType::Baseline`; the following
/// version 6 is recorded as a normal `Migration`.
#[test]
fn test_baseline_precondition_already_satisfied_records_as_baseline() {
    struct BaselineWithPrecondition;

    impl Migration for BaselineWithPrecondition {
        fn version(&self) -> u32 {
            5
        }

        fn name(&self) -> String {
            "baseline_with_precondition".to_string()
        }

        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE precond_t (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }

        fn sqlite_precondition(&self, tx: &Transaction) -> Result<Precondition, Error> {
            // Satisfied iff the `precond_t` table is already present.
            let matching_tables: i64 = tx.query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='precond_t'",
                [],
                |row| row.get(0),
            )?;
            Ok(if matching_tables > 0 {
                Precondition::AlreadySatisfied
            } else {
                Precondition::NeedsApply
            })
        }

        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    struct M6;

    impl Migration for M6 {
        fn version(&self) -> u32 {
            6
        }

        fn sqlite_up(&self, tx: &Transaction) -> Result<(), Error> {
            tx.execute("CREATE TABLE precond_t6 (id INTEGER PRIMARY KEY)", [])?;
            Ok(())
        }

        #[cfg(feature = "mysql")]
        fn mysql_up(&self, _conn: &mut mysql::Conn) -> Result<(), Error> {
            Ok(())
        }
    }

    let mut conn = Connection::open_in_memory().unwrap();
    // Pre-create the table so the version-5 precondition reports AlreadySatisfied.
    conn.execute("CREATE TABLE precond_t (id INTEGER PRIMARY KEY)", [])
        .unwrap();

    let migrator = SqliteMigrator::new(vec![
        Box::new(BaselineWithPrecondition),
        Box::new(M6),
    ]);

    let report = migrator.upgrade(&mut conn).unwrap();
    assert_eq!(report.migrations_run, vec![5, 6]);
    assert!(report.failing_migration.is_none());

    let history = migrator.get_migration_history(&mut conn).unwrap();
    assert_eq!(history.len(), 2);

    let (first, second) = (&history[0], &history[1]);
    assert_eq!(first.version, 5);
    assert_eq!(
        first.migration_type,
        MigrationType::Baseline,
        "baseline migration with AlreadySatisfied precondition must still be recorded as BASELINE"
    );
    assert_eq!(second.version, 6);
    assert_eq!(second.migration_type, MigrationType::Migration);
}
}