1use crate::error::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MigrationInfo {
14 pub version: i64,
16 pub description: String,
18 pub installed_on: DateTime<Utc>,
20 pub execution_time: i64,
22 pub success: bool,
24 pub checksum: Option<String>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct MigrationStatus {
31 pub current_version: i64,
33 pub total_migrations: usize,
35 pub pending_count: usize,
37 pub is_up_to_date: bool,
39 pub last_migration: Option<MigrationInfo>,
41}
42
43pub async fn get_migration_status(pool: &PgPool) -> Result<MigrationStatus> {
48 let table_exists = sqlx::query_scalar::<_, bool>(
50 r#"
51 SELECT EXISTS (
52 SELECT FROM information_schema.tables
53 WHERE table_schema = 'public'
54 AND table_name = '_sqlx_migrations'
55 )
56 "#,
57 )
58 .fetch_one(pool)
59 .await?;
60
61 if !table_exists {
62 return Ok(MigrationStatus {
63 current_version: 0,
64 total_migrations: 0,
65 pending_count: 0,
66 is_up_to_date: false,
67 last_migration: None,
68 });
69 }
70
71 let migrations = sqlx::query_as::<_, (i64, String, DateTime<Utc>, i64, bool)>(
73 r#"
74 SELECT version, description, installed_on, execution_time, success
75 FROM _sqlx_migrations
76 ORDER BY version DESC
77 "#,
78 )
79 .fetch_all(pool)
80 .await?;
81
82 let total_migrations = migrations.len();
83 let current_version = migrations.first().map(|m| m.0).unwrap_or(0);
84
85 let last_migration = migrations.first().map(|m| MigrationInfo {
86 version: m.0,
87 description: m.1.clone(),
88 installed_on: m.2,
89 execution_time: m.3,
90 success: m.4,
91 checksum: None,
92 });
93
94 Ok(MigrationStatus {
95 current_version,
96 total_migrations,
97 pending_count: 0, is_up_to_date: true,
99 last_migration,
100 })
101}
102
103pub async fn list_applied_migrations(pool: &PgPool) -> Result<Vec<MigrationInfo>> {
105 let migrations = sqlx::query_as::<_, (i64, String, DateTime<Utc>, i64, bool)>(
106 r#"
107 SELECT version, description, installed_on, execution_time, success
108 FROM _sqlx_migrations
109 ORDER BY version ASC
110 "#,
111 )
112 .fetch_all(pool)
113 .await?;
114
115 Ok(migrations
116 .into_iter()
117 .map(|m| MigrationInfo {
118 version: m.0,
119 description: m.1,
120 installed_on: m.2,
121 execution_time: m.3,
122 success: m.4,
123 checksum: None,
124 })
125 .collect())
126}
127
128pub async fn is_migration_applied(pool: &PgPool, version: i64) -> Result<bool> {
130 let exists = sqlx::query_scalar::<_, bool>(
131 r#"
132 SELECT EXISTS (
133 SELECT 1 FROM _sqlx_migrations
134 WHERE version = $1 AND success = true
135 )
136 "#,
137 )
138 .bind(version)
139 .fetch_one(pool)
140 .await?;
141
142 Ok(exists)
143}
144
145pub async fn verify_schema_integrity(pool: &PgPool) -> Result<SchemaIntegrity> {
147 let required_tables = vec![
148 "users",
149 "tokens",
150 "balances",
151 "orders",
152 "trades",
153 "reputation_events",
154 "output_commitments",
155 "admin_actions",
156 ];
157
158 let mut missing_tables = Vec::new();
159 let mut existing_tables = Vec::new();
160
161 for table in required_tables {
162 let exists = sqlx::query_scalar::<_, bool>(
163 r#"
164 SELECT EXISTS (
165 SELECT FROM information_schema.tables
166 WHERE table_schema = 'public'
167 AND table_name = $1
168 )
169 "#,
170 )
171 .bind(table)
172 .fetch_one(pool)
173 .await?;
174
175 if exists {
176 existing_tables.push(table.to_string());
177 } else {
178 missing_tables.push(table.to_string());
179 }
180 }
181
182 Ok(SchemaIntegrity {
183 is_valid: missing_tables.is_empty(),
184 existing_tables,
185 missing_tables,
186 })
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct SchemaIntegrity {
192 pub is_valid: bool,
194 pub existing_tables: Vec<String>,
196 pub missing_tables: Vec<String>,
198}
199
200pub async fn get_schema_version(pool: &PgPool) -> Result<Option<String>> {
204 let table_exists = sqlx::query_scalar::<_, bool>(
206 r#"
207 SELECT EXISTS (
208 SELECT FROM information_schema.tables
209 WHERE table_schema = 'public'
210 AND table_name = 'schema_version'
211 )
212 "#,
213 )
214 .fetch_one(pool)
215 .await?;
216
217 if !table_exists {
218 return Ok(None);
219 }
220
221 let version = sqlx::query_scalar::<_, String>(
222 r#"
223 SELECT version FROM schema_version
224 ORDER BY installed_at DESC
225 LIMIT 1
226 "#,
227 )
228 .fetch_optional(pool)
229 .await?;
230
231 Ok(version)
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn test_migration_info_structure() {
240 let info = MigrationInfo {
241 version: 1,
242 description: "initial".to_string(),
243 installed_on: Utc::now(),
244 execution_time: 100,
245 success: true,
246 checksum: Some("abc123".to_string()),
247 };
248
249 assert_eq!(info.version, 1);
250 assert!(info.success);
251 }
252
253 #[test]
254 fn test_migration_status_structure() {
255 let status = MigrationStatus {
256 current_version: 5,
257 total_migrations: 5,
258 pending_count: 0,
259 is_up_to_date: true,
260 last_migration: None,
261 };
262
263 assert_eq!(status.current_version, 5);
264 assert!(status.is_up_to_date);
265 }
266
267 #[test]
268 fn test_schema_integrity_valid() {
269 let integrity = SchemaIntegrity {
270 is_valid: true,
271 existing_tables: vec!["users".to_string(), "tokens".to_string()],
272 missing_tables: vec![],
273 };
274
275 assert!(integrity.is_valid);
276 assert!(integrity.missing_tables.is_empty());
277 }
278
279 #[test]
280 fn test_schema_integrity_invalid() {
281 let integrity = SchemaIntegrity {
282 is_valid: false,
283 existing_tables: vec!["users".to_string()],
284 missing_tables: vec!["tokens".to_string()],
285 };
286
287 assert!(!integrity.is_valid);
288 assert_eq!(integrity.missing_tables.len(), 1);
289 }
290
291 #[test]
292 fn test_migration_info_serialization() {
293 let info = MigrationInfo {
294 version: 1,
295 description: "test".to_string(),
296 installed_on: Utc::now(),
297 execution_time: 50,
298 success: true,
299 checksum: None,
300 };
301
302 let json = serde_json::to_string(&info).unwrap();
303 let deserialized: MigrationInfo = serde_json::from_str(&json).unwrap();
304
305 assert_eq!(deserialized.version, info.version);
306 assert_eq!(deserialized.description, info.description);
307 }
308}