refinery_core/drivers/
config.rs

1#[cfg(any(
2    feature = "mysql",
3    feature = "postgres",
4    feature = "tokio-postgres",
5    feature = "mysql_async"
6))]
7use crate::config::build_db_url;
8use crate::config::{Config, ConfigDbType};
9use crate::error::WrapMigrationError;
10use crate::traits::r#async::{AsyncQuery, AsyncTransaction};
11use crate::traits::sync::{Query, Transaction};
12use crate::traits::{GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY};
13use crate::{Error, Migration, Report, Target};
14use async_trait::async_trait;
15use std::convert::Infallible;
16
17// we impl all the dependent traits as noop's and then override the methods that call them on Migrate and AsyncMigrate
18impl Transaction for Config {
19    type Error = Infallible;
20
21    fn execute<'a, T: Iterator<Item = &'a str>>(
22        &mut self,
23        _queries: T,
24    ) -> Result<usize, Self::Error> {
25        Ok(0)
26    }
27}
28
29impl Query<Vec<Migration>> for Config {
30    fn query(&mut self, _query: &str) -> Result<Vec<Migration>, Self::Error> {
31        Ok(Vec::new())
32    }
33}
34
35#[async_trait]
36impl AsyncTransaction for Config {
37    type Error = Infallible;
38
39    async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
40        &mut self,
41        _queries: T,
42    ) -> Result<usize, Self::Error> {
43        Ok(0)
44    }
45}
46
47#[async_trait]
48impl AsyncQuery<Vec<Migration>> for Config {
49    async fn query(
50        &mut self,
51        _query: &str,
52    ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
53        Ok(Vec::new())
54    }
55}
// Opens a synchronous connection for the database type reported by
// `$config.db_type()` and calls `$op` with it. Written as a macro so that the
// differing connection types never have to be spelled out in a signature.
//
// Connection failures are returned with `?`; selecting a database whose driver
// feature is not compiled in (or Mssql, which has no sync driver) panics.
#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
#[allow(clippy::redundant_closure_call)]
macro_rules! with_connection {
    ($config:ident, $op:expr) => {
        #[allow(clippy::redundant_closure_call)]
        match $config.db_type() {
            ConfigDbType::Mysql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "mysql")] {
                        let db_url = build_db_url("mysql", &$config);
                        let mysql_opts = mysql::Opts::from_url(&db_url)
                            .migration_err("could not parse url", None)?;
                        let connection = mysql::Conn::new(mysql_opts)
                            .migration_err("could not connect to database", None)?;
                        $op(connection)
                    } else {
                        panic!("tried to migrate from config for a mysql database, but feature mysql not enabled!");
                    }
                }
            }
            ConfigDbType::Sqlite => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "rusqlite")] {
                        // The path may already have been validated while the config was parsed.
                        // If it is absent, a default (empty) path is used and opening the
                        // database below is left to report the failure.
                        let db_file = $config
                            .db_path()
                            .map(|path| path.to_path_buf())
                            .unwrap_or_default();
                        let connection = rusqlite::Connection::open_with_flags(
                            db_file,
                            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE,
                        )
                        .migration_err("could not open database", None)?;
                        $op(connection)
                    } else {
                        panic!("tried to migrate from config for a sqlite database, but feature rusqlite not enabled!");
                    }
                }
            }
            ConfigDbType::Postgres => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "postgres")] {
                        let db_url = build_db_url("postgresql", &$config);
                        let client = postgres::Client::connect(db_url.as_str(), postgres::NoTls)
                            .migration_err("could not connect to database", None)?;
                        $op(client)
                    } else {
                        panic!("tried to migrate from config for a postgresql database, but feature postgres not enabled!");
                    }
                }
            }
            ConfigDbType::Mssql => {
                panic!("tried to synchronously migrate from config for a mssql database, but tiberius is an async driver");
            }
        }
    }
}
104
// Async counterpart of `with_connection!`: opens an async connection (or pool)
// for the database type reported by `$config.db_type()`, calls `$op` with it
// and awaits the result.
//
// Connection failures are returned with `?`; selecting a database whose driver
// feature is not compiled in (or Sqlite, which is not implemented here) panics.
#[cfg(any(
    feature = "tokio-postgres",
    feature = "mysql_async",
    feature = "tiberius-config"
))]
macro_rules! with_connection_async {
    ($config:ident, $op:expr) => {
        #[allow(clippy::redundant_closure_call)]
        match $config.db_type() {
            ConfigDbType::Mysql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "mysql_async")] {
                        let db_url = build_db_url("mysql", $config);
                        let connection_pool = mysql_async::Pool::from_url(&db_url)
                            .migration_err("could not connect to the database", None)?;
                        $op(connection_pool).await
                    } else {
                        panic!("tried to migrate async from config for a mysql database, but feature mysql_async not enabled!");
                    }
                }
            }
            ConfigDbType::Sqlite => {
                panic!("tried to migrate async from config for a sqlite database, but this feature is not implemented yet");
            }
            ConfigDbType::Postgres => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "tokio-postgres")] {
                        let db_url = build_db_url("postgresql", $config);
                        let (client, driver) =
                            tokio_postgres::connect(db_url.as_str(), tokio_postgres::NoTls)
                                .await
                                .migration_err("could not connect to database", None)?;
                        // Await the connection half on its own task and report any
                        // error it yields on stderr.
                        tokio::spawn(async move {
                            if let Err(err) = driver.await {
                                eprintln!("connection error: {}", err);
                            }
                        });
                        $op(client).await
                    } else {
                        panic!("tried to migrate async from config for a postgresql database, but tokio-postgres was not enabled!");
                    }
                }
            }
            ConfigDbType::Mssql => {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "tiberius-config")] {
                        use std::convert::TryInto;
                        use tiberius::{Client, Config};
                        use tokio::net::TcpStream;
                        use tokio_util::compat::TokioAsyncWriteCompatExt;

                        // Convert the refinery config into a tiberius one, then connect
                        // over a TCP stream wrapped by `compat_write`.
                        let mssql_config: Config = (&*$config).try_into()?;
                        let stream = TcpStream::connect(mssql_config.get_addr())
                            .await
                            .migration_err("could not connect to database", None)?;
                        let client = Client::connect(mssql_config, stream.compat_write())
                            .await
                            .migration_err("could not connect to database", None)?;

                        $op(client).await
                    } else {
                        panic!("tried to migrate async from config for a mssql database, but tiberius-config feature was not enabled!");
                    }
                }
            }
        }
    }
}
169
// `Transaction` and `Query` are no-ops for `Config`, so every `Migrate` method
// is overridden here: each one opens a connection through `with_connection!`
// and performs the operation on that connection instead.
#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
impl crate::Migrate for Config {
    // Runs the "last applied migration" query against a new connection and
    // returns the final row of the result, if there is one.
    fn get_last_applied_migration(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Option<Migration>, Error> {
        with_connection!(self, |mut connection| {
            let sql = GET_LAST_APPLIED_MIGRATION_QUERY
                .replace("%MIGRATION_TABLE_NAME%", migration_table_name);
            let mut applied: Vec<Migration> = Query::query(&mut connection, &sql)
                .migration_err("error getting last applied migration", None)?;

            Ok(applied.pop())
        })
    }

    // Runs the "applied migrations" query against a new connection and returns
    // every row it yields.
    fn get_applied_migrations(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Vec<Migration>, Error> {
        with_connection!(self, |mut connection| {
            let sql = GET_APPLIED_MIGRATIONS_QUERY
                .replace("%MIGRATION_TABLE_NAME%", migration_table_name);
            let applied: Vec<Migration> = Query::query(&mut connection, &sql)
                .migration_err("error getting applied migrations", None)?;

            Ok(applied)
        })
    }

    // Opens a connection and forwards all arguments unchanged to that
    // connection's own `Migrate::migrate`.
    fn migrate(
        &mut self,
        migrations: &[Migration],
        abort_divergent: bool,
        abort_missing: bool,
        grouped: bool,
        target: Target,
        migration_table_name: &str,
    ) -> Result<Report, Error> {
        with_connection!(self, |mut connection| {
            crate::Migrate::migrate(
                &mut connection,
                migrations,
                abort_divergent,
                abort_missing,
                grouped,
                target,
                migration_table_name,
            )
        })
    }
}
227
// `AsyncTransaction` and `AsyncQuery` are no-ops for `Config`, so every
// `AsyncMigrate` method is overridden here: each one opens a connection through
// `with_connection_async!` and performs the operation on that connection.
#[cfg(any(
    feature = "mysql_async",
    feature = "tokio-postgres",
    feature = "tiberius-config"
))]
#[async_trait]
impl crate::AsyncMigrate for Config {
    // Runs the "last applied migration" query against a new connection and
    // returns the final row of the result, if there is one.
    async fn get_last_applied_migration(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Option<Migration>, Error> {
        with_connection_async!(self, move |mut conn| async move {
            let mut migrations: Vec<Migration> = AsyncQuery::query(
                &mut conn,
                &GET_LAST_APPLIED_MIGRATION_QUERY
                    .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
            )
            .await
            .migration_err("error getting last applied migration", None)?;

            Ok(migrations.pop())
        })
    }

    // Runs the "applied migrations" query against a new connection and returns
    // every row it yields.
    async fn get_applied_migrations(
        &mut self,
        migration_table_name: &str,
    ) -> Result<Vec<Migration>, Error> {
        with_connection_async!(self, move |mut conn| async move {
            let migrations: Vec<Migration> = AsyncQuery::query(
                &mut conn,
                &GET_APPLIED_MIGRATIONS_QUERY
                    .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
            )
            .await
            // The message names this operation (it previously repeated the
            // "last applied migration" text) and matches the sync `Migrate` impl.
            .migration_err("error getting applied migrations", None)?;
            Ok(migrations)
        })
    }

    // Opens a connection and forwards all arguments unchanged to that
    // connection's own `AsyncMigrate::migrate`.
    async fn migrate(
        &mut self,
        migrations: &[Migration],
        abort_divergent: bool,
        abort_missing: bool,
        grouped: bool,
        target: Target,
        migration_table_name: &str,
    ) -> Result<Report, Error> {
        with_connection_async!(self, move |mut conn| async move {
            crate::AsyncMigrate::migrate(
                &mut conn,
                migrations,
                abort_divergent,
                abort_missing,
                grouped,
                target,
                migration_table_name,
            )
            .await
        })
    }
}