kaccy_db/
migration_utils.rs

1//! Migration utilities for tracking and managing database schema versions
2//!
3//! This module provides utilities for checking migration status, validating
4//! schema integrity, and managing database migrations programmatically.
5
6use crate::error::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10
11/// Information about a database migration
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MigrationInfo {
14    /// Migration version number
15    pub version: i64,
16    /// Migration description/name
17    pub description: String,
18    /// When the migration was installed
19    pub installed_on: DateTime<Utc>,
20    /// Execution time in milliseconds
21    pub execution_time: i64,
22    /// Whether the migration was successful
23    pub success: bool,
24    /// Checksum of the migration file
25    pub checksum: Option<String>,
26}
27
28/// Summary of migration status
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct MigrationStatus {
31    /// Current schema version
32    pub current_version: i64,
33    /// Total number of applied migrations
34    pub total_migrations: usize,
35    /// Pending migrations (if any)
36    pub pending_count: usize,
37    /// Whether all migrations are up to date
38    pub is_up_to_date: bool,
39    /// Last migration applied
40    pub last_migration: Option<MigrationInfo>,
41}
42
43/// Get the current migration status
44///
45/// Returns information about applied migrations and current schema version.
46/// This assumes sqlx's _sqlx_migrations table exists.
47pub async fn get_migration_status(pool: &PgPool) -> Result<MigrationStatus> {
48    // Check if migrations table exists
49    let table_exists = sqlx::query_scalar::<_, bool>(
50        r#"
51        SELECT EXISTS (
52            SELECT FROM information_schema.tables
53            WHERE table_schema = 'public'
54            AND table_name = '_sqlx_migrations'
55        )
56        "#,
57    )
58    .fetch_one(pool)
59    .await?;
60
61    if !table_exists {
62        return Ok(MigrationStatus {
63            current_version: 0,
64            total_migrations: 0,
65            pending_count: 0,
66            is_up_to_date: false,
67            last_migration: None,
68        });
69    }
70
71    // Get migration information
72    let migrations = sqlx::query_as::<_, (i64, String, DateTime<Utc>, i64, bool)>(
73        r#"
74        SELECT version, description, installed_on, execution_time, success
75        FROM _sqlx_migrations
76        ORDER BY version DESC
77        "#,
78    )
79    .fetch_all(pool)
80    .await?;
81
82    let total_migrations = migrations.len();
83    let current_version = migrations.first().map(|m| m.0).unwrap_or(0);
84
85    let last_migration = migrations.first().map(|m| MigrationInfo {
86        version: m.0,
87        description: m.1.clone(),
88        installed_on: m.2,
89        execution_time: m.3,
90        success: m.4,
91        checksum: None,
92    });
93
94    Ok(MigrationStatus {
95        current_version,
96        total_migrations,
97        pending_count: 0, // Would need to compare with migration files
98        is_up_to_date: true,
99        last_migration,
100    })
101}
102
103/// List all applied migrations
104pub async fn list_applied_migrations(pool: &PgPool) -> Result<Vec<MigrationInfo>> {
105    let migrations = sqlx::query_as::<_, (i64, String, DateTime<Utc>, i64, bool)>(
106        r#"
107        SELECT version, description, installed_on, execution_time, success
108        FROM _sqlx_migrations
109        ORDER BY version ASC
110        "#,
111    )
112    .fetch_all(pool)
113    .await?;
114
115    Ok(migrations
116        .into_iter()
117        .map(|m| MigrationInfo {
118            version: m.0,
119            description: m.1,
120            installed_on: m.2,
121            execution_time: m.3,
122            success: m.4,
123            checksum: None,
124        })
125        .collect())
126}
127
128/// Check if a specific migration version has been applied
129pub async fn is_migration_applied(pool: &PgPool, version: i64) -> Result<bool> {
130    let exists = sqlx::query_scalar::<_, bool>(
131        r#"
132        SELECT EXISTS (
133            SELECT 1 FROM _sqlx_migrations
134            WHERE version = $1 AND success = true
135        )
136        "#,
137    )
138    .bind(version)
139    .fetch_one(pool)
140    .await?;
141
142    Ok(exists)
143}
144
145/// Verify schema integrity by checking for required tables
146pub async fn verify_schema_integrity(pool: &PgPool) -> Result<SchemaIntegrity> {
147    let required_tables = vec![
148        "users",
149        "tokens",
150        "balances",
151        "orders",
152        "trades",
153        "reputation_events",
154        "output_commitments",
155        "admin_actions",
156    ];
157
158    let mut missing_tables = Vec::new();
159    let mut existing_tables = Vec::new();
160
161    for table in required_tables {
162        let exists = sqlx::query_scalar::<_, bool>(
163            r#"
164            SELECT EXISTS (
165                SELECT FROM information_schema.tables
166                WHERE table_schema = 'public'
167                AND table_name = $1
168            )
169            "#,
170        )
171        .bind(table)
172        .fetch_one(pool)
173        .await?;
174
175        if exists {
176            existing_tables.push(table.to_string());
177        } else {
178            missing_tables.push(table.to_string());
179        }
180    }
181
182    Ok(SchemaIntegrity {
183        is_valid: missing_tables.is_empty(),
184        existing_tables,
185        missing_tables,
186    })
187}
188
189/// Schema integrity check result
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct SchemaIntegrity {
192    /// Whether all required tables exist
193    pub is_valid: bool,
194    /// List of existing required tables
195    pub existing_tables: Vec<String>,
196    /// List of missing required tables
197    pub missing_tables: Vec<String>,
198}
199
200/// Get database schema version from a custom version table
201///
202/// This is useful if you maintain a separate version table for semantic versioning
203pub async fn get_schema_version(pool: &PgPool) -> Result<Option<String>> {
204    // Check if custom schema_version table exists
205    let table_exists = sqlx::query_scalar::<_, bool>(
206        r#"
207        SELECT EXISTS (
208            SELECT FROM information_schema.tables
209            WHERE table_schema = 'public'
210            AND table_name = 'schema_version'
211        )
212        "#,
213    )
214    .fetch_one(pool)
215    .await?;
216
217    if !table_exists {
218        return Ok(None);
219    }
220
221    let version = sqlx::query_scalar::<_, String>(
222        r#"
223        SELECT version FROM schema_version
224        ORDER BY installed_at DESC
225        LIMIT 1
226        "#,
227    )
228    .fetch_optional(pool)
229    .await?;
230
231    Ok(version)
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful version-1 `MigrationInfo` installed "now".
    fn sample_info(
        description: &str,
        execution_time: i64,
        checksum: Option<&str>,
    ) -> MigrationInfo {
        MigrationInfo {
            version: 1,
            description: description.to_string(),
            installed_on: Utc::now(),
            execution_time,
            success: true,
            checksum: checksum.map(str::to_string),
        }
    }

    /// Builds a `SchemaIntegrity` from borrowed table names.
    fn sample_integrity(is_valid: bool, existing: &[&str], missing: &[&str]) -> SchemaIntegrity {
        SchemaIntegrity {
            is_valid,
            existing_tables: existing.iter().map(|t| t.to_string()).collect(),
            missing_tables: missing.iter().map(|t| t.to_string()).collect(),
        }
    }

    // Fields of a hand-built MigrationInfo are readable as set.
    #[test]
    fn test_migration_info_structure() {
        let info = sample_info("initial", 100, Some("abc123"));

        assert_eq!(info.version, 1);
        assert!(info.success);
    }

    // Fields of a hand-built MigrationStatus are readable as set.
    #[test]
    fn test_migration_status_structure() {
        let status = MigrationStatus {
            current_version: 5,
            total_migrations: 5,
            pending_count: 0,
            is_up_to_date: true,
            last_migration: None,
        };

        assert_eq!(status.current_version, 5);
        assert!(status.is_up_to_date);
    }

    // A valid result carries no missing tables.
    #[test]
    fn test_schema_integrity_valid() {
        let integrity = sample_integrity(true, &["users", "tokens"], &[]);

        assert!(integrity.is_valid);
        assert!(integrity.missing_tables.is_empty());
    }

    // An invalid result lists the missing table.
    #[test]
    fn test_schema_integrity_invalid() {
        let integrity = sample_integrity(false, &["users"], &["tokens"]);

        assert!(!integrity.is_valid);
        assert_eq!(integrity.missing_tables.len(), 1);
    }

    // MigrationInfo survives a JSON round trip.
    #[test]
    fn test_migration_info_serialization() {
        let original = sample_info("test", 50, None);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: MigrationInfo = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.version, original.version);
        assert_eq!(decoded.description, original.description);
    }
}