1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
use crate::writer::Writer;
use color_eyre::owo_colors::OwoColorize;
use eyre::Context;
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};
use sqlx::Row;
use std::path::Path;
/// A handle to a directory of database migration files.
///
/// The struct only borrows its inputs for the lifetime `'a`; it does not own the
/// directory path or the output writer. Methods on this type create new migration
/// files in the directory and apply the ones ending in `.up.sql` to a database.
pub struct Migrations<'a> {
/// Directory where migration files are stored
path: &'a Path,
/// Global output writer used to report progress to the user
writer: &'a Writer,
}
/// Methods for managing database migrations
impl<'a> Migrations<'a> {
/// Builds a `Migrations` handle for the migrations directory at `path`.
///
/// `writer` is the output sink used later to report progress.
///
/// # Errors
///
/// Fails when the existence of `path` cannot be determined, or when `path`
/// does not exist in the filesystem.
pub fn new(path: &'a Path, writer: &'a Writer) -> eyre::Result<Self> {
    if path.try_exists()? {
        Ok(Self { path, writer })
    } else {
        eyre::bail!("Migrations directory does not exist: {}", path.display())
    }
}
/// Applies all pending migrations to the database at `connection_string`.
///
/// The latest applied migration id is read from the `schema_migrations` table
/// (`MAX(id)`); every `.up.sql` file whose name sorts after that id is validated
/// and then executed in ascending order. After each file succeeds, its name is
/// inserted into `schema_migrations`.
///
/// The migration SQL and the bookkeeping insert are issued as separate statements
/// without an explicit transaction, because DSQL does not support mixing DDL and DML
/// statements within a transaction.
///
/// # Errors
///
/// Returns an error when the connection fails, when the latest applied id cannot be
/// read or decoded, when validation rejects a migration, or when executing or
/// recording a migration fails. Migrations applied before the failure stay applied.
pub async fn apply(&self, connection_string: String) -> eyre::Result<()> {
    let connection = sqlx::PgPool::connect(&connection_string)
        .await
        .wrap_err("Failed to connect to database")?;

    // Get latest applied migration
    let row = sqlx::query("SELECT MAX(id) FROM schema_migrations")
        .fetch_one(&connection)
        .await
        .wrap_err("Failed to read the latest applied migration")?;
    // `MAX(id)` is NULL on an empty table, which maps to `None` and falls back to "0".
    // A decode failure is propagated instead of being treated as "nothing applied",
    // which would otherwise re-run every migration.
    let last_db_id = row
        .try_get::<Option<String>, _>(0)
        .wrap_err("Failed to decode the latest applied migration id")?
        .unwrap_or_else(|| "0".to_string());

    let migrations = self.migrations(&last_db_id).await?;
    if migrations.is_empty() {
        self.writer.text(
            &console::style("No migrations to apply...")
                .yellow()
                .to_string(),
        )?;
        return Ok(());
    }

    self.validate(&migrations).await?;

    for (filename, content) in migrations {
        sqlx::raw_sql(&content)
            .execute(&connection)
            .await
            .inspect_err(|e| log::error!("Error: {e:?}"))
            .wrap_err_with(|| format!("Failed to apply migration: {filename}"))?;

        // The migration has already run at this point; name the file so a failed
        // bookkeeping insert can be repaired by hand.
        sqlx::query(r#"INSERT INTO "schema_migrations" (id) VALUES ($1)"#)
            .bind(&filename)
            .execute(&connection)
            .await
            .wrap_err_with(|| {
                format!("Migration {filename} was applied but could not be recorded")
            })?;

        self.writer.text(&format!(
            "{} {}\n",
            console::style("✓").green(),
            console::style(&filename).dimmed()
        ))?;
    }

    Ok(())
}
/// Creates a new, empty migration file in the migrations directory.
///
/// The file is named `<timestamp>[_<name>].up.sql`, where `<timestamp>` is the current
/// UTC time formatted as `%Y%m%d%H%M%S`. In `name`, spaces are replaced by underscores,
/// every character that is neither alphanumeric nor `_` is dropped, and the result is
/// truncated to 100 characters. When nothing is left, the name part is omitted.
///
/// # Errors
///
/// Returns an error when the file cannot be created (including when a file with the
/// same name already exists) or when writing to the output writer fails.
pub async fn create(&self, name: Option<&str>) -> eyre::Result<()> {
    let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S").to_string();

    // Allow only alphanumeric characters and underscores
    let name: String = name
        .unwrap_or_default()
        .chars()
        .map(|c| if c == ' ' { '_' } else { c })
        .filter(|c| c.is_alphanumeric() || *c == '_')
        .take(100)
        .collect();

    // Generate a unique filename based on the current timestamp and optional migration name
    let filename = if name.is_empty() {
        timestamp
    } else {
        format!("{timestamp}_{name}")
    };
    let filepath = self.path.join(format!("{filename}.up.sql"));

    // TODO Add some helpful comments to the migration file
    // `create_new` fails rather than truncating: two calls within the same second and
    // with the same name must never wipe a migration that was already written.
    tokio::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(&filepath)
        .await
        .wrap_err("Failed to create a migration file")?;

    self.writer.text(&format!(
        "{} {} {}\n",
        console::style("Created migration").green().bold(),
        console::style("at").dim(),
        console::style(filepath.to_string_lossy())
            .underlined()
            .bold(),
    ))?;

    Ok(())
}
/// Retrieves the pending migration files and their contents, sorted by name (ASC).
///
/// `last_applied_id` is the identifier of the last applied migration. Only regular
/// files whose name ends with `.up.sql` and compares greater than this identifier
/// (in lexicographical order) are included. Entries whose name is not valid UTF-8
/// are skipped with a warning.
///
/// Returns a vector of tuples. Each tuple contains:
/// - `String`: The name of the migration file
/// - `String`: The content of the migration file
///
/// # Errors
///
/// Returns an error when the directory cannot be listed or a pending file cannot be read.
async fn migrations(&self, last_applied_id: &str) -> eyre::Result<Vec<(String, String)>> {
    let mut read_dir = tokio::fs::read_dir(self.path)
        .await
        .wrap_err("Failed to read migrations dir")?;

    let mut pending = Vec::new();

    // Collect all valid migration files that have not been applied yet
    while let Some(entry) = read_dir.next_entry().await? {
        let path = entry.path();

        // `tokio::fs::metadata` follows symlinks and is treated as "not a file" on
        // error, matching `Path::is_file` without blocking the async executor.
        let is_file = tokio::fs::metadata(&path)
            .await
            .map(|meta| meta.is_file())
            .unwrap_or(false);
        if !is_file {
            continue;
        }

        let Some(filename) = path
            .file_name()
            .and_then(|n| n.to_str())
            .map(str::to_owned)
        else {
            log::warn!("Invalid filename: {:?}. Skipping...", path);
            continue;
        };

        if filename.ends_with(".up.sql") && filename.as_str() > last_applied_id {
            pending.push((filename, path));
        }
    }

    // Sort migrations by name in ASC (the oldest first) order. File names within one
    // directory are unique, so an unstable sort yields the same order.
    pending.sort_unstable_by(|(name1, _), (name2, _)| name1.cmp(name2));

    let mut result = Vec::with_capacity(pending.len());
    for (filename, path) in pending {
        let content = tokio::fs::read_to_string(&path)
            .await
            .wrap_err_with(|| format!("Failed to read file: {}", path.display()))?;
        result.push((filename, content));
    }

    Ok(result)
}
/// Validates the SQL statements of the given migrations.
///
/// `migrations` holds `(file name, file content)` pairs. Each file is parsed with the
/// PostgreSQL dialect and its statements are split into two groups: the DDL statements
/// recognised here (`CREATE TABLE`, `ALTER TABLE`, `DROP`, `CREATE INDEX`,
/// `CREATE VIEW`) and everything else.
///
/// # Errors
///
/// Returns an error if a file cannot be parsed, or if a single file contains
/// statements from both groups (DDL and DML mixed in the same file).
async fn validate(&self, migrations: &[(String, String)]) -> eyre::Result<()> {
    for (path, content) in migrations {
        // Strip the ASYNC keyword before parsing: sqlparser doesn't support
        // CREATE ASYNC INDEX, and the parsed statements are needed only for
        // DDL/DML classification.
        // NOTE(review): the AWS Aurora DSQL documentation spells this statement
        // `CREATE INDEX ASYNC` — confirm which form migrations actually use.
        let sanitized = content.replace("CREATE ASYNC INDEX", "CREATE INDEX");

        let statements = Parser::parse_sql(&PostgreSqlDialect {}, &sanitized)
            .wrap_err_with(|| format!("Failed to parse migration SQL: {path}"))?;

        // Only the presence of each group matters, so track flags rather than
        // collecting the statements.
        let mut has_ddl = false;
        let mut has_other = false;
        for stmt in &statements {
            if matches!(
                stmt,
                Statement::CreateTable { .. }
                    | Statement::AlterTable { .. }
                    | Statement::Drop { .. }
                    | Statement::CreateIndex { .. }
                    | Statement::CreateView { .. }
            ) {
                has_ddl = true;
            } else {
                has_other = true;
            }
        }

        if has_ddl && has_other {
            eyre::bail!(
                "Migration: {path} contains both DDL and DML statements. \
                Please split them into separate migration files."
            )
        }
    }

    Ok(())
}
}