// rustorm_migrate/runner.rs
use crate::history;
use crate::parser;
use rustorm_core::error::{OrmError, OrmResult};
use sqlx::PgPool;
use std::path::{Path, PathBuf};
use std::time::Instant;

/// A single migration: its identity plus the SQL for both directions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationFile {
    /// Version identifier. Migrations are sorted and de-duplicated by this
    /// string and matched against the versions recorded as applied.
    pub version: String,
    /// Human-readable name; combined with `version` as `{version}_{name}` in
    /// progress output and in the labels returned by the runner.
    pub name: String,
    /// SQL script executed when the migration is applied.
    pub up_sql: String,
    /// SQL script executed when the migration is rolled back. A blank value
    /// means the migration cannot be rolled back.
    pub down_sql: String,
    /// Path associated with this migration.
    /// NOTE(review): presumably the file it was parsed from; it is not read
    /// anywhere in this module, so confirm against the parser.
    pub path: PathBuf,
}

/// One row of the status report: a known migration and its state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStatus {
    /// Version identifier of the migration.
    pub version: String,
    /// Human-readable name of the migration.
    pub name: String,
    /// `true` when this version is among the versions recorded as applied.
    pub applied: bool,
    /// `true` when no checksum problem is reported for this migration.
    pub checksum_ok: bool,
}

27pub struct MigrationRunner {
29 migrations_dir: Option<PathBuf>,
30 migrations: Vec<MigrationFile>,
31}
32
33impl MigrationRunner {
34 pub fn new() -> Self {
35 Self {
36 migrations_dir: None,
37 migrations: vec![],
38 }
39 }
40
41 pub fn add_directory(mut self, dir: impl Into<PathBuf>) -> Self {
43 self.migrations_dir = Some(dir.into());
44 self
45 }
46
47 pub async fn migrate(&self, pool: &PgPool) -> OrmResult<Vec<String>> {
49 history::ensure_history_table(pool).await?;
50
51 let files = self.load_files()?;
52 let applied = history::applied_versions(pool).await?;
53 let mut executed = vec![];
54
55 for file in &files {
56 if applied.contains(&file.version) {
58 history::verify_checksum(pool, &file.version, &file.up_sql).await?;
59 continue;
60 }
61
62 eprintln!(" → Применяю: {}_{}", file.version, file.name);
64 let start = Instant::now();
65
66 for stmt in split_statements(&file.up_sql) {
68 if stmt.trim().is_empty() {
69 continue;
70 }
71 sqlx::query(&stmt).execute(pool).await.map_err(|e| {
72 OrmError::Migration(format!("Ошибка в миграции {}: {}", file.version, e))
73 })?;
74 }
75
76 let duration_ms = start.elapsed().as_millis() as i32;
77 history::record_migration(pool, &file.version, &file.name, &file.up_sql, duration_ms)
78 .await?;
79
80 executed.push(format!("{}_{}", file.version, file.name));
81 }
82
83 Ok(executed)
84 }
85
86 pub async fn rollback(&self, pool: &PgPool, steps: usize) -> OrmResult<Vec<String>> {
88 history::ensure_history_table(pool).await?;
89
90 let files = self.load_files()?;
91 let applied = history::applied_versions(pool).await?;
92 let mut rolled_back = vec![];
93
94 let to_rollback: Vec<&MigrationFile> = files
96 .iter()
97 .filter(|f| applied.contains(&f.version))
98 .rev()
99 .take(steps)
100 .collect();
101
102 for file in to_rollback {
103 if file.down_sql.is_empty() {
104 return Err(OrmError::Migration(format!(
105 "Миграция {} не имеет DOWN секции, откат невозможен",
106 file.version
107 )));
108 }
109
110 eprintln!(" ← Откатываю: {}_{}", file.version, file.name);
111
112 for stmt in split_statements(&file.down_sql) {
113 if stmt.trim().is_empty() {
114 continue;
115 }
116 sqlx::query(&stmt).execute(pool).await.map_err(|e| {
117 OrmError::Migration(format!("Ошибка при откате {}: {}", file.version, e))
118 })?;
119 }
120
121 history::remove_migration_record(pool, &file.version).await?;
122 rolled_back.push(format!("{}_{}", file.version, file.name));
123 }
124
125 Ok(rolled_back)
126 }
127
128 pub async fn status(&self, pool: &PgPool) -> OrmResult<Vec<MigrationStatus>> {
130 history::ensure_history_table(pool).await?;
131 let files = self.load_files()?;
132 let applied = history::applied_versions(pool).await?;
133
134 let mut statuses = vec![];
135 for file in &files {
136 let is_applied = applied.contains(&file.version);
137 statuses.push(MigrationStatus {
138 version: file.version.clone(),
139 name: file.name.clone(),
140 applied: is_applied,
141 checksum_ok: true, });
143 }
144 Ok(statuses)
145 }
146
147 fn load_files(&self) -> OrmResult<Vec<MigrationFile>> {
148 let mut all = self.migrations.clone();
149
150 if let Some(ref dir) = self.migrations_dir {
151 let from_dir = parser::parse_migration_dir(dir).map_err(OrmError::Migration)?;
152 all.extend(from_dir);
153 }
154
155 all.sort_by(|a, b| a.version.cmp(&b.version));
157 all.dedup_by(|a, b| a.version == b.version);
158 Ok(all)
159 }
160}
161
162impl Default for MigrationRunner {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
/// Splits an SQL script into individual statements.
///
/// Statements are separated by `;`, except where the semicolon appears inside
/// a single-quoted string (including `E'...'` escape strings), a double-quoted
/// identifier, a `--` line comment, a (possibly nested) `/* ... */` block
/// comment, or a dollar-quoted string such as `$$ ... $$` / `$tag$ ... $tag$`.
///
/// Each returned statement is trimmed and blank statements are dropped.
/// Comments are left in place as part of the statement they belong to.
/// An unterminated quote or comment extends to the end of the script.
fn split_statements(sql: &str) -> Vec<String> {
    // All delimiters are ASCII, and ASCII bytes never occur inside a UTF-8
    // multi-byte sequence, so scanning bytes and slicing at `;` is safe.
    let bytes = sql.as_bytes();
    let mut pieces: Vec<&str> = Vec::new();
    let mut start = 0; // byte offset where the current statement begins
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b';' => {
                pieces.push(&sql[start..i]);
                i += 1;
                start = i;
            }
            b'\'' => i = skip_quoted(bytes, i, b'\'', is_escape_string(bytes, i)),
            b'"' => i = skip_quoted(bytes, i, b'"', false),
            b'-' if bytes.get(i + 1) == Some(&b'-') => i = skip_line_comment(bytes, i),
            b'/' if bytes.get(i + 1) == Some(&b'*') => i = skip_block_comment(bytes, i),
            b'$' => i = skip_dollar_quoted(sql, i),
            _ => i += 1,
        }
    }
    pieces.push(&sql[start..]);

    pieces
        .into_iter()
        .map(str::trim)
        .filter(|piece| !piece.is_empty())
        .map(str::to_string)
        .collect()
}

