1use crate::config::Config;
2use crate::traits::r#async::{AsyncQuery, AsyncTransaction};
3use crate::traits::sync::{Query, Transaction};
4use crate::Migration;
5#[cfg(any(
6 feature = "mysql",
7 feature = "postgres",
8 feature = "rusqlite",
9 feature = "tokio-postgres",
10 feature = "mysql_async",
11 feature = "tiberius-config"
12))]
13use crate::{
14 config::ConfigDbType,
15 error::WrapMigrationError,
16 traits::{GET_APPLIED_MIGRATIONS_QUERY, GET_LAST_APPLIED_MIGRATION_QUERY},
17 Error, Report, Target,
18};
19use async_trait::async_trait;
20use std::convert::Infallible;
21
22impl Transaction for Config {
24 type Error = Infallible;
25
26 fn execute<'a, T: Iterator<Item = &'a str>>(
27 &mut self,
28 _queries: T,
29 ) -> Result<usize, Self::Error> {
30 Ok(0)
31 }
32}
33
34impl Query<Vec<Migration>> for Config {
35 fn query(&mut self, _query: &str) -> Result<Vec<Migration>, Self::Error> {
36 Ok(Vec::new())
37 }
38}
39
40#[async_trait]
41impl AsyncTransaction for Config {
42 type Error = Infallible;
43
44 async fn execute<'a, T: Iterator<Item = &'a str> + Send>(
45 &mut self,
46 _queries: T,
47 ) -> Result<usize, Self::Error> {
48 Ok(0)
49 }
50}
51
52#[async_trait]
53impl AsyncQuery<Vec<Migration>> for Config {
54 async fn query(
55 &mut self,
56 _query: &str,
57 ) -> Result<Vec<Migration>, <Self as AsyncTransaction>::Error> {
58 Ok(Vec::new())
59 }
60}
61#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
63#[allow(clippy::redundant_closure_call)]
64macro_rules! with_connection {
65 ($config:ident, $op: expr) => {
66 #[allow(clippy::redundant_closure_call)]
67 match $config.db_type() {
68 ConfigDbType::Mysql => {
69 cfg_if::cfg_if! {
70 if #[cfg(feature = "mysql")] {
71 let url = crate::config::build_db_url("mysql", &$config);
72 let opts = mysql::Opts::from_url(&url).migration_err("could not parse url", None)?;
73 let conn = mysql::Conn::new(opts).migration_err("could not connect to database", None)?;
74 $op(conn)
75 } else {
76 panic!("tried to migrate from config for a mysql database, but feature mysql not enabled!");
77 }
78 }
79 }
80 ConfigDbType::Sqlite => {
81 cfg_if::cfg_if! {
82 if #[cfg(feature = "rusqlite")] {
83 let path = $config.db_path().map(|p| p.to_path_buf()).unwrap_or_default();
85 let conn = rusqlite::Connection::open_with_flags(path, rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE).migration_err("could not open database", None)?;
86 $op(conn)
87 } else {
88 panic!("tried to migrate from config for a sqlite database, but feature rusqlite not enabled!");
89 }
90 }
91 }
92 ConfigDbType::Postgres => {
93 cfg_if::cfg_if! {
94 if #[cfg(feature = "postgres")] {
95 let path = crate::config::build_db_url("postgresql", &$config);
96
97 let conn;
98 if $config.use_tls() {
99 let connector = native_tls::TlsConnector::new().unwrap();
100 let connector = postgres_native_tls::MakeTlsConnector::new(connector);
101 conn = postgres::Client::connect(path.as_str(), connector).migration_err("could not connect to database", None)?;
102 } else {
103 conn = postgres::Client::connect(path.as_str(), postgres::NoTls).migration_err("could not connect to database", None)?;
104 }
105
106 $op(conn)
107 } else {
108 panic!("tried to migrate from config for a postgresql database, but feature postgres not enabled!");
109 }
110 }
111 }
112 ConfigDbType::Mssql => {
113 panic!("tried to synchronously migrate from config for a mssql database, but tiberius is an async driver");
114 }
115 }
116 }
117}
118
119#[cfg(any(
120 feature = "tokio-postgres",
121 feature = "mysql_async",
122 feature = "tiberius-config"
123))]
124macro_rules! with_connection_async {
125 ($config: ident, $op: expr) => {
126 #[allow(clippy::redundant_closure_call)]
127 match $config.db_type() {
128 ConfigDbType::Mysql => {
129 cfg_if::cfg_if! {
130 if #[cfg(feature = "mysql_async")] {
131 let url = crate::config::build_db_url("mysql", $config);
132 let pool = mysql_async::Pool::from_url(&url).migration_err("could not connect to the database", None)?;
133 $op(pool).await
134 } else {
135 panic!("tried to migrate async from config for a mysql database, but feature mysql_async not enabled!");
136 }
137 }
138 }
139 ConfigDbType::Sqlite => {
140 panic!("tried to migrate async from config for a sqlite database, but this feature is not implemented yet");
141 }
142 ConfigDbType::Postgres => {
143 cfg_if::cfg_if! {
144 if #[cfg(feature = "tokio-postgres")] {
145 let path = crate::config::build_db_url("postgresql", $config);
146 if $config.use_tls() {
147 let connector = native_tls::TlsConnector::new().unwrap();
148 let connector = postgres_native_tls::MakeTlsConnector::new(connector);
149 let (client, connection) = tokio_postgres::connect(path.as_str(), connector).await.migration_err("could not connect to database", None)?;
150 tokio::spawn(async move {
151 if let Err(e) = connection.await {
152 eprintln!("connection error: {}", e);
153 }
154 });
155 $op(client).await
156 } else {
157 let (client, connection) = tokio_postgres::connect(path.as_str(), tokio_postgres::NoTls).await.migration_err("could not connect to database", None)?;
158 tokio::spawn(async move {
159 if let Err(e) = connection.await {
160 eprintln!("connection error: {}", e);
161 }
162 });
163 $op(client).await
164 }
165 } else {
166 panic!("tried to migrate async from config for a postgresql database, but tokio-postgres was not enabled!");
167 }
168 }
169 }
170 ConfigDbType::Mssql => {
171 cfg_if::cfg_if! {
172 if #[cfg(feature = "tiberius-config")] {
173 use tiberius::{Client, Config};
174 use tokio::net::TcpStream;
175 use tokio_util::compat::TokioAsyncWriteCompatExt;
176 use std::convert::TryInto;
177
178 let config: Config = (&*$config).try_into()?;
179 let tcp = TcpStream::connect(config.get_addr())
180 .await
181 .migration_err("could not connect to database", None)?;
182 let client = Client::connect(config, tcp.compat_write())
183 .await
184 .migration_err("could not connect to database", None)?;
185
186 $op(client).await
187 } else {
188 panic!("tried to migrate async from config for a mssql database, but tiberius-config feature was not enabled!");
189 }
190 }
191 }
192 }
193 }
194}
195
196#[cfg(any(feature = "mysql", feature = "postgres", feature = "rusqlite"))]
198impl crate::Migrate for Config {
199 fn get_last_applied_migration(
200 &mut self,
201 migration_table_name: &str,
202 ) -> Result<Option<Migration>, Error> {
203 with_connection!(self, |mut conn| {
204 let mut migrations: Vec<Migration> = Query::query(
205 &mut conn,
206 &GET_LAST_APPLIED_MIGRATION_QUERY
207 .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
208 )
209 .migration_err("error getting last applied migration", None)?;
210
211 Ok(migrations.pop())
212 })
213 }
214
215 fn get_applied_migrations(
216 &mut self,
217 migration_table_name: &str,
218 ) -> Result<Vec<Migration>, Error> {
219 with_connection!(self, |mut conn| {
220 let migrations: Vec<Migration> = Query::query(
221 &mut conn,
222 &GET_APPLIED_MIGRATIONS_QUERY
223 .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
224 )
225 .migration_err("error getting applied migrations", None)?;
226
227 Ok(migrations)
228 })
229 }
230
231 fn migrate(
232 &mut self,
233 migrations: &[Migration],
234 abort_divergent: bool,
235 abort_missing: bool,
236 grouped: bool,
237 target: Target,
238 migration_table_name: &str,
239 ) -> Result<Report, Error> {
240 with_connection!(self, |mut conn| {
241 crate::Migrate::migrate(
242 &mut conn,
243 migrations,
244 abort_divergent,
245 abort_missing,
246 grouped,
247 target,
248 migration_table_name,
249 )
250 })
251 }
252}
253
254#[cfg(any(
255 feature = "mysql_async",
256 feature = "tokio-postgres",
257 feature = "tiberius-config"
258))]
259#[async_trait]
260impl crate::AsyncMigrate for Config {
261 async fn get_last_applied_migration(
262 &mut self,
263 migration_table_name: &str,
264 ) -> Result<Option<Migration>, Error> {
265 with_connection_async!(self, move |mut conn| async move {
266 let mut migrations: Vec<Migration> = AsyncQuery::query(
267 &mut conn,
268 &GET_LAST_APPLIED_MIGRATION_QUERY
269 .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
270 )
271 .await
272 .migration_err("error getting last applied migration", None)?;
273
274 Ok(migrations.pop())
275 })
276 }
277
278 async fn get_applied_migrations(
279 &mut self,
280 migration_table_name: &str,
281 ) -> Result<Vec<Migration>, Error> {
282 with_connection_async!(self, move |mut conn| async move {
283 let migrations: Vec<Migration> = AsyncQuery::query(
284 &mut conn,
285 &GET_APPLIED_MIGRATIONS_QUERY
286 .replace("%MIGRATION_TABLE_NAME%", migration_table_name),
287 )
288 .await
289 .migration_err("error getting last applied migration", None)?;
290 Ok(migrations)
291 })
292 }
293
294 async fn migrate(
295 &mut self,
296 migrations: &[Migration],
297 abort_divergent: bool,
298 abort_missing: bool,
299 grouped: bool,
300 target: Target,
301 migration_table_name: &str,
302 ) -> Result<Report, Error> {
303 with_connection_async!(self, move |mut conn| async move {
304 crate::AsyncMigrate::migrate(
305 &mut conn,
306 migrations,
307 abort_divergent,
308 abort_missing,
309 grouped,
310 target,
311 migration_table_name,
312 )
313 .await
314 })
315 }
316}