systemprompt_database/lifecycle/migrations/
status.rs1use super::MigrationService;
4use std::collections::HashSet;
5use systemprompt_extension::{Extension, LoaderError, Migration};
6
7#[derive(Debug, Clone)]
8pub struct AppliedMigration {
9 pub extension_id: String,
10 pub version: u32,
11 pub name: String,
12 pub checksum: String,
13 pub applied_at: Option<String>,
14}
15
16#[derive(Debug, Clone)]
17pub struct PendingMigration {
18 pub extension_id: String,
19 pub version: u32,
20 pub name: String,
21 pub sql: &'static str,
22 pub checksum: String,
23 pub no_tx: bool,
24}
25
26#[derive(Debug, Clone)]
27pub struct ChecksumDrift {
28 pub extension_id: String,
29 pub version: u32,
30 pub name: String,
31 pub stored_checksum: String,
32 pub current_checksum: String,
33}
34
35#[derive(Debug, Clone, Default)]
36pub struct ExtensionMigrationStatus {
37 pub extension_id: String,
38 pub applied: Vec<AppliedMigration>,
39 pub pending: Vec<PendingMigration>,
40 pub drift: Vec<ChecksumDrift>,
41}
42
43#[derive(Debug, Default, Clone, Copy)]
44pub struct MigrationResult {
45 pub migrations_run: usize,
46 pub migrations_skipped: usize,
47}
48
49#[derive(Debug)]
50pub struct MigrationStatus {
51 pub extension_id: String,
52 pub total_defined: usize,
53 pub total_applied: usize,
54 pub pending_count: usize,
55 pub pending: Vec<Migration>,
56 pub applied: Vec<AppliedMigration>,
57}
58
59impl MigrationService<'_> {
60 pub async fn plan_pending(
61 &self,
62 extension: &dyn Extension,
63 ) -> Result<Vec<PendingMigration>, LoaderError> {
64 let ext_id = extension.metadata().id;
65 let defined = extension.migrations();
66
67 if defined.is_empty() {
68 return Ok(Vec::new());
69 }
70
71 self.ensure_migrations_table_exists().await?;
72 let applied_versions: HashSet<u32> = self
73 .get_applied_migrations(ext_id)
74 .await?
75 .into_iter()
76 .map(|m| m.version)
77 .collect();
78
79 Ok(defined
80 .into_iter()
81 .filter(|m| !applied_versions.contains(&m.version))
82 .map(|m| PendingMigration {
83 extension_id: ext_id.to_string(),
84 version: m.version,
85 name: m.name.clone(),
86 sql: m.sql,
87 checksum: m.checksum(),
88 no_tx: false,
89 })
90 .collect())
91 }
92
93 pub async fn status(
94 &self,
95 extension: &dyn Extension,
96 ) -> Result<ExtensionMigrationStatus, LoaderError> {
97 let ext_id = extension.metadata().id;
98 let defined = extension.migrations();
99
100 self.ensure_migrations_table_exists().await?;
101 let applied = self.get_applied_migrations(ext_id).await?;
102
103 let applied_versions: HashSet<u32> = applied.iter().map(|m| m.version).collect();
104 let applied_checksums: std::collections::HashMap<u32, &str> = applied
105 .iter()
106 .map(|m| (m.version, m.checksum.as_str()))
107 .collect();
108
109 let mut pending = Vec::new();
110 let mut drift = Vec::new();
111
112 for m in &defined {
113 let current_checksum = m.checksum();
114 if applied_versions.contains(&m.version) {
115 if let Some(&stored_checksum) = applied_checksums.get(&m.version) {
116 if stored_checksum != current_checksum {
117 drift.push(ChecksumDrift {
118 extension_id: ext_id.to_string(),
119 version: m.version,
120 name: m.name.clone(),
121 stored_checksum: stored_checksum.to_string(),
122 current_checksum,
123 });
124 }
125 }
126 } else {
127 pending.push(PendingMigration {
128 extension_id: ext_id.to_string(),
129 version: m.version,
130 name: m.name.clone(),
131 sql: m.sql,
132 checksum: current_checksum,
133 no_tx: false,
134 });
135 }
136 }
137
138 Ok(ExtensionMigrationStatus {
139 extension_id: ext_id.to_string(),
140 applied,
141 pending,
142 drift,
143 })
144 }
145
146 pub async fn get_migration_status(
147 &self,
148 extension: &dyn Extension,
149 ) -> Result<MigrationStatus, LoaderError> {
150 self.ensure_migrations_table_exists().await?;
151
152 let ext_id = extension.metadata().id;
153 let defined_migrations = extension.migrations();
154 let applied = self.get_applied_migrations(ext_id).await?;
155
156 let applied_versions: HashSet<u32> = applied.iter().map(|m| m.version).collect();
157
158 let pending: Vec<_> = defined_migrations
159 .iter()
160 .filter(|m| !applied_versions.contains(&m.version))
161 .cloned()
162 .collect();
163
164 Ok(MigrationStatus {
165 extension_id: ext_id.to_string(),
166 total_defined: defined_migrations.len(),
167 total_applied: applied.len(),
168 pending_count: pending.len(),
169 pending,
170 applied,
171 })
172 }
173}