Skip to main content

systemprompt_database/lifecycle/migrations/
status.rs

//! Migration status and plan queries, plus the value types they return.
2
3use super::MigrationService;
4use std::collections::HashSet;
5use systemprompt_extension::{Extension, LoaderError, Migration};
6
/// A migration that has already been recorded as applied for an extension.
///
/// Values of this type are what `get_applied_migrations` yields and what the
/// status queries in this module hand back in their `applied` lists.
#[derive(Debug, Clone)]
pub struct AppliedMigration {
    /// Identifier of the extension the migration was recorded under.
    pub extension_id: String,
    /// Migration version; compared with `Migration::version` to decide which
    /// defined migrations are still pending.
    pub version: u32,
    /// Name recorded for the migration.
    pub name: String,
    /// Checksum stored when the migration was recorded; compared with the
    /// current `Migration::checksum()` to detect drift.
    pub checksum: String,
    /// When the migration was applied, if recorded.
    /// NOTE(review): the string format is not visible in this file — confirm.
    pub applied_at: Option<String>,
}
15
/// A defined migration whose version has not been recorded as applied.
///
/// Built by the planning/status queries in this module from an extension's
/// `Migration` definitions.
#[derive(Debug, Clone)]
pub struct PendingMigration {
    /// Identifier of the extension that defines the migration.
    pub extension_id: String,
    /// Version of the defined migration.
    pub version: u32,
    /// Name of the defined migration.
    pub name: String,
    /// SQL text of the migration, copied from `Migration::sql`.
    pub sql: &'static str,
    /// Checksum of the migration as currently defined (`Migration::checksum()`).
    pub checksum: String,
    /// NOTE(review): presumably "run outside a transaction"; every constructor
    /// in this file sets it to `false` — confirm where it is meant to be set.
    pub no_tx: bool,
}
25
/// An applied migration whose stored checksum no longer matches the checksum
/// of the migration as currently defined.
#[derive(Debug, Clone)]
pub struct ChecksumDrift {
    /// Identifier of the extension that defines the migration.
    pub extension_id: String,
    /// Version of the migration that drifted.
    pub version: u32,
    /// Name of the migration as currently defined.
    pub name: String,
    /// Checksum recorded when the migration was applied.
    pub stored_checksum: String,
    /// Checksum computed from the current definition.
    pub current_checksum: String,
}
34
35#[derive(Debug, Clone, Default)]
36pub struct ExtensionMigrationStatus {
37    pub extension_id: String,
38    pub applied: Vec<AppliedMigration>,
39    pub pending: Vec<PendingMigration>,
40    pub drift: Vec<ChecksumDrift>,
41}
42
/// Counters describing the outcome of a migration run.
///
/// Not constructed in this file; both counters default to zero.
#[derive(Debug, Default, Clone, Copy)]
pub struct MigrationResult {
    /// Number of migrations that were run.
    pub migrations_run: usize,
    /// Number of migrations that were skipped.
    pub migrations_skipped: usize,
}
48
49#[derive(Debug)]
50pub struct MigrationStatus {
51    pub extension_id: String,
52    pub total_defined: usize,
53    pub total_applied: usize,
54    pub pending_count: usize,
55    pub pending: Vec<Migration>,
56    pub applied: Vec<AppliedMigration>,
57}
58
59impl MigrationService<'_> {
60    pub async fn plan_pending(
61        &self,
62        extension: &dyn Extension,
63    ) -> Result<Vec<PendingMigration>, LoaderError> {
64        let ext_id = extension.metadata().id;
65        let defined = extension.migrations();
66
67        if defined.is_empty() {
68            return Ok(Vec::new());
69        }
70
71        self.ensure_migrations_table_exists().await?;
72        let applied_versions: HashSet<u32> = self
73            .get_applied_migrations(ext_id)
74            .await?
75            .into_iter()
76            .map(|m| m.version)
77            .collect();
78
79        Ok(defined
80            .into_iter()
81            .filter(|m| !applied_versions.contains(&m.version))
82            .map(|m| PendingMigration {
83                extension_id: ext_id.to_string(),
84                version: m.version,
85                name: m.name.clone(),
86                sql: m.sql,
87                checksum: m.checksum(),
88                no_tx: false,
89            })
90            .collect())
91    }
92
93    pub async fn status(
94        &self,
95        extension: &dyn Extension,
96    ) -> Result<ExtensionMigrationStatus, LoaderError> {
97        let ext_id = extension.metadata().id;
98        let defined = extension.migrations();
99
100        self.ensure_migrations_table_exists().await?;
101        let applied = self.get_applied_migrations(ext_id).await?;
102
103        let applied_versions: HashSet<u32> = applied.iter().map(|m| m.version).collect();
104        let applied_checksums: std::collections::HashMap<u32, &str> = applied
105            .iter()
106            .map(|m| (m.version, m.checksum.as_str()))
107            .collect();
108
109        let mut pending = Vec::new();
110        let mut drift = Vec::new();
111
112        for m in &defined {
113            let current_checksum = m.checksum();
114            if applied_versions.contains(&m.version) {
115                if let Some(&stored_checksum) = applied_checksums.get(&m.version) {
116                    if stored_checksum != current_checksum {
117                        drift.push(ChecksumDrift {
118                            extension_id: ext_id.to_string(),
119                            version: m.version,
120                            name: m.name.clone(),
121                            stored_checksum: stored_checksum.to_string(),
122                            current_checksum,
123                        });
124                    }
125                }
126            } else {
127                pending.push(PendingMigration {
128                    extension_id: ext_id.to_string(),
129                    version: m.version,
130                    name: m.name.clone(),
131                    sql: m.sql,
132                    checksum: current_checksum,
133                    no_tx: false,
134                });
135            }
136        }
137
138        Ok(ExtensionMigrationStatus {
139            extension_id: ext_id.to_string(),
140            applied,
141            pending,
142            drift,
143        })
144    }
145
146    pub async fn get_migration_status(
147        &self,
148        extension: &dyn Extension,
149    ) -> Result<MigrationStatus, LoaderError> {
150        self.ensure_migrations_table_exists().await?;
151
152        let ext_id = extension.metadata().id;
153        let defined_migrations = extension.migrations();
154        let applied = self.get_applied_migrations(ext_id).await?;
155
156        let applied_versions: HashSet<u32> = applied.iter().map(|m| m.version).collect();
157
158        let pending: Vec<_> = defined_migrations
159            .iter()
160            .filter(|m| !applied_versions.contains(&m.version))
161            .cloned()
162            .collect();
163
164        Ok(MigrationStatus {
165            extension_id: ext_id.to_string(),
166            total_defined: defined_migrations.len(),
167            total_applied: applied.len(),
168            pending_count: pending.len(),
169            pending,
170            applied,
171        })
172    }
173}