refinery_core/drivers/
config.rs

1#[cfg(any(
2    feature = "mysql",
3    feature = "postgres",
4    feature = "tokio-postgres",
5    feature = "mysql_async"
6))]
7use crate::config::build_db_url;
8use crate::config::{Config, ConfigDbType};
9use crate::error::WrapMigrationError;
10use crate::traits::r#async::{AsyncQuery, AsyncTransaction};
11use crate::traits::sync::{Query, Transaction};
12use crate::traits::{GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY};
13use crate::{Error, Migration, Report, Target};
14use async_trait::async_trait;
15use std::convert::Infallible;
16
// We implement all the dependent traits as no-ops and then override the methods that call them on Migrate and AsyncMigrate.
18impl Transaction for Config {
19    type Error = Infallible;
20
21    fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
22        Ok(0)
23    }
24}
25
26impl Query<Vec<Migration>> for Config {
27    fn query(&mut self, _query: &str) -> Result<Vec<Migration>, Self::Error> {
28        Ok(Vec::new())
29    }
30}
31
32#[async_trait]
33impl AsyncTransaction for Config {
34    type Error = Infallible;
35
36    async fn execute(&mut self, _queries: &[&str]) -> Result<usize, Self::Error> {
37        Ok(0)
38    }
39}
40
41#[async_trait]
42impl AsyncQuery<Vec<Migration>> for Config {
43    async fn query(
44        &mut self,
45        _query: &str,
46    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
47        Ok(Vec::new())
48    }
49}
// Opens a synchronous connection for the database type named in `$config`
// and passes it to `$op`, evaluating to whatever `$op` returns.
//
// This is a macro rather than a function so that the differing connection
// types never have to appear in a type signature.
//
// Connection failures are turned into migration errors and propagated with
// `?`. Selecting a database whose driver feature is not compiled in, or
// Mssql (async only), panics.
#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
#[allow(clippy::redundant_closure_call)]
macro_rules! with_connection {
    ($config:ident, $op: expr) => {
        #[allow(clippy::redundant_closure_call)]
        match $config.db_type() {
            ConfigDbType::Mysql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "mysql")] {
                        let db_url = build_db_url("mysql", &$config);
                        let mysql_opts = mysql::Opts::from_url(&db_url)
                            .migration_err("could not parse url", None)?;
                        let connection = mysql::Conn::new(mysql_opts)
                            .migration_err("could not connect to database", None)?;
                        $op(connection)
                    } else {
                        panic!("tried to migrate from config for a mysql database, but feature mysql not enabled!");
                    }
                }
            }
            ConfigDbType::Sqlite => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "rusqlite")] {
                        // The path may already have been checked while parsing the config;
                        // if it was not, let rusqlite fail with its own "db file not found" error.
                        let db_file = $config
                            .db_path()
                            .map(|file| file.to_path_buf())
                            .unwrap_or_default();
                        let connection = rusqlite::Connection::open_with_flags(
                            db_file,
                            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE,
                        )
                        .migration_err("could not open database", None)?;
                        $op(connection)
                    } else {
                        panic!("tried to migrate from config for a sqlite database, but feature rusqlite not enabled!");
                    }
                }
            }
            ConfigDbType::Postgres => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "postgres")] {
                        let db_url = build_db_url("postgresql", &$config);
                        let connection = postgres::Client::connect(db_url.as_str(), postgres::NoTls)
                            .migration_err("could not connect to database", None)?;
                        $op(connection)
                    } else {
                        panic!("tried to migrate from config for a postgresql database, but feature postgres not enabled!");
                    }
                }
            }
            ConfigDbType::Mssql => {
                panic!("tried to synchronously migrate from config for a mssql database, but tiberius is an async driver");
            }
        }
    }
}
98
// Asynchronous counterpart of `with_connection!`: opens an async connection
// (or pool) for the database type named in `$config`, passes it to `$op` and
// awaits the future that `$op` returns.
//
// Connection failures are turned into migration errors and propagated with
// `?`. Selecting a database whose async driver feature is not compiled in,
// or Sqlite (not implemented for async), panics.
#[cfg(any(
    feature = "tokio-postgres",
    feature = "mysql_async",
    feature = "tiberius-config"
))]
macro_rules! with_connection_async {
    ($config: ident, $op: expr) => {
        #[allow(clippy::redundant_closure_call)]
        match $config.db_type() {
            ConfigDbType::Mysql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "mysql_async")] {
                        let db_url = build_db_url("mysql", $config);
                        let mysql_pool = mysql_async::Pool::from_url(&db_url)
                            .migration_err("could not connect to the database", None)?;
                        $op(mysql_pool).await
                    } else {
                        panic!("tried to migrate async from config for a mysql database, but feature mysql_async not enabled!");
                    }
                }
            }
            ConfigDbType::Sqlite => {
                panic!("tried to migrate async from config for a sqlite database, but this feature is not implemented yet");
            }
            ConfigDbType::Postgres => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "tokio-postgres")] {
                        let db_url = build_db_url("postgresql", $config);
                        let (pg_client, pg_connection) =
                            tokio_postgres::connect(db_url.as_str(), tokio_postgres::NoTls)
                                .await
                                .migration_err("could not connect to database", None)?;
                        // The connection future is driven on its own task; an error
                        // from it is only printed to stderr.
                        tokio::spawn(async move {
                            if let Err(error) = pg_connection.await {
                                eprintln!("connection error: {}", error);
                            }
                        });
                        $op(pg_client).await
                    } else {
                        panic!("tried to migrate async from config for a postgresql database, but tokio-postgres was not enabled!");
                    }
                }
            }
            ConfigDbType::Mssql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "tiberius-config")] {
                        use tiberius::{Client, Config};
                        use tokio::net::TcpStream;
                        use tokio_util::compat::TokioAsyncWriteCompatExt;
                        use std::convert::TryInto;

                        // Convert the refinery config into a tiberius one before dialing.
                        let tiberius_config: Config = (&*$config).try_into()?;
                        let tcp_stream = TcpStream::connect(tiberius_config.get_addr())
                            .await
                            .migration_err("could not connect to database", None)?;
                        let mssql_client = Client::connect(tiberius_config, tcp_stream.compat_write())
                            .await
                            .migration_err("could not connect to database", None)?;

                        $op(mssql_client).await
                    } else {
                        panic!("tried to migrate async from config for a mssql database, but tiberius-config feature was not enabled!");
                    }
                }
            }
        }
    }
}
163
164// rewrite all the default methods as we overrode Transaction and Query
/// Synchronous migrations driven directly from a [`Config`].
///
/// Each method opens a connection through `with_connection!` and forwards the
/// work to that connection, since the `Transaction`/`Query` impls on `Config`
/// itself are no-ops.
#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
impl crate::Migrate for Config {
    /// Runs `GET_LAST_APPLIED_MIGRATION_QUERY` against `migration_table_name`
    /// and returns the last row of the result, or `None` if it is empty.
    fn get_last_applied_migration(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Option<Migration>, Error> {
        with_connection!(self, |mut connection| {
            let sql = GET_LAST_APPLIED_MIGRATION_QUERY
                .replace("%MIGRATION_TABLE_NAME%", migration_table_name);
            let mut found: Vec<Migration> = Query::query(&mut connection, &sql)
                .migration_err("error getting last applied migration", None)?;

            let last = found.pop();
            Ok(last)
        })
    }

    /// Runs `GET_APPLIED_MIGRATIONS_QUERY` against `migration_table_name`
    /// and returns every migration it yields.
    fn get_applied_migrations(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Vec<Migration>, Error> {
        with_connection!(self, |mut connection| {
            let sql = GET_APPLIED_MIGRATIONS_QUERY
                .replace("%MIGRATION_TABLE_NAME%", migration_table_name);
            let applied: Vec<Migration> = Query::query(&mut connection, &sql)
                .migration_err("error getting applied migrations", None)?;

            Ok(applied)
        })
    }

    /// Opens a connection and delegates to that connection's own
    /// `Migrate::migrate`, passing every argument through unchanged.
    fn migrate(
        &mut self,
        migrations: &[Migration],
        abort_divergent: bool,
        abort_missing: bool,
        grouped: bool,
        target: Target,
        migration_table_name: &str,
    ) -> Result<Report, Error> {
        with_connection!(self, |mut connection| {
            crate::Migrate::migrate(
                &mut connection,
                migrations,
                abort_divergent,
                abort_missing,
                grouped,
                target,
                migration_table_name,
            )
        })
    }
}
221
/// Asynchronous migrations driven directly from a [`Config`].
///
/// Each method opens a connection through `with_connection_async!` and
/// forwards the work to that connection, since the
/// `AsyncTransaction`/`AsyncQuery` impls on `Config` itself are no-ops.
#[cfg(any(
    feature = "mysql_async",
    feature = "tokio-postgres",
    feature = "tiberius-config"
))]
#[async_trait]
impl crate::AsyncMigrate for Config {
    /// Runs `GET_LAST_APPLIED_MIGRATION_QUERY` against `migration_table_name`
    /// and returns the last row of the result, or `None` if it is empty.
    async fn get_last_applied_migration(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Option<Migration>, Error> {
        with_connection_async!(self, move |mut conn| async move {
            let mut migrations: Vec<Migration> = AsyncQuery::query(
                &mut conn,
                &GET_LAST_APPLIED_MIGRATION_QUERY
                    .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
            )
            .await
            .migration_err("error getting last applied migration", None)?;

            Ok(migrations.pop())
        })
    }

    /// Runs `GET_APPLIED_MIGRATIONS_QUERY` against `migration_table_name`
    /// and returns every migration it yields.
    async fn get_applied_migrations(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Vec<Migration>, Error> {
        with_connection_async!(self, move |mut conn| async move {
            let migrations: Vec<Migration> = AsyncQuery::query(
                &mut conn,
                &GET_APPLIED_MIGRATIONS_QUERY
                    .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
            )
            .await
            // Message matches the synchronous implementation; it previously
            // (incorrectly) claimed the "last applied migration" query failed.
            .migration_err("error getting applied migrations", None)?;

            Ok(migrations)
        })
    }

    /// Opens a connection and delegates to that connection's own
    /// `AsyncMigrate::migrate`, passing every argument through unchanged.
    async fn migrate(
        &mut self,
        migrations: &[Migration],
        abort_divergent: bool,
        abort_missing: bool,
        grouped: bool,
        target: Target,
        migration_table_name: &str,
    ) -> Result<Report, Error> {
        with_connection_async!(self, move |mut conn| async move {
            crate::AsyncMigrate::migrate(
                &mut conn,
                migrations,
                abort_divergent,
                abort_missing,
                grouped,
                target,
                migration_table_name,
            )
            .await
        })
    }
}