1use std::future::Future;
22use std::io;
23use std::time::Duration;
24
25use futures_util::TryFutureExt;
26
27use sqlx::AnyConnection;
28use tokio::{select, signal};
29
30use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};
31
32pub mod database;
33pub mod metadata;
34#[cfg(feature = "completions")]
37pub mod completions;
38pub mod migrate;
39pub mod opt;
40pub mod prepare;
41
42pub use crate::opt::Opt;
43
44pub use sqlx::_unstable::config::{self, Config};
45
46pub fn maybe_apply_dotenv() {
48 if std::env::args().any(|arg| arg == "--no-dotenv") {
49 return;
50 }
51
52 if let Err(e) = dotenvy::dotenv() {
53 if !e.not_found() {
54 eprintln!("Warning: error loading `.env` file: {e:?}");
55 }
56 }
57}
58
59pub async fn run(opt: Opt) -> anyhow::Result<()> {
60 let ctrlc_fut = signal::ctrl_c();
66 let do_run_fut = do_run(opt);
67
68 select! {
69 biased;
70 _ = ctrlc_fut => {
71 Ok(())
72 },
73 do_run_outcome = do_run_fut => {
74 do_run_outcome
75 }
76 }
77}
78
79async fn do_run(opt: Opt) -> anyhow::Result<()> {
80 match opt.command {
81 Command::Migrate(migrate) => match migrate.command {
82 MigrateCommand::Add(opts) => migrate::add(opts).await?,
83 MigrateCommand::Run {
84 source,
85 config,
86 dry_run,
87 ignore_missing,
88 mut connect_opts,
89 target_version,
90 } => {
91 let config = config.load_config().await?;
92
93 connect_opts.populate_db_url(&config)?;
94
95 migrate::run(
96 &config,
97 &source,
98 &connect_opts,
99 dry_run,
100 *ignore_missing,
101 target_version,
102 )
103 .await?
104 }
105 MigrateCommand::Revert {
106 source,
107 config,
108 dry_run,
109 ignore_missing,
110 mut connect_opts,
111 target_version,
112 } => {
113 let config = config.load_config().await?;
114
115 connect_opts.populate_db_url(&config)?;
116
117 migrate::revert(
118 &config,
119 &source,
120 &connect_opts,
121 dry_run,
122 *ignore_missing,
123 target_version,
124 )
125 .await?
126 }
127 MigrateCommand::Info {
128 source,
129 config,
130 mut connect_opts,
131 } => {
132 let config = config.load_config().await?;
133
134 connect_opts.populate_db_url(&config)?;
135
136 migrate::info(&config, &source, &connect_opts).await?
137 }
138 MigrateCommand::BuildScript {
139 source,
140 config,
141 force,
142 } => {
143 let config = config.load_config().await?;
144
145 migrate::build_script(&config, &source, force)?
146 }
147 },
148
149 Command::Database(database) => match database.command {
150 DatabaseCommand::Create {
151 config,
152 mut connect_opts,
153 } => {
154 let config = config.load_config().await?;
155
156 connect_opts.populate_db_url(&config)?;
157 database::create(&connect_opts).await?
158 }
159 DatabaseCommand::Drop {
160 confirmation,
161 config,
162 mut connect_opts,
163 force,
164 } => {
165 let config = config.load_config().await?;
166
167 connect_opts.populate_db_url(&config)?;
168 database::drop(&connect_opts, !confirmation.yes, force).await?
169 }
170 DatabaseCommand::Reset {
171 confirmation,
172 source,
173 config,
174 mut connect_opts,
175 force,
176 } => {
177 let config = config.load_config().await?;
178
179 connect_opts.populate_db_url(&config)?;
180 database::reset(&config, &source, &connect_opts, !confirmation.yes, force).await?
181 }
182 DatabaseCommand::Setup {
183 source,
184 config,
185 mut connect_opts,
186 } => {
187 let config = config.load_config().await?;
188
189 connect_opts.populate_db_url(&config)?;
190 database::setup(&config, &source, &connect_opts).await?
191 }
192 },
193
194 Command::Prepare {
195 check,
196 all,
197 workspace,
198 mut connect_opts,
199 args,
200 config,
201 } => {
202 let config = config.load_config().await?;
203 connect_opts.populate_db_url(&config)?;
204 prepare::run(&config, check, all, workspace, connect_opts, args).await?
205 }
206
207 #[cfg(feature = "completions")]
208 Command::Completions { shell } => completions::run(shell),
209 };
210
211 Ok(())
212}
213
214async fn connect(config: &Config, opts: &ConnectOpts) -> anyhow::Result<AnyConnection> {
216 retry_connect_errors(opts, move |url| {
217 AnyConnection::connect_with_driver_config(url, &config.drivers)
218 })
219 .await
220}
221
222async fn retry_connect_errors<'a, F, Fut, T>(
227 opts: &'a ConnectOpts,
228 mut connect: F,
229) -> anyhow::Result<T>
230where
231 F: FnMut(&'a str) -> Fut,
232 Fut: Future<Output = sqlx::Result<T>> + 'a,
233{
234 let db_url = opts.expect_db_url()?;
235
236 backoff::future::retry(
237 backoff::ExponentialBackoffBuilder::new()
238 .with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
239 .build(),
240 || {
241 connect(db_url).map_err(|e| -> backoff::Error<anyhow::Error> {
242 if let sqlx::Error::Io(ref ioe) = e {
243 match ioe.kind() {
244 io::ErrorKind::ConnectionRefused
245 | io::ErrorKind::ConnectionReset
246 | io::ErrorKind::ConnectionAborted => {
247 return backoff::Error::transient(e.into());
248 }
249 _ => (),
250 }
251 }
252
253 backoff::Error::permanent(e.into())
254 })
255 },
256 )
257 .await
258}