Skip to main content

rustorm_migrate/
runner.rs

1use crate::history;
2use crate::parser;
3use rustorm_core::error::{OrmError, OrmResult};
4use sqlx::PgPool;
5use std::path::{Path, PathBuf};
6use std::time::Instant;
7
8/// Файл миграции, распарсенный из .sql
9#[derive(Debug, Clone)]
10pub struct MigrationFile {
11    pub version: String,
12    pub name: String,
13    pub up_sql: String,
14    pub down_sql: String,
15    pub path: PathBuf,
16}
17
18/// Статус одной миграции.
19#[derive(Debug, Clone)]
20pub struct MigrationStatus {
21    pub version: String,
22    pub name: String,
23    pub applied: bool,
24    pub checksum_ok: bool,
25}
26
27/// Главный раннер миграций.
28pub struct MigrationRunner {
29    migrations_dir: Option<PathBuf>,
30    migrations: Vec<MigrationFile>,
31}
32
33impl MigrationRunner {
34    pub fn new() -> Self {
35        Self {
36            migrations_dir: None,
37            migrations: vec![],
38        }
39    }
40
41    /// Указать директорию с SQL-файлами миграций.
42    pub fn add_directory(mut self, dir: impl Into<PathBuf>) -> Self {
43        self.migrations_dir = Some(dir.into());
44        self
45    }
46
47    /// Применить все pending миграции.
48    pub async fn migrate(&self, pool: &PgPool) -> OrmResult<Vec<String>> {
49        history::ensure_history_table(pool).await?;
50
51        let files = self.load_files()?;
52        let applied = history::applied_versions(pool).await?;
53        let mut executed = vec![];
54
55        for file in &files {
56            // Проверить checksum если уже применена
57            if applied.contains(&file.version) {
58                history::verify_checksum(pool, &file.version, &file.up_sql).await?;
59                continue;
60            }
61
62            // Применить миграцию
63            eprintln!("  → Применяю: {}_{}", file.version, file.name);
64            let start = Instant::now();
65
66            // Разбить на отдельные SQL команды и выполнить
67            for stmt in split_statements(&file.up_sql) {
68                if stmt.trim().is_empty() {
69                    continue;
70                }
71                sqlx::query(&stmt).execute(pool).await.map_err(|e| {
72                    OrmError::Migration(format!("Ошибка в миграции {}: {}", file.version, e))
73                })?;
74            }
75
76            let duration_ms = start.elapsed().as_millis() as i32;
77            history::record_migration(pool, &file.version, &file.name, &file.up_sql, duration_ms)
78                .await?;
79
80            executed.push(format!("{}_{}", file.version, file.name));
81        }
82
83        Ok(executed)
84    }
85
86    /// Откатить N последних миграций.
87    pub async fn rollback(&self, pool: &PgPool, steps: usize) -> OrmResult<Vec<String>> {
88        history::ensure_history_table(pool).await?;
89
90        let files = self.load_files()?;
91        let applied = history::applied_versions(pool).await?;
92        let mut rolled_back = vec![];
93
94        // Применённые миграции в обратном порядке
95        let to_rollback: Vec<&MigrationFile> = files
96            .iter()
97            .filter(|f| applied.contains(&f.version))
98            .rev()
99            .take(steps)
100            .collect();
101
102        for file in to_rollback {
103            if file.down_sql.is_empty() {
104                return Err(OrmError::Migration(format!(
105                    "Миграция {} не имеет DOWN секции, откат невозможен",
106                    file.version
107                )));
108            }
109
110            eprintln!("  ← Откатываю: {}_{}", file.version, file.name);
111
112            for stmt in split_statements(&file.down_sql) {
113                if stmt.trim().is_empty() {
114                    continue;
115                }
116                sqlx::query(&stmt).execute(pool).await.map_err(|e| {
117                    OrmError::Migration(format!("Ошибка при откате {}: {}", file.version, e))
118                })?;
119            }
120
121            history::remove_migration_record(pool, &file.version).await?;
122            rolled_back.push(format!("{}_{}", file.version, file.name));
123        }
124
125        Ok(rolled_back)
126    }
127
128    /// Статус всех миграций.
129    pub async fn status(&self, pool: &PgPool) -> OrmResult<Vec<MigrationStatus>> {
130        history::ensure_history_table(pool).await?;
131        let files = self.load_files()?;
132        let applied = history::applied_versions(pool).await?;
133
134        let mut statuses = vec![];
135        for file in &files {
136            let is_applied = applied.contains(&file.version);
137            statuses.push(MigrationStatus {
138                version: file.version.clone(),
139                name: file.name.clone(),
140                applied: is_applied,
141                checksum_ok: true, // TODO: проверить checksum
142            });
143        }
144        Ok(statuses)
145    }
146
147    fn load_files(&self) -> OrmResult<Vec<MigrationFile>> {
148        let mut all = self.migrations.clone();
149
150        if let Some(ref dir) = self.migrations_dir {
151            let from_dir = parser::parse_migration_dir(dir).map_err(OrmError::Migration)?;
152            all.extend(from_dir);
153        }
154
155        // Сортируем по версии
156        all.sort_by(|a, b| a.version.cmp(&b.version));
157        all.dedup_by(|a, b| a.version == b.version);
158        Ok(all)
159    }
160}
161
162impl Default for MigrationRunner {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168/// Разбивает SQL на отдельные команды по `;`
169fn split_statements(sql: &str) -> Vec<String> {
170    sql.split(';')
171        .map(|s| s.trim().to_string())
172        .filter(|s| !s.is_empty())
173        .collect()
174}