sqlx_cli/
lib.rs

1use std::io;
2use std::time::Duration;
3
4use anyhow::Result;
5use futures::{Future, TryFutureExt};
6
7use sqlx::{AnyConnection, Connection};
8use tokio::{select, signal};
9
10use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};
11
12mod database;
13mod metadata;
14// mod migration;
15// mod migrator;
16#[cfg(feature = "completions")]
17mod completions;
18mod migrate;
19mod opt;
20mod prepare;
21
22pub use crate::opt::Opt;
23
24pub async fn run(opt: Opt) -> Result<()> {
25    // This `select!` is here so that when the process receives a `SIGINT` (CTRL + C),
26    // the futures currently running on this task get dropped before the program exits.
27    // This is currently necessary for the consumers of the `dialoguer` crate to restore
28    // the user's terminal if the process is interrupted while a dialog is being displayed.
29
30    let ctrlc_fut = signal::ctrl_c();
31    let do_run_fut = do_run(opt);
32
33    select! {
34        biased;
35        _ = ctrlc_fut => {
36            Ok(())
37        },
38        do_run_outcome = do_run_fut => {
39            do_run_outcome
40        }
41    }
42}
43
44async fn do_run(opt: Opt) -> Result<()> {
45    match opt.command {
46        Command::Migrate(migrate) => match migrate.command {
47            MigrateCommand::Add {
48                source,
49                description,
50                reversible,
51                sequential,
52                timestamp,
53            } => migrate::add(&source, &description, reversible, sequential, timestamp).await?,
54            MigrateCommand::Run {
55                source,
56                dry_run,
57                ignore_missing,
58                connect_opts,
59                target_version,
60            } => {
61                migrate::run(
62                    &source,
63                    &connect_opts,
64                    dry_run,
65                    *ignore_missing,
66                    target_version,
67                )
68                .await?
69            }
70            MigrateCommand::Revert {
71                source,
72                dry_run,
73                ignore_missing,
74                connect_opts,
75                target_version,
76            } => {
77                migrate::revert(
78                    &source,
79                    &connect_opts,
80                    dry_run,
81                    *ignore_missing,
82                    target_version,
83                )
84                .await?
85            }
86            MigrateCommand::Info {
87                source,
88                connect_opts,
89            } => migrate::info(&source, &connect_opts).await?,
90            MigrateCommand::BuildScript { source, force } => migrate::build_script(&source, force)?,
91        },
92
93        Command::Database(database) => match database.command {
94            DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?,
95            DatabaseCommand::Drop {
96                confirmation,
97                connect_opts,
98                force,
99            } => database::drop(&connect_opts, !confirmation.yes, force).await?,
100            DatabaseCommand::Reset {
101                confirmation,
102                source,
103                connect_opts,
104                force,
105            } => database::reset(&source, &connect_opts, !confirmation.yes, force).await?,
106            DatabaseCommand::Setup {
107                source,
108                connect_opts,
109            } => database::setup(&source, &connect_opts).await?,
110        },
111
112        Command::Prepare {
113            check,
114            all,
115            workspace,
116            connect_opts,
117            args,
118        } => prepare::run(check, all, workspace, connect_opts, args).await?,
119
120        #[cfg(feature = "completions")]
121        Command::Completions { shell } => completions::run(shell),
122    };
123
124    Ok(())
125}
126
127/// Attempt to connect to the database server, retrying up to `ops.connect_timeout`.
128async fn connect(opts: &ConnectOpts) -> anyhow::Result<AnyConnection> {
129    retry_connect_errors(opts, AnyConnection::connect).await
130}
131
132/// Attempt an operation that may return errors like `ConnectionRefused`,
133/// retrying up until `ops.connect_timeout`.
134///
135/// The closure is passed `&ops.database_url` for easy composition.
136async fn retry_connect_errors<'a, F, Fut, T>(
137    opts: &'a ConnectOpts,
138    mut connect: F,
139) -> anyhow::Result<T>
140where
141    F: FnMut(&'a str) -> Fut,
142    Fut: Future<Output = sqlx::Result<T>> + 'a,
143{
144    sqlx::any::install_default_drivers();
145
146    let db_url = opts.required_db_url()?;
147
148    backoff::future::retry(
149        backoff::ExponentialBackoffBuilder::new()
150            .with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
151            .build(),
152        || {
153            connect(db_url).map_err(|e| -> backoff::Error<anyhow::Error> {
154                if let sqlx::Error::Io(ref ioe) = e {
155                    match ioe.kind() {
156                        io::ErrorKind::ConnectionRefused
157                        | io::ErrorKind::ConnectionReset
158                        | io::ErrorKind::ConnectionAborted => {
159                            return backoff::Error::transient(e.into());
160                        }
161                        _ => (),
162                    }
163                }
164
165                backoff::Error::permanent(e.into())
166            })
167        },
168    )
169    .await
170}