diesel_async/
migrations.rs

1use diesel::migration::{Migration, MigrationVersion, Result};
2
3use crate::async_connection_wrapper::AsyncConnectionWrapper;
4use crate::AsyncConnection;
5
6/// A diesel-migration [`MigrationHarness`](diesel_migrations::MigrationHarness) to run migrations
7/// via an [`AsyncConnection`](crate::AsyncConnection)
8///
9/// Internally this harness is using [`tokio::task::block_in_place`] and [`AsyncConnectionWrapper`]
10/// to utilize sync Diesel's migration infrastructure. For most applications this shouldn't
11/// be problematic as migrations are usually run at application startup and most applications
12/// default to use the multithreaded tokio runtime. In turn this also means that you cannot use
13/// this migration harness if you use the current thread variant of the tokio runtime or if
14/// you run migrations in a very special setup (e.g by using [`tokio::select!`] or [`tokio::join!`]
15/// on a future produced by running the migrations). Consider manually construct a blocking task via
16/// [`tokio::task::spawn_blocking`] instead.
17///
18/// ## Example
19///
20/// ```no_run
21/// # include!("doctest_setup.rs");
22/// # async fn run_test() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>{
23/// use diesel_async::AsyncMigrationHarness;
24/// use diesel_migrations::{FileBasedMigrations, MigrationHarness};
25///
26/// let mut connection = connection_no_data().await;
27///
28/// // Alternativly use `diesel_migrations::embed_migrations!()`
29/// // to get a list of migrations
30/// let migrations = FileBasedMigrations::find_migrations_directory()?;
31///
32/// let mut harness = AsyncMigrationHarness::new(connection);
33/// harness.run_pending_migrations(migrations)?;
34/// // get back the connection from the harness
35/// let connection = harness.into_inner();
36/// #      Ok(())
37/// # }
38/// # #[tokio::main]
39/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
40/// #     run_test().await?;
41/// #     Ok(())
42/// # }
43/// ```
44///
45/// ## Example with pool
46///
47/// ```no_run
48/// # include!("doctest_setup.rs");
49/// # #[cfg(feature = "deadpool")]
50/// # use diesel_async::pooled_connection::AsyncDieselConnectionManager;
51/// #
52/// # #[cfg(all(feature = "postgres", feature = "deadpool"))]
53/// # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncPgConnection> {
54/// #     let db_url = database_url_from_env("PG_DATABASE_URL");
55/// let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(db_url);
56/// #     config
57/// #  }
58/// #
59/// # #[cfg(all(feature = "mysql", feature = "deadpool"))]
60/// # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncMysqlConnection> {
61/// #     let db_url = database_url_from_env("MYSQL_DATABASE_URL");
62/// #    let config = AsyncDieselConnectionManager::<diesel_async::AsyncMysqlConnection>::new(db_url);
63/// #     config
64/// #  }
65/// #
66/// # #[cfg(all(feature = "sqlite", feature = "deadpool"))]
67/// # fn get_config() -> AsyncDieselConnectionManager<diesel_async::sync_connection_wrapper::SyncConnectionWrapper<diesel::SqliteConnection>> {
68/// #     let db_url = database_url_from_env("SQLITE_DATABASE_URL");
69/// #     let config = AsyncDieselConnectionManager::<diesel_async::sync_connection_wrapper::SyncConnectionWrapper<diesel::SqliteConnection>>::new(db_url);
70/// #     config
71/// # }
72/// # #[cfg(feature = "deadpool")]
73/// # async fn run_test() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>{
74/// use diesel_async::pooled_connection::deadpool::Pool;
75/// use diesel_async::AsyncMigrationHarness;
76/// use diesel_migrations::{FileBasedMigrations, MigrationHarness};
77///
78/// // Alternativly use `diesel_migrations::embed_migrations!()`
79/// // to get a list of migrations
80/// let migrations = FileBasedMigrations::find_migrations_directory()?;
81///
82/// let pool = Pool::builder(get_config()).build()?;
83/// let mut harness = AsyncMigrationHarness::new(pool.get().await?);
84/// harness.run_pending_migrations(migrations)?;
85/// #      Ok(())
86/// # }
87///
88/// # #[cfg(not(feature = "deadpool"))]
89/// # async fn run_test() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
90/// # {
91/// #   Ok(())
92/// # }
93/// #
94/// # #[tokio::main]
95/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
96/// #     run_test().await?;
97/// #     Ok(())
98/// # }
99/// ```
100pub struct AsyncMigrationHarness<C> {
101    conn: AsyncConnectionWrapper<C, crate::async_connection_wrapper::implementation::Tokio>,
102}
103
104impl<C> AsyncMigrationHarness<C>
105where
106    C: AsyncConnection,
107{
108    /// Construct a new `AsyncMigrationHarness` from a given connection
109    pub fn new(connection: C) -> Self {
110        Self {
111            conn: AsyncConnectionWrapper::from(connection),
112        }
113    }
114
115    /// Return the connection stored inside this instance of `AsyncMigrationHarness`
116    pub fn into_inner(self) -> C {
117        self.conn.into_inner()
118    }
119}
120
121impl<C> From<C> for AsyncMigrationHarness<C>
122where
123    C: AsyncConnection,
124{
125    fn from(value: C) -> Self {
126        AsyncMigrationHarness::new(value)
127    }
128}
129
130impl<C> diesel_migrations::MigrationHarness<C::Backend> for AsyncMigrationHarness<C>
131where
132    C: AsyncConnection,
133    AsyncConnectionWrapper<C, crate::async_connection_wrapper::implementation::Tokio>:
134        diesel::Connection<Backend = C::Backend> + diesel_migrations::MigrationHarness<C::Backend>,
135{
136    fn run_migration(
137        &mut self,
138        migration: &dyn Migration<C::Backend>,
139    ) -> Result<MigrationVersion<'static>> {
140        tokio::task::block_in_place(|| {
141            diesel_migrations::MigrationHarness::run_migration(&mut self.conn, migration)
142        })
143    }
144
145    fn revert_migration(
146        &mut self,
147        migration: &dyn Migration<C::Backend>,
148    ) -> Result<MigrationVersion<'static>> {
149        tokio::task::block_in_place(|| {
150            diesel_migrations::MigrationHarness::revert_migration(&mut self.conn, migration)
151        })
152    }
153
154    fn applied_migrations(&mut self) -> Result<Vec<MigrationVersion<'static>>> {
155        tokio::task::block_in_place(|| {
156            diesel_migrations::MigrationHarness::applied_migrations(&mut self.conn)
157        })
158    }
159}