Skip to main content

systemprompt_database/lifecycle/migrations/
down.rs

1//! Reverting applied migrations via their declared `down` SQL.
2
3use super::exec::execute_statements_transactional;
4use super::{MigrationResult, MigrationService};
5use crate::services::SqlExecutor;
6use systemprompt_extension::{Extension, LoaderError, Migration};
7use tracing::info;
8
9impl MigrationService<'_> {
10    pub async fn run_down_migrations(
11        &self,
12        extension: &dyn Extension,
13        count: u32,
14    ) -> Result<MigrationResult, LoaderError> {
15        if count == 0 {
16            return Ok(MigrationResult::default());
17        }
18
19        let ext_id = extension.metadata().id;
20        self.ensure_migrations_table_exists().await?;
21
22        let result = self
23            .db
24            .query_raw_with(
25                &"SELECT version FROM extension_migrations WHERE extension_id = $1 ORDER BY \
26                  version DESC LIMIT $2",
27                &[&ext_id, &count],
28            )
29            .await
30            .map_err(|e| LoaderError::MigrationFailed {
31                extension: ext_id.to_string(),
32                message: format!("Failed to query applied migrations for revert: {e}"),
33            })?;
34
35        let versions: Vec<u32> = result
36            .rows
37            .iter()
38            .filter_map(|row| row.get("version")?.as_i64().map(|v| v as u32))
39            .collect();
40
41        if versions.is_empty() {
42            return Ok(MigrationResult::default());
43        }
44
45        let migrations = extension.migrations();
46        let mut migrations_run = 0;
47
48        for version in versions {
49            self.revert_version(ext_id, version, &migrations).await?;
50            migrations_run += 1;
51        }
52
53        Ok(MigrationResult {
54            migrations_run,
55            migrations_skipped: 0,
56        })
57    }
58
59    async fn revert_version(
60        &self,
61        ext_id: &str,
62        version: u32,
63        migrations: &[Migration],
64    ) -> Result<(), LoaderError> {
65        let migration = migrations
66            .iter()
67            .find(|m| m.version == version)
68            .ok_or_else(|| LoaderError::MigrationFailed {
69                extension: ext_id.to_string(),
70                message: format!(
71                    "Cannot revert migration {version}: not declared in Extension::migrations()"
72                ),
73            })?;
74
75        let down_sql = migration
76            .down
77            .ok_or_else(|| LoaderError::MigrationNotReversible {
78                extension: ext_id.to_string(),
79                version,
80            })?;
81
82        info!(
83            extension = %ext_id,
84            version = migration.version,
85            name = %migration.name,
86            "Reverting migration"
87        );
88
89        let statements = SqlExecutor::parse_sql_statements(down_sql).map_err(|e| {
90            LoaderError::MigrationFailed {
91                extension: ext_id.to_string(),
92                message: format!(
93                    "Failed to parse down migration {} ({}): {e}",
94                    migration.version, migration.name
95                ),
96            }
97        })?;
98        execute_statements_transactional(self.db, &statements, ext_id, migration).await?;
99
100        self.delete_migration_record(ext_id, version).await
101    }
102
103    async fn delete_migration_record(&self, ext_id: &str, version: u32) -> Result<(), LoaderError> {
104        self.db
105            .execute(
106                &"DELETE FROM extension_migrations WHERE extension_id = $1 AND version = $2",
107                &[&ext_id, &version],
108            )
109            .await
110            .map_err(|e| LoaderError::MigrationFailed {
111                extension: ext_id.to_string(),
112                message: format!("Failed to delete migration record {version}: {e}"),
113            })?;
114
115        Ok(())
116    }
117}