/// Returns `true` for bytes that can be part of an SQL identifier or tag
/// (ASCII letters, digits, `_`, and any byte of a non-ASCII character).
fn is_ident_byte(byte: u8) -> bool {
    byte.is_ascii_alphanumeric() || byte == b'_' || byte >= 0x80
}

/// Returns `true` when the quote at `quote_pos` opens an `E'...'` escape
/// string, i.e. it is preceded by a standalone `E`/`e`.
fn is_escape_string(bytes: &[u8], quote_pos: usize) -> bool {
    if quote_pos == 0 || !matches!(bytes[quote_pos - 1], b'E' | b'e') {
        return false;
    }
    // The `E` must not be the tail of a longer word such as `DATE'...'`.
    quote_pos == 1 || !is_ident_byte(bytes[quote_pos - 2])
}

/// Returns the offset just past the quoted region opened at `open`.
/// A doubled quote character is an escaped quote; a backslash escapes the
/// next byte only when `backslash_escapes` is set.
fn skip_quoted(bytes: &[u8], open: usize, quote: u8, backslash_escapes: bool) -> usize {
    let mut i = open + 1;
    while i < bytes.len() {
        let byte = bytes[i];
        if backslash_escapes && byte == b'\\' {
            i += 2;
        } else if byte == quote {
            if bytes.get(i + 1) == Some(&quote) {
                i += 2;
            } else {
                return i + 1;
            }
        } else {
            i += 1;
        }
    }
    bytes.len()
}

/// Returns the offset just past the newline ending the `--` comment at `start`.
fn skip_line_comment(bytes: &[u8], start: usize) -> usize {
    bytes[start..]
        .iter()
        .position(|&byte| byte == b'\n')
        .map_or(bytes.len(), |offset| start + offset + 1)
}

/// Returns the offset just past the block comment opened at `start`,
/// honouring nested `/* ... */` pairs.
fn skip_block_comment(bytes: &[u8], start: usize) -> usize {
    let mut depth = 0usize;
    let mut i = start;
    while i < bytes.len() {
        match (bytes[i], bytes.get(i + 1).copied()) {
            (b'/', Some(b'*')) => {
                depth += 1;
                i += 2;
            }
            (b'*', Some(b'/')) => {
                depth -= 1;
                i += 2;
                if depth == 0 {
                    return i;
                }
            }
            _ => i += 1,
        }
    }
    bytes.len()
}

/// Returns the offset just past the dollar-quoted string opened at `start`,
/// or `start + 1` when the `$` does not open one (e.g. a `$1` parameter).
fn skip_dollar_quoted(sql: &str, start: usize) -> usize {
    let bytes = sql.as_bytes();

    // A `$` glued to an identifier (`foo$bar`) belongs to that identifier.
    if start > 0 && is_ident_byte(bytes[start - 1]) {
        return start + 1;
    }

    let tag_len = bytes[start + 1..]
        .iter()
        .take_while(|&&byte| is_ident_byte(byte))
        .count();
    let tag_end = start + 1 + tag_len; // where the opening tag's closing `$` must be
    let starts_with_digit = bytes
        .get(start + 1)
        .map_or(false, |byte| byte.is_ascii_digit());
    if starts_with_digit || bytes.get(tag_end) != Some(&b'$') {
        return start + 1;
    }

    let delimiter = &sql[start..=tag_end];
    let body_start = tag_end + 1;
    sql[body_start..]
        .find(delimiter)
        .map_or(sql.len(), |offset| body_start + offset + delimiter.len())
}