elif_orm/migrations/
manager.rs1use chrono::{DateTime, Utc};
6use sqlparser::dialect::GenericDialect;
7use sqlparser::parser::Parser;
8use std::fs;
9use std::path::Path;
10
11use super::definitions::{Migration, MigrationConfig};
12use crate::error::{OrmError, OrmResult};
13
14pub struct MigrationManager {
16 config: MigrationConfig,
17}
18
19impl MigrationManager {
20 pub fn new() -> Self {
22 Self::with_config(MigrationConfig::default())
23 }
24
25 pub fn with_config(config: MigrationConfig) -> Self {
27 Self { config }
28 }
29
30 pub fn config(&self) -> &MigrationConfig {
32 &self.config
33 }
34
35 pub async fn create_migration(&self, name: &str) -> OrmResult<String> {
37 fs::create_dir_all(&self.config.migrations_dir).map_err(|e| {
39 OrmError::Migration(format!("Failed to create migrations directory: {}", e))
40 })?;
41
42 let timestamp = Utc::now().format("%Y%m%d_%H%M%S").to_string();
44 let migration_id = format!("{}_{}", timestamp, name.replace(' ', "_").to_lowercase());
45 let filename = format!("{}.sql", migration_id);
46 let filepath = self.config.migrations_dir.join(&filename);
47
48 let template = self.create_migration_template(name, &migration_id);
50
51 fs::write(&filepath, template)
52 .map_err(|e| OrmError::Migration(format!("Failed to write migration file: {}", e)))?;
53
54 Ok(filename)
55 }
56
57 pub async fn load_migrations(&self) -> OrmResult<Vec<Migration>> {
59 if !self.config.migrations_dir.exists() {
60 return Ok(Vec::new());
61 }
62
63 let mut migrations = Vec::new();
64 let entries = fs::read_dir(&self.config.migrations_dir).map_err(|e| {
65 OrmError::Migration(format!("Failed to read migrations directory: {}", e))
66 })?;
67
68 for entry in entries {
69 let entry = entry.map_err(|e| {
70 OrmError::Migration(format!("Failed to read directory entry: {}", e))
71 })?;
72
73 let path = entry.path();
74 if path.extension().is_some_and(|ext| ext == "sql") {
75 let migration = self.parse_migration_file(&path).await?;
76 migrations.push(migration);
77 }
78 }
79
80 migrations.sort_by(|a, b| a.id.cmp(&b.id));
82 Ok(migrations)
83 }
84
85 async fn parse_migration_file(&self, path: &Path) -> OrmResult<Migration> {
87 let content = fs::read_to_string(path)
88 .map_err(|e| OrmError::Migration(format!("Failed to read migration file: {}", e)))?;
89
90 let filename = path
91 .file_stem()
92 .and_then(|s| s.to_str())
93 .ok_or_else(|| OrmError::Migration("Invalid migration filename".to_string()))?;
94
95 let parts: Vec<&str> = filename.split('_').collect();
97 if parts.len() < 2 {
98 return Err(OrmError::Migration(
99 "Migration filename must follow format: timestamp_name".to_string(),
100 ));
101 }
102
103 let id = filename.to_string();
104 let name = if parts.len() >= 3 && parts[0].len() == 8 && parts[1].len() == 6 {
105 parts[2..].join("_").replace('_', " ")
107 } else {
108 parts[1..].join("_").replace('_', " ")
110 };
111
112 let (up_sql, down_sql) = self.parse_migration_content(&content)?;
114
115 let created_at = self
117 .parse_migration_timestamp(parts[0])
118 .unwrap_or_else(|_| Utc::now());
119
120 Ok(Migration {
121 id,
122 name,
123 up_sql,
124 down_sql,
125 created_at,
126 })
127 }
128
129 fn parse_migration_content(&self, content: &str) -> OrmResult<(String, String)> {
131 let lines: Vec<&str> = content.lines().collect();
132 let mut up_sql = Vec::new();
133 let mut down_sql = Vec::new();
134 let mut current_section = "";
135
136 for line in lines {
137 let trimmed = line.trim().to_lowercase();
138
139 if trimmed.starts_with("-- up") || trimmed.contains("up migration") {
140 current_section = "up";
141 continue;
142 } else if trimmed.starts_with("-- down") || trimmed.contains("down migration") {
143 current_section = "down";
144 continue;
145 }
146
147 if line.trim().is_empty() || line.trim().starts_with("--") {
149 continue;
150 }
151
152 match current_section {
153 "up" => up_sql.push(line),
154 "down" => down_sql.push(line),
155 _ => {} }
157 }
158
159 Ok((
160 up_sql.join("\n").trim().to_string(),
161 down_sql.join("\n").trim().to_string(),
162 ))
163 }
164
165 fn parse_migration_timestamp(
167 &self,
168 timestamp_str: &str,
169 ) -> Result<DateTime<Utc>, chrono::ParseError> {
170 let formatted = format!("{}000000", ×tamp_str[..8]); let naive = chrono::NaiveDateTime::parse_from_str(&formatted, "%Y%m%d%H%M%S")?;
172 Ok(DateTime::from_naive_utc_and_offset(naive, Utc))
173 }
174
175 fn create_migration_template(&self, name: &str, migration_id: &str) -> String {
177 format!(
178 "-- Migration: {}\n\
179 -- ID: {}\n\
180 -- Created: {}\n\n\
181 -- Up migration\n\
182 -- Add your schema changes here\n\n\n\
183 -- Down migration \n\
184 -- Add rollback statements here\n\n",
185 name,
186 migration_id,
187 Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
188 )
189 }
190
191 pub fn split_sql_statements(&self, sql: &str) -> OrmResult<Vec<String>> {
193 let dialect = GenericDialect {};
194 let mut statements = Vec::new();
195
196 match Parser::parse_sql(&dialect, sql) {
198 Ok(parsed_statements) => {
199 for stmt in parsed_statements {
200 statements.push(format!("{};", stmt));
201 }
202 Ok(statements)
203 }
204 Err(e) => {
205 tracing::warn!("SQL parsing failed, using naive semicolon splitting: {}", e);
207 let naive_statements = sql
208 .split(';')
209 .map(|s| s.trim())
210 .filter(|s| !s.is_empty())
211 .map(|s| format!("{};", s))
212 .collect();
213 Ok(naive_statements)
214 }
215 }
216 }
217
218 pub fn create_migrations_table_sql(&self) -> String {
220 format!(
221 "CREATE TABLE IF NOT EXISTS {} (\n \
222 id VARCHAR(255) PRIMARY KEY,\n \
223 applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n \
224 batch INTEGER NOT NULL\n\
225 );",
226 self.config.migrations_table
227 )
228 }
229
230 pub fn check_migration_sql(&self, migration_id: &str) -> (String, Vec<String>) {
232 (
233 format!(
234 "SELECT id FROM {} WHERE id = $1",
235 self.config.migrations_table
236 ),
237 vec![migration_id.to_string()],
238 )
239 }
240
241 pub fn record_migration_sql(&self, migration_id: &str, batch: i32) -> (String, Vec<String>) {
243 (
244 format!(
245 "INSERT INTO {} (id, applied_at, batch) VALUES ($1, $2::timestamp, $3::integer)",
246 self.config.migrations_table
247 ),
248 vec![
249 migration_id.to_string(),
250 Utc::now().to_rfc3339(),
251 batch.to_string(),
252 ],
253 )
254 }
255
256 pub fn remove_migration_sql(&self, migration_id: &str) -> (String, Vec<String>) {
258 (
259 format!("DELETE FROM {} WHERE id = $1", self.config.migrations_table),
260 vec![migration_id.to_string()],
261 )
262 }
263
264 pub fn get_latest_batch_sql(&self) -> String {
266 format!(
267 "SELECT COALESCE(MAX(batch), 0) FROM {}",
268 self.config.migrations_table
269 )
270 }
271
272 pub fn get_applied_migrations_sql(&self) -> String {
274 format!(
275 "SELECT id, applied_at, batch FROM {} ORDER BY batch DESC, applied_at DESC",
276 self.config.migrations_table
277 )
278 }
279}
280
281impl Default for MigrationManager {
282 fn default() -> Self {
283 Self::new()
284 }
285}