1use std::io;
2use std::time::Duration;
3
4use anyhow::Result;
5use futures::{Future, TryFutureExt};
6
7use sqlx::{AnyConnection, Connection};
8use tokio::{select, signal};
9
10use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};
11
12mod database;
13mod metadata;
14#[cfg(feature = "completions")]
17mod completions;
18mod migrate;
19mod opt;
20mod prepare;
21
22pub use crate::opt::Opt;
23
24pub async fn run(opt: Opt) -> Result<()> {
25 let ctrlc_fut = signal::ctrl_c();
31 let do_run_fut = do_run(opt);
32
33 select! {
34 biased;
35 _ = ctrlc_fut => {
36 Ok(())
37 },
38 do_run_outcome = do_run_fut => {
39 do_run_outcome
40 }
41 }
42}
43
44async fn do_run(opt: Opt) -> Result<()> {
45 match opt.command {
46 Command::Migrate(migrate) => match migrate.command {
47 MigrateCommand::Add {
48 source,
49 description,
50 reversible,
51 sequential,
52 timestamp,
53 } => migrate::add(&source, &description, reversible, sequential, timestamp).await?,
54 MigrateCommand::Run {
55 source,
56 dry_run,
57 ignore_missing,
58 connect_opts,
59 target_version,
60 } => {
61 migrate::run(
62 &source,
63 &connect_opts,
64 dry_run,
65 *ignore_missing,
66 target_version,
67 )
68 .await?
69 }
70 MigrateCommand::Revert {
71 source,
72 dry_run,
73 ignore_missing,
74 connect_opts,
75 target_version,
76 } => {
77 migrate::revert(
78 &source,
79 &connect_opts,
80 dry_run,
81 *ignore_missing,
82 target_version,
83 )
84 .await?
85 }
86 MigrateCommand::Info {
87 source,
88 connect_opts,
89 } => migrate::info(&source, &connect_opts).await?,
90 MigrateCommand::BuildScript { source, force } => migrate::build_script(&source, force)?,
91 },
92
93 Command::Database(database) => match database.command {
94 DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?,
95 DatabaseCommand::Drop {
96 confirmation,
97 connect_opts,
98 force,
99 } => database::drop(&connect_opts, !confirmation.yes, force).await?,
100 DatabaseCommand::Reset {
101 confirmation,
102 source,
103 connect_opts,
104 force,
105 } => database::reset(&source, &connect_opts, !confirmation.yes, force).await?,
106 DatabaseCommand::Setup {
107 source,
108 connect_opts,
109 } => database::setup(&source, &connect_opts).await?,
110 },
111
112 Command::Prepare {
113 check,
114 all,
115 workspace,
116 connect_opts,
117 args,
118 } => prepare::run(check, all, workspace, connect_opts, args).await?,
119
120 #[cfg(feature = "completions")]
121 Command::Completions { shell } => completions::run(shell),
122 };
123
124 Ok(())
125}
126
127async fn connect(opts: &ConnectOpts) -> anyhow::Result<AnyConnection> {
129 retry_connect_errors(opts, AnyConnection::connect).await
130}
131
132async fn retry_connect_errors<'a, F, Fut, T>(
137 opts: &'a ConnectOpts,
138 mut connect: F,
139) -> anyhow::Result<T>
140where
141 F: FnMut(&'a str) -> Fut,
142 Fut: Future<Output = sqlx::Result<T>> + 'a,
143{
144 sqlx::any::install_default_drivers();
145
146 let db_url = opts.required_db_url()?;
147
148 backoff::future::retry(
149 backoff::ExponentialBackoffBuilder::new()
150 .with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
151 .build(),
152 || {
153 connect(db_url).map_err(|e| -> backoff::Error<anyhow::Error> {
154 if let sqlx::Error::Io(ref ioe) = e {
155 match ioe.kind() {
156 io::ErrorKind::ConnectionRefused
157 | io::ErrorKind::ConnectionReset
158 | io::ErrorKind::ConnectionAborted => {
159 return backoff::Error::transient(e.into());
160 }
161 _ => (),
162 }
163 }
164
165 backoff::Error::permanent(e.into())
166 })
167 },
168 )
169 .await
170}