systemprompt_database/lifecycle/migrations/
down.rs1use super::exec::execute_statements_transactional;
4use super::{MigrationResult, MigrationService};
5use crate::services::SqlExecutor;
6use systemprompt_extension::{Extension, LoaderError, Migration};
7use tracing::info;
8
9impl MigrationService<'_> {
10 pub async fn run_down_migrations(
11 &self,
12 extension: &dyn Extension,
13 count: u32,
14 ) -> Result<MigrationResult, LoaderError> {
15 if count == 0 {
16 return Ok(MigrationResult::default());
17 }
18
19 let ext_id = extension.metadata().id;
20 self.ensure_migrations_table_exists().await?;
21
22 let result = self
23 .db
24 .query_raw_with(
25 &"SELECT version FROM extension_migrations WHERE extension_id = $1 ORDER BY \
26 version DESC LIMIT $2",
27 &[&ext_id, &count],
28 )
29 .await
30 .map_err(|e| LoaderError::MigrationFailed {
31 extension: ext_id.to_string(),
32 message: format!("Failed to query applied migrations for revert: {e}"),
33 })?;
34
35 let versions: Vec<u32> = result
36 .rows
37 .iter()
38 .filter_map(|row| row.get("version")?.as_i64().map(|v| v as u32))
39 .collect();
40
41 if versions.is_empty() {
42 return Ok(MigrationResult::default());
43 }
44
45 let migrations = extension.migrations();
46 let mut migrations_run = 0;
47
48 for version in versions {
49 self.revert_version(ext_id, version, &migrations).await?;
50 migrations_run += 1;
51 }
52
53 Ok(MigrationResult {
54 migrations_run,
55 migrations_skipped: 0,
56 })
57 }
58
59 async fn revert_version(
60 &self,
61 ext_id: &str,
62 version: u32,
63 migrations: &[Migration],
64 ) -> Result<(), LoaderError> {
65 let migration = migrations
66 .iter()
67 .find(|m| m.version == version)
68 .ok_or_else(|| LoaderError::MigrationFailed {
69 extension: ext_id.to_string(),
70 message: format!(
71 "Cannot revert migration {version}: not declared in Extension::migrations()"
72 ),
73 })?;
74
75 let down_sql = migration
76 .down
77 .ok_or_else(|| LoaderError::MigrationNotReversible {
78 extension: ext_id.to_string(),
79 version,
80 })?;
81
82 info!(
83 extension = %ext_id,
84 version = migration.version,
85 name = %migration.name,
86 "Reverting migration"
87 );
88
89 let statements = SqlExecutor::parse_sql_statements(down_sql).map_err(|e| {
90 LoaderError::MigrationFailed {
91 extension: ext_id.to_string(),
92 message: format!(
93 "Failed to parse down migration {} ({}): {e}",
94 migration.version, migration.name
95 ),
96 }
97 })?;
98 execute_statements_transactional(self.db, &statements, ext_id, migration).await?;
99
100 self.delete_migration_record(ext_id, version).await
101 }
102
103 async fn delete_migration_record(&self, ext_id: &str, version: u32) -> Result<(), LoaderError> {
104 self.db
105 .execute(
106 &"DELETE FROM extension_migrations WHERE extension_id = $1 AND version = $2",
107 &[&ext_id, &version],
108 )
109 .await
110 .map_err(|e| LoaderError::MigrationFailed {
111 extension: ext_id.to_string(),
112 message: format!("Failed to delete migration record {version}: {e}"),
113 })?;
114
115 Ok(())
116 }
117}