springtime_migrate_refinery/
runner.rs1use crate::config::MigrationConfigProvider;
4use crate::migration::MigrationSource;
5use crate::refinery::Runner;
6use itertools::Itertools;
7use springtime::future::{BoxFuture, FutureExt};
8use springtime::runner::ApplicationRunner;
9use springtime_di::instance_provider::{ComponentInstancePtr, ErrorPtr};
10use springtime_di::{component_alias, injectable, Component};
11use tracing::{debug, info};
12
13#[injectable]
18pub trait MigrationRunnerExecutor {
19 fn run_migrations<'a>(&'a self, runner: &'a Runner) -> BoxFuture<'a, Result<(), ErrorPtr>>;
21}
22
23#[derive(Component)]
24struct MigrationRunner {
25 config_provider: ComponentInstancePtr<dyn MigrationConfigProvider + Send + Sync>,
26 migration_sources: Vec<ComponentInstancePtr<dyn MigrationSource + Send + Sync>>,
27 executors: Vec<ComponentInstancePtr<dyn MigrationRunnerExecutor + Send + Sync>>,
28}
29
30#[component_alias]
31impl ApplicationRunner for MigrationRunner {
32 fn run(&self) -> BoxFuture<'_, Result<(), ErrorPtr>> {
33 async {
34 let config = self.config_provider.config().await?;
35 if !config.run_migrations_on_start {
36 debug!("Migrations disabled.");
37 return Ok(());
38 }
39
40 if self.migration_sources.is_empty() {
41 info!("Not running any migrations, since no sources are available.");
42 return Ok(());
43 }
44
45 let migrations: Vec<_> = self
46 .migration_sources
47 .iter()
48 .map(|source| source.migrations())
49 .flatten_ok()
50 .try_collect()?;
51
52 info!(
53 "Running {} migrations by {} executors...",
54 migrations.len(),
55 self.executors.len()
56 );
57
58 let mut runner = Runner::new(&migrations)
59 .set_target(config.target.into())
60 .set_grouped(config.grouped)
61 .set_abort_divergent(config.abort_divergent)
62 .set_abort_missing(config.abort_missing);
63 runner.set_migration_table_name(&config.migration_table_name);
64
65 for executor in &self.executors {
66 executor.run_migrations(&runner).await?;
67 }
68
69 debug!("Done running migrations.");
70
71 Ok(())
72 }
73 .boxed()
74 }
75
76 fn priority(&self) -> i8 {
77 100
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use crate::config::{MigrationConfig, MigrationConfigProvider};
84 use crate::migration::MockMigrationSource;
85 use crate::runner::{MigrationRunner, MigrationRunnerExecutor};
86 use mockall::automock;
87 use refinery_core::{Migration, Runner};
88 use springtime::future::{BoxFuture, FutureExt};
89 use springtime::runner::ApplicationRunner;
90 use springtime_di::instance_provider::{ComponentInstancePtr, ErrorPtr};
91
92 #[automock]
93 pub trait TestMigrationRunnerExecutor {
94 fn run_migrations(&self, runner: &Runner) -> BoxFuture<'_, Result<(), ErrorPtr>>;
95 }
96
97 struct MockMigrationRunnerExecutor {
98 inner: MockTestMigrationRunnerExecutor,
99 }
100
101 impl MockMigrationRunnerExecutor {
102 fn new() -> Self {
103 Self {
104 inner: MockTestMigrationRunnerExecutor::new(),
105 }
106 }
107 }
108
109 impl MigrationRunnerExecutor for MockMigrationRunnerExecutor {
110 fn run_migrations<'a>(&'a self, runner: &'a Runner) -> BoxFuture<'a, Result<(), ErrorPtr>> {
111 self.inner.run_migrations(runner)
112 }
113 }
114
115 #[derive(Default)]
116 struct TestMigrationConfigProvider {
117 config: MigrationConfig,
118 }
119
120 impl MigrationConfigProvider for TestMigrationConfigProvider {
121 fn config(&self) -> BoxFuture<'_, Result<&MigrationConfig, ErrorPtr>> {
122 async { Ok(&self.config) }.boxed()
123 }
124 }
125
126 #[tokio::test]
127 async fn should_execute_migrations() {
128 let mut migration_source = MockMigrationSource::new();
129 migration_source
130 .expect_migrations()
131 .times(1)
132 .return_const(Ok(vec![Migration::unapplied("V00__test", "test").unwrap()]));
133
134 let mut executor = MockMigrationRunnerExecutor::new();
135 executor
136 .inner
137 .expect_run_migrations()
138 .times(1)
139 .returning(|_| async { Ok(()) }.boxed());
140
141 let runner = MigrationRunner {
142 config_provider: ComponentInstancePtr::new(TestMigrationConfigProvider::default()),
143 migration_sources: vec![ComponentInstancePtr::new(migration_source)],
144 executors: vec![ComponentInstancePtr::new(executor)],
145 };
146 runner.run().await.unwrap();
147 }
148}