use std::path::{Path, PathBuf};
pub use downgrade::Downgrade;
#[allow(clippy::module_name_repetitions)]
pub use error::Error as MigrationError;
pub use schema_version::SchemaVersion;
pub(crate) use sql::Sql;
pub use upgrade::Upgrade;
use crate::{
log, query::state, response::mapping::Mapping, tracing, Connection, Query, RequestBuilder,
Response, Value,
};
mod downgrade;
mod embed;
mod error;
mod schema_version;
mod sql;
mod upgrade;
/// A single migration step: the upgrade and, optionally, the downgrade that
/// belongs to it.
///
/// Field `0` is the [`Upgrade`]; field `1` is the [`Downgrade`], or `None`
/// when the step has no downgrade.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct M<'a>(pub Upgrade<'a>, pub Option<Downgrade<'a>>);
impl M<'_> {
#[must_use]
#[inline]
pub fn as_tuple(&'_ self) -> Mtuple<'_> {
(&self.0, self.1.as_ref())
}
}
/// Borrowed view of an [`M`]: a reference to the upgrade and an optional
/// reference to the downgrade.
pub(crate) type Mtuple<'a> = (&'a Upgrade<'a>, Option<&'a Downgrade<'a>>);
impl<'a> From<&'a M<'a>> for Mtuple<'a> {
    /// Borrows the upgrade and the optional downgrade out of `value`.
    fn from(value: &'a M<'a>) -> Self {
        (&value.0, value.1.as_ref())
    }
}
/// A list of migration steps together with the request builder used to run
/// them.
///
/// `T` is the [`RequestBuilder`] that executes the generated queries.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Migration<'a, T>
where
T: RequestBuilder<state::NoLevelMulti>,
{
/// Migration steps in the order they are iterated by `migrate_to`.
migrations: Vec<M<'a>>,
/// Builder used to run queries; `migrate_to` and `rollback_to` return
/// `MigrationError::NoRequestBuilder` while this is `None`.
request_builder: Option<T>,
}
impl<'a, T> Migration<'a, T>
where
T: RequestBuilder<state::NoLevelMulti>,
{
/// Creates a `Migration` from the steps returned by
/// `embed::migrations::<S>()`.
///
/// The steps are passed to [`Self::new`], so no request builder is set.
#[cfg(feature = "migration_embed")]
#[cfg(not(feature = "ureq"))]
#[must_use]
#[inline]
pub fn from_embed<S>() -> Self
where
    S: rust_embed::RustEmbed,
{
    let embedded = embed::migrations::<S>();
    Self::new(embedded)
}
/// Creates a `Migration` from the migration directories found under `path`.
///
/// The steps are collected by `Self::migrations` and passed to
/// [`Self::new`], so no request builder is set.
#[cfg(not(feature = "ureq"))]
#[inline]
pub fn from_path<'p, P>(path: P) -> Self
where
    P: Into<&'p Path>,
{
    let found = Self::migrations(path);
    Self::new(found)
}
/// Creates a `Migration` holding `migrations` and no request builder.
///
/// Set a builder with `set_request_builder` before migrating; without one
/// `migrate_to` and `rollback_to` return `MigrationError::NoRequestBuilder`.
#[cfg(not(feature = "ureq"))]
#[inline]
pub fn new<S>(migrations: S) -> Self
where
    S: Into<Vec<M<'a>>>,
{
    let migrations: Vec<M<'a>> = migrations.into();
    Self {
        migrations,
        request_builder: None,
    }
}
/// Runs [`Self::migrate_to`] without a target version.
///
/// # Errors
///
/// Returns whatever [`Self::migrate_to`] returns.
#[inline]
pub fn migrate(&self, connection: &Connection) -> Result<SchemaVersion, MigrationError> {
    let no_target: Option<&SchemaVersion> = None;
    self.migrate_to(connection, no_target)
}
/// Queues pending upgrades in one transactional query, runs it, and stores
/// the resulting version with `PRAGMA user_version`.
///
/// Steps are numbered by position, starting at `SchemaVersion::default()`
/// and incremented by one per step. A step's upgrade is queued when the
/// database's `user_version` is less than or equal to its position. With
/// `to_version` set, iteration stops at the first position greater than
/// `to_version`.
///
/// Each upgrade is split into lines. Lines are trimmed, and empty lines or
/// lines starting with `#`, `;`, `/` or `-` are skipped. Every remaining line
/// is pushed as its own SQL statement.
///
/// # Errors
///
/// * `MigrationError::NoData` when there are no migrations.
/// * `MigrationError::NoRequestBuilder` when no request builder is set.
/// * `MigrationError::DataMalformat` when `to_version` is set and does not
///   equal the last position visited.
/// * Any error from reading the current version or running the query.
pub fn migrate_to(
    &self,
    connection: &Connection,
    to_version: Option<&SchemaVersion>,
) -> Result<SchemaVersion, MigrationError> {
    if self.migrations.is_empty() {
        return Err(MigrationError::NoData);
    }
    let Some(rb) = self.request_builder.as_ref() else {
        return Err(MigrationError::NoRequestBuilder);
    };

    let db_version = Self::pragma_user_version(connection, rb)?;
    let mut query = connection.execute().enable_transaction();
    let mut version = SchemaVersion::default();

    for migration in &self.migrations {
        let (upgrade, _) = migration.as_tuple();

        // Stop once the position has moved past the requested target.
        if let Some(to_version) = to_version.filter(|target| &version > *target) {
            // Keeps the binding used if the logging macros expand to nothing.
            let _ = to_version;
            log::trace!("db_version: {db_version} - migrated to version {to_version}");
            tracing::trace!("db_version: {db_version} - migrated to version {to_version}");
            break;
        }

        if db_version <= version {
            log::debug!("db_version: {db_version} migrating: {version}");
            tracing::debug!("db_version: {db_version} migrating: {version}");
            for line in upgrade.lines() {
                let line = line.trim();
                match line.chars().next() {
                    // Empty line or a line treated as a comment/terminator.
                    None | Some('#' | ';' | '/' | '-') => {}
                    Some(_) => query = query.push_sql(Value::from(line)),
                }
            }
        } else {
            log::trace!("db_version: {db_version} - already migrated with version {version}");
            tracing::trace!(
                "db_version: {db_version} - already migrated with version {version}"
            );
        }

        version += 1;
    }

    if let Some(to_version) = to_version {
        // The counter is one past the last visited position; step back.
        version = version.checked_sub(1).unwrap_or_default();
        if version != *to_version {
            return Err(MigrationError::DataMalformat(format!(
                "no migration {to_version}"
            )));
        }
        // Never store a version lower than the one already in the database.
        if version < db_version {
            version = db_version;
        }
    }

    Self::run_n_set_pragma_user_version(rb, query, version)?;
    log::info!("migrated to version {version}");
    tracing::info!("migrated to version {version}");
    Ok(version)
}
/// Consumes the `Migration` and returns its last step, or `None` when it
/// holds no steps.
#[must_use]
#[inline]
pub fn pop(mut self) -> Option<M<'a>> {
self.migrations.pop()
}
/// Runs `PRAGMA user_version` through `rb` and returns the reported version.
///
/// The version is read from row 0, column 0 of the first standard result
/// that yields a `u64`.
///
/// # Errors
///
/// * The mapped error of `rb.run` when the request fails.
/// * `MigrationError::Internal` when the response is not a query response.
/// * `MigrationError::QueryFail` when a result is an error, has an unhandled
///   mapping, or no result carries a version.
fn pragma_user_version(
    connection: &Connection,
    rb: &T,
) -> Result<SchemaVersion, MigrationError> {
    let query = connection.query().push_sql_str("PRAGMA user_version");

    let response = rb
        .run(&query)
        .map_err(|err| MigrationError::try_from(err).unwrap_err())?;
    #[allow(irrefutable_let_patterns)]
    let Response::Query(r) = response
    else {
        return Err(MigrationError::Internal("query_response required"));
    };

    for (index, result) in r.results().enumerate() {
        match result {
            Mapping::Error(err) => {
                return Err(MigrationError::QueryFail(format!(
                    "{} - {}",
                    err.error,
                    query.sql()[index]
                )));
            }
            Mapping::Standard(standard) => {
                let found = standard
                    .value(0, 0)
                    .and_then(Value::as_u64)
                    .map(SchemaVersion);
                // The first result that carries a version wins.
                if let Some(db_version) = found {
                    return Ok(db_version);
                }
            }
            _ => return Err(MigrationError::QueryFail("result not handled".to_string())),
        }
    }

    Err(MigrationError::QueryFail("no schema version".to_string()))
}
/// Appends `migration` as the last step and returns the `Migration` for
/// chaining.
#[must_use]
#[inline]
pub fn push<'m: 'a>(mut self, migration: M<'m>) -> Self {
self.migrations.push(migration);
self
}
/// Queues downgrades in one transactional query, runs it, and stores the
/// resulting version with `PRAGMA user_version`.
///
/// Steps are walked from last to first, starting from the database's current
/// `user_version`. A step with a downgrade lowers the version by one and
/// queues its SQL while the version is still greater than `to_version`.
///
/// Each downgrade is split into lines. Lines are trimmed, and empty lines or
/// lines starting with `#`, `;`, `/` or `-` are skipped. Every remaining line
/// is pushed as its own SQL statement.
///
/// # Errors
///
/// * `MigrationError::NoData` when there are no migrations.
/// * `MigrationError::NoRequestBuilder` when no request builder is set.
/// * `MigrationError::DataMalformat` when `to_version` is not lower than the
///   database's current version.
/// * Any error from reading the current version or running the query.
pub fn rollback_to(
    &self,
    connection: &Connection,
    to_version: &SchemaVersion,
) -> Result<SchemaVersion, MigrationError> {
    if self.migrations.is_empty() {
        return Err(MigrationError::NoData);
    }
    let Some(rb) = self.request_builder.as_ref() else {
        return Err(MigrationError::NoRequestBuilder);
    };

    let db_version = Self::pragma_user_version(connection, rb)?;
    if *to_version >= db_version {
        return Err(MigrationError::DataMalformat(format!(
            "no rollback {to_version}"
        )));
    }

    let mut query = connection.execute().enable_transaction();
    let mut version = db_version;

    for migration in self.migrations.iter().rev() {
        let (_, downgrade) = migration.as_tuple();

        let Some(downgrade) = downgrade else {
            // NOTE(review): a step without a downgrade lowers the version
            // without comparing against `to_version`, unlike the branch
            // below — confirm this is intended.
            version = version.checked_sub(1).unwrap_or_default();
            log::debug!("db_version: {db_version} rollback: {version}");
            tracing::debug!("db_version: {db_version} rollback: {version}");
            continue;
        };

        if version > *to_version {
            version = version.checked_sub(1).unwrap_or_default();
            log::debug!("db_version: {db_version} rollback: {version}");
            tracing::debug!("db_version: {db_version} rollback: {version}");
            for line in downgrade.lines() {
                let line = line.trim();
                match line.chars().next() {
                    // Empty line or a line treated as a comment/terminator.
                    None | Some('#' | ';' | '/' | '-') => {}
                    Some(_) => query = query.push_sql(Value::from(line)),
                }
            }
        }
    }

    Self::run_n_set_pragma_user_version(rb, query, version)?;
    log::info!("rollback to version {version}");
    tracing::info!("rollback to version {version}");
    Ok(version)
}
/// Appends `PRAGMA user_version=<version>` to `query` and runs it through
/// `rb`.
///
/// # Errors
///
/// * The mapped error of `rb.run` when the request fails.
/// * `MigrationError::Internal` when the response is not a query response.
/// * `MigrationError::QueryFail` for the first result that is an error; the
///   message holds the error text and the statement at the same index.
fn run_n_set_pragma_user_version(
    rb: &T,
    query: Query<'_, state::NoLevelMulti>,
    version: SchemaVersion,
) -> Result<(), MigrationError> {
    let query = query.push_sql_str(&format!("PRAGMA user_version={version}"));

    let response = rb
        .run(&query)
        .map_err(|err| MigrationError::try_from(err).unwrap_err())?;
    #[allow(irrefutable_let_patterns)]
    let Response::Query(r) = response
    else {
        return Err(MigrationError::Internal("query response required"));
    };

    // Find the first failed statement, if there is one.
    let first_failure = r.results().enumerate().find_map(|(index, result)| {
        if let Mapping::Error(err) = result {
            Some(format!("{} - {}", err.error, query.sql()[index]))
        } else {
            None
        }
    });

    match first_failure {
        Some(message) => Err(MigrationError::QueryFail(message)),
        None => Ok(()),
    }
}
/// Stores `builder` as the request builder used to run queries and returns
/// the `Migration` for chaining.
#[cfg(not(feature = "ureq"))]
#[must_use]
#[inline]
pub fn set_request_builder(mut self, builder: T) -> Self {
self.request_builder = Some(builder);
self
}
/// Shortens the list of steps to at most `len` entries (via `Vec::truncate`)
/// and returns the `Migration` for chaining.
#[must_use]
#[inline]
pub fn truncate(mut self, len: usize) -> Self {
self.migrations.truncate(len);
self
}
/// Loads the migration steps found under `path`.
///
/// This is best-effort: if the directory listing fails, the result is empty.
/// A step whose upgrade file cannot be converted is skipped. A downgrade
/// file that cannot be converted leaves the step without a downgrade.
fn migrations<'p, P>(path: P) -> Vec<M<'a>>
where
    P: Into<&'p Path>,
{
    let Ok(migration_files) = Self::migration_files(path) else {
        return Vec::new();
    };

    migration_files
        .into_iter()
        .filter_map(|(upgrade_path, downgrade_path)| {
            let upgrade_sql = Sql::try_from(upgrade_path.as_path()).ok()?;
            let downgrade_sql = downgrade_path
                .as_deref()
                .and_then(|downgrade| Sql::try_from(downgrade).ok());
            Some(M(upgrade_sql, downgrade_sql))
        })
        .collect()
}
/// Lists the `upgrade.sql` / `downgrade.sql` pairs below `path`.
///
/// Every direct sub-directory of `path` that contains a file named
/// `upgrade.sql` yields one entry. The second element is the path of
/// `downgrade.sql` in the same directory when that file exists, otherwise
/// `None`. Directory entries that cannot be read are skipped. The result is
/// sorted.
///
/// # Errors
///
/// Returns `MigrationError::NoData` when `path` cannot be read as a
/// directory.
fn migration_files<'p, P>(path: P) -> Result<Vec<(PathBuf, Option<PathBuf>)>, MigrationError>
where
    P: Into<&'p Path>,
{
    let listing = std::fs::read_dir(path.into()).map_err(|_err| MigrationError::NoData)?;

    let mut entries: Vec<(PathBuf, Option<PathBuf>)> = Vec::new();
    for entry in listing.flatten() {
        let dir = entry.path();
        if !dir.is_dir() {
            continue;
        }
        let upgrade = dir.join("upgrade.sql");
        if !upgrade.is_file() {
            continue;
        }
        let downgrade = dir.join("downgrade.sql");
        let downgrade = downgrade.is_file().then_some(downgrade);
        entries.push((upgrade, downgrade));
    }

    entries.sort();
    Ok(entries)
}
}
/// `a + b` appends the steps of `b` to `a`.
///
/// The left-hand operand is consumed and reused, so its request builder is
/// the one kept in the result; the request builder of `rhs` is dropped.
impl<'a, T> std::ops::Add for Migration<'a, T>
where
    T: RequestBuilder<state::NoLevelMulti>,
{
    type Output = Migration<'a, T>;

    fn add(mut self, rhs: Self) -> Self::Output {
        // `self` is already owned, so extend it in place instead of cloning
        // it first; this also removes the need for `T: Clone`.
        self.migrations.extend(rhs.migrations);
        self
    }
}
/// `a += b` moves the steps of `b` to the end of `a`.
///
/// The request builder of `a` is left untouched; the one of `rhs` is dropped.
impl<T> std::ops::AddAssign for Migration<'_, T>
where
    T: RequestBuilder<state::NoLevelMulti>,
{
    fn add_assign(&mut self, mut rhs: Self) {
        self.migrations.append(&mut rhs.migrations);
    }
}
/// Constructors available with the `ureq` feature; each one sets a fresh
/// `Request<Post>` as the request builder.
#[cfg(feature = "ureq")]
impl<'a> Migration<'a, crate::Request<crate::request_type::Post>> {
    /// Creates a `Migration` from the steps returned by
    /// `embed::migrations::<S>()`.
    #[cfg(feature = "migration_embed")]
    #[must_use]
    #[inline]
    pub fn from_embed<S>() -> Self
    where
        S: rust_embed::RustEmbed,
    {
        let embedded = embed::migrations::<S>();
        Self::new(embedded)
    }

