1use std::future::Future;
22use std::io;
23use std::time::Duration;
24
25use futures_util::TryFutureExt;
26
27use sqlx::AnyConnection;
28use tokio::{select, signal};
29
30use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand, OverrideCommand};
31
32pub mod database;
33pub mod metadata;
34#[cfg(feature = "completions")]
37pub mod completions;
38pub mod migrate;
39pub mod opt;
40pub mod prepare;
41
42pub use crate::opt::Opt;
43
44pub use sqlx::_unstable::config::{self, Config};
45
46pub fn maybe_apply_dotenv() {
48 if std::env::args().any(|arg| arg == "--no-dotenv") {
49 return;
50 }
51
52 if let Err(e) = dotenvy::dotenv() {
53 if !e.not_found() {
54 eprintln!("Warning: error loading `.env` file: {e:?}");
55 }
56 }
57}
58
59pub async fn run(opt: Opt) -> anyhow::Result<()> {
60 let ctrlc_fut = signal::ctrl_c();
66 let do_run_fut = do_run(opt);
67
68 select! {
69 biased;
70 _ = ctrlc_fut => {
71 Ok(())
72 },
73 do_run_outcome = do_run_fut => {
74 do_run_outcome
75 }
76 }
77}
78
79async fn do_run(opt: Opt) -> anyhow::Result<()> {
80 match opt.command {
81 Command::Migrate(migrate) => match migrate.command {
82 MigrateCommand::Add(opts) => migrate::add(opts).await?,
83 MigrateCommand::Run {
84 source,
85 config,
86 dry_run,
87 ignore_missing,
88 mut connect_opts,
89 target_version,
90 } => {
91 let config = config.load_config().await?;
92
93 connect_opts.populate_db_url(&config)?;
94
95 migrate::run(
96 &config,
97 &source,
98 &connect_opts,
99 dry_run,
100 *ignore_missing,
101 target_version,
102 false,
103 )
104 .await?
105 }
106 MigrateCommand::Revert {
107 source,
108 config,
109 dry_run,
110 ignore_missing,
111 mut connect_opts,
112 target_version,
113 } => {
114 let config = config.load_config().await?;
115
116 connect_opts.populate_db_url(&config)?;
117
118 migrate::revert(
119 &config,
120 &source,
121 &connect_opts,
122 dry_run,
123 *ignore_missing,
124 target_version,
125 )
126 .await?
127 }
128 MigrateCommand::Override { command } => match command {
129 OverrideCommand::Skip {
130 source,
131 config,
132 mut connect_opts,
133 dry_run,
134 ignore_missing,
135 target_version,
136 } => {
137 let config = config.load_config().await?;
138 connect_opts.populate_db_url(&config)?;
139
140 migrate::run(
141 &config,
142 &source,
143 &connect_opts,
144 dry_run,
145 *ignore_missing,
146 target_version,
147 true,
148 )
149 .await?
150 }
151 },
152 MigrateCommand::Info {
153 source,
154 config,
155 mut connect_opts,
156 } => {
157 let config = config.load_config().await?;
158
159 connect_opts.populate_db_url(&config)?;
160
161 migrate::info(&config, &source, &connect_opts).await?
162 }
163 MigrateCommand::BuildScript {
164 source,
165 config,
166 force,
167 } => {
168 let config = config.load_config().await?;
169
170 migrate::build_script(&config, &source, force)?
171 }
172 },
173
174 Command::Database(database) => match database.command {
175 DatabaseCommand::Create {
176 config,
177 mut connect_opts,
178 } => {
179 let config = config.load_config().await?;
180
181 connect_opts.populate_db_url(&config)?;
182 database::create(&connect_opts).await?
183 }
184 DatabaseCommand::Drop {
185 confirmation,
186 config,
187 mut connect_opts,
188 force,
189 } => {
190 let config = config.load_config().await?;
191
192 connect_opts.populate_db_url(&config)?;
193 database::drop(&connect_opts, !confirmation.yes, force).await?
194 }
195 DatabaseCommand::Reset {
196 confirmation,
197 source,
198 config,
199 mut connect_opts,
200 force,
201 } => {
202 let config = config.load_config().await?;
203
204 connect_opts.populate_db_url(&config)?;
205 database::reset(&config, &source, &connect_opts, !confirmation.yes, force).await?
206 }
207 DatabaseCommand::Setup {
208 source,
209 config,
210 mut connect_opts,
211 } => {
212 let config = config.load_config().await?;
213
214 connect_opts.populate_db_url(&config)?;
215 database::setup(&config, &source, &connect_opts).await?
216 }
217 },
218
219 Command::Prepare {
220 check,
221 all,
222 workspace,
223 mut connect_opts,
224 args,
225 config,
226 } => {
227 let config = config.load_config().await?;
228 connect_opts.populate_db_url(&config)?;
229 prepare::run(&config, check, all, workspace, connect_opts, args).await?
230 }
231
232 #[cfg(feature = "completions")]
233 Command::Completions { shell } => completions::run(shell),
234 };
235
236 Ok(())
237}
238
239async fn connect(config: &Config, opts: &ConnectOpts) -> anyhow::Result<AnyConnection> {
241 retry_connect_errors(opts, move |url| {
242 AnyConnection::connect_with_driver_config(url, &config.drivers)
243 })
244 .await
245}
246
247async fn retry_connect_errors<'a, F, Fut, T>(
252 opts: &'a ConnectOpts,
253 mut connect: F,
254) -> anyhow::Result<T>
255where
256 F: FnMut(&'a str) -> Fut,
257 Fut: Future<Output = sqlx::Result<T>> + 'a,
258{
259 let db_url = opts.expect_db_url()?;
260
261 backoff::future::retry(
262 backoff::ExponentialBackoffBuilder::new()
263 .with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
264 .build(),
265 || {
266 connect(db_url).map_err(|e| -> backoff::Error<anyhow::Error> {
267 if let sqlx::Error::Io(ref ioe) = e {
268 match ioe.kind() {
269 io::ErrorKind::ConnectionRefused
270 | io::ErrorKind::ConnectionReset
271 | io::ErrorKind::ConnectionAborted => {
272 return backoff::Error::transient(e.into());
273 }
274 _ => (),
275 }
276 }
277
278 backoff::Error::permanent(e.into())
279 })
280 },
281 )
282 .await
283}