use std::io;
use std::time::Duration;
use anyhow::Result;
use futures::{Future, TryFutureExt};
use sqlx::{AnyConnection, Connection};
use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};
mod database;
mod metadata;
#[cfg(feature = "completions")]
mod completions;
mod migrate;
mod opt;
mod prepare;
pub use crate::opt::Opt;
pub async fn run(opt: Opt) -> Result<()> {
match opt.command {
Command::Migrate(migrate) => match migrate.command {
MigrateCommand::Add {
source,
description,
reversible,
} => migrate::add(&source, &description, reversible).await?,
MigrateCommand::Run {
source,
dry_run,
ignore_missing,
connect_opts,
} => migrate::run(&source, &connect_opts, dry_run, *ignore_missing).await?,
MigrateCommand::Revert {
source,
dry_run,
ignore_missing,
connect_opts,
} => migrate::revert(&source, &connect_opts, dry_run, *ignore_missing).await?,
MigrateCommand::Info {
source,
connect_opts,
} => migrate::info(&source, &connect_opts).await?,
MigrateCommand::BuildScript { source, force } => migrate::build_script(&source, force)?,
},
Command::Database(database) => match database.command {
DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?,
DatabaseCommand::Drop {
confirmation,
connect_opts,
} => database::drop(&connect_opts, !confirmation.yes).await?,
DatabaseCommand::Reset {
confirmation,
source,
connect_opts,
} => database::reset(&source, &connect_opts, !confirmation.yes).await?,
DatabaseCommand::Setup {
source,
connect_opts,
} => database::setup(&source, &connect_opts).await?,
},
Command::Prepare {
check,
workspace,
connect_opts,
args,
} => prepare::run(check, workspace, connect_opts, args).await?,
#[cfg(feature = "completions")]
Command::Completions { shell } => completions::run(shell),
};
Ok(())
}
async fn connect(opts: &ConnectOpts) -> sqlx::Result<AnyConnection> {
retry_connect_errors(opts, AnyConnection::connect).await
}
async fn retry_connect_errors<'a, F, Fut, T>(
opts: &'a ConnectOpts,
mut connect: F,
) -> sqlx::Result<T>
where
F: FnMut(&'a str) -> Fut,
Fut: Future<Output = sqlx::Result<T>> + 'a,
{
sqlx::any::install_default_drivers();
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
.build(),
|| {
connect(&opts.database_url).map_err(|e| -> backoff::Error<sqlx::Error> {
match e {
sqlx::Error::Io(ref ioe) => match ioe.kind() {
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted => {
return backoff::Error::transient(e);
}
_ => (),
},
_ => (),
}
backoff::Error::permanent(e)
})
},
)
.await
}