    /// Creates a `Migration` from the migration directories found under
    /// `path`, as collected by `Self::migrations`.
    #[inline]
    pub fn from_path<'p, P>(path: P) -> Self
    where
        P: Into<&'p Path>,
    {
        let found = Self::migrations(path);
        Self::new(found)
    }

    /// Creates a `Migration` holding `migrations` with a new `Request<Post>`
    /// as request builder.
    #[inline]
    pub fn new<S>(migrations: S) -> Self
    where
        S: Into<Vec<M<'a>>>,
    {
        let migrations: Vec<M<'a>> = migrations.into();
        let request_builder = Some(crate::Request::<crate::request_type::Post>::new());
        Self {
            migrations,
            request_builder,
        }
    }
}
#[cfg(feature = "ureq")]
impl<'a> Default for Migration<'a, crate::Request<crate::request_type::Post>> {
    /// Returns a `Migration` without steps and with a new `Request<Post>` as
    /// request builder.
    fn default() -> Self {
        Self::new(Vec::<M<'a>>::new())
    }
}
#[cfg(feature = "ureq")]
impl<'a> From<M<'a>> for Migration<'a, crate::Request<crate::request_type::Post>> {
    /// Creates a `Migration` whose only step is `value`.
    fn from(value: M<'a>) -> Self {
        let single_step: Vec<M<'a>> = Vec::from([value]);
        Self::new(single_step)
    }
}
#[cfg(feature = "ureq")]
impl<'a> From<&'a M<'a>> for Migration<'a, crate::Request<crate::request_type::Post>> {
    /// Creates a `Migration` whose only step is a clone of `value`.
    fn from(value: &'a M<'a>) -> Self {
        let owned: M<'a> = M::clone(value);
        Self::new(vec![owned])
    }
}
#[cfg(not(feature = "ureq"))]
impl<'a, T> Default for Migration<'a, T>
where
T: RequestBuilder<state::NoLevelMulti>,
{
fn default() -> Self {
Self {
migrations: Vec::new(),
request_builder: None,
}
}
}
#[cfg(not(feature = "ureq"))]
impl<'a, T> From<M<'a>> for Migration<'a, T>
where
    T: RequestBuilder<state::NoLevelMulti>,
{
    /// Creates a `Migration` whose only step is `value`.
    fn from(value: M<'a>) -> Self {
        let single_step: Vec<M<'a>> = Vec::from([value]);
        Self::new(single_step)
    }
}
#[cfg(not(feature = "ureq"))]
impl<'a, T> From<&'a M<'a>> for Migration<'a, T>
where
T: RequestBuilder<state::NoLevelMulti>,
{
fn from(value: &'a M<'a>) -> Self {
Self::new(vec![value.clone()])
}
}
#[cfg(feature = "ureq")]
impl<'p, P> From<P> for Migration<'_, crate::Request<crate::request_type::Post>>
where
    P: Into<&'p Path>,
{
    /// Creates a `Migration` from the migration directories found under
    /// `path`; same as `from_path`.
    fn from(path: P) -> Self {
        Self::from_path(path)
    }
}
#[cfg(not(feature = "ureq"))]
impl<'p, P, T> From<P> for Migration<'_, T>
where
    P: Into<&'p Path>,
    T: RequestBuilder<state::NoLevelMulti>,
{
    /// Creates a `Migration` from the migration directories found under
    /// `path`; same as `from_path`.
    fn from(path: P) -> Self {
        Self::from_path(path)
    }
}