systemprompt-database 0.10.2

PostgreSQL infrastructure for systemprompt.io AI governance. SQLx-backed pool, generic repository traits, and compile-time query verification. Part of the systemprompt.io AI governance pipeline.
Documentation
//! Reverting applied migrations via their declared `down` SQL.

use super::exec::execute_statements_transactional;
use super::{MigrationResult, MigrationService};
use crate::services::SqlExecutor;
use systemprompt_extension::{Extension, LoaderError, Migration};
use tracing::info;

impl MigrationService<'_> {
    /// Reverts up to `count` of the most recently applied migrations of
    /// `extension`, newest version first.
    ///
    /// Returns a default [`MigrationResult`] without touching the database
    /// when `count` is zero, and likewise (after ensuring the bookkeeping
    /// table exists) when no applied migrations are recorded for the
    /// extension. Otherwise `migrations_run` is the number of migrations that
    /// were reverted and `migrations_skipped` is always `0`.
    ///
    /// Migrations are reverted one at a time. If reverting a version fails,
    /// the error is returned immediately and the versions reverted before it
    /// stay reverted.
    ///
    /// # Errors
    ///
    /// * [`LoaderError::MigrationFailed`] if the applied versions cannot be
    ///   queried, if a returned row carries a missing, non-integer or
    ///   out-of-range `version`, if an applied version is not declared by
    ///   `Extension::migrations()`, or if the down SQL cannot be parsed.
    /// * [`LoaderError::MigrationNotReversible`] if a migration declares no
    ///   `down` SQL.
    /// * Any error propagated from ensuring the migrations table exists or
    ///   from executing the down statements.
    pub async fn run_down_migrations(
        &self,
        extension: &dyn Extension,
        count: u32,
    ) -> Result<MigrationResult, LoaderError> {
        if count == 0 {
            return Ok(MigrationResult::default());
        }

        let ext_id = extension.metadata().id;
        self.ensure_migrations_table_exists().await?;

        let result = self
            .db
            .query_raw_with(
                &"SELECT version FROM extension_migrations WHERE extension_id = $1 ORDER BY \
                  version DESC LIMIT $2",
                &[&ext_id, &count],
            )
            .await
            .map_err(|e| LoaderError::MigrationFailed {
                extension: ext_id.to_string(),
                message: format!("Failed to query applied migrations for revert: {e}"),
            })?;

        // A row that cannot be decoded must abort the revert: silently
        // dropping it would revert fewer migrations than requested, and a
        // truncating cast could alias a different declared version.
        let versions = result
            .rows
            .iter()
            .map(|row| {
                row.get("version")
                    .and_then(|value| value.as_i64())
                    .and_then(|raw| u32::try_from(raw).ok())
                    .ok_or_else(|| LoaderError::MigrationFailed {
                        extension: ext_id.to_string(),
                        message: "Applied migration row has a missing, non-integer or \
                                  out-of-range version"
                            .to_string(),
                    })
            })
            .collect::<Result<Vec<u32>, LoaderError>>()?;

        if versions.is_empty() {
            return Ok(MigrationResult::default());
        }

        let migrations = extension.migrations();
        let mut migrations_run = 0;

        // `versions` is already ordered newest-first by the query.
        for version in versions {
            self.revert_version(ext_id, version, &migrations).await?;
            migrations_run += 1;
        }

        Ok(MigrationResult {
            migrations_run,
            migrations_skipped: 0,
        })
    }

    /// Reverts a single applied migration: looks up `version` among
    /// `migrations`, runs its `down` SQL, then removes its bookkeeping row.
    ///
    /// # Errors
    ///
    /// * [`LoaderError::MigrationFailed`] if `version` is not present in
    ///   `migrations`, if the down SQL cannot be parsed, or if the
    ///   bookkeeping row cannot be deleted.
    /// * [`LoaderError::MigrationNotReversible`] if the migration has no
    ///   `down` SQL.
    /// * Any error propagated from executing the down statements.
    async fn revert_version(
        &self,
        ext_id: &str,
        version: u32,
        migrations: &[Migration],
    ) -> Result<(), LoaderError> {
        let migration = migrations
            .iter()
            .find(|m| m.version == version)
            .ok_or_else(|| LoaderError::MigrationFailed {
                extension: ext_id.to_string(),
                message: format!(
                    "Cannot revert migration {version}: not declared in Extension::migrations()"
                ),
            })?;

        let down_sql = migration
            .down
            .ok_or_else(|| LoaderError::MigrationNotReversible {
                extension: ext_id.to_string(),
                version,
            })?;

        info!(
            extension = %ext_id,
            version = migration.version,
            name = %migration.name,
            "Reverting migration"
        );

        let statements = SqlExecutor::parse_sql_statements(down_sql).map_err(|e| {
            LoaderError::MigrationFailed {
                extension: ext_id.to_string(),
                message: format!(
                    "Failed to parse down migration {} ({}): {e}",
                    migration.version, migration.name
                ),
            }
        })?;
        execute_statements_transactional(self.db, &statements, ext_id, migration).await?;

        // NOTE(review): the bookkeeping delete is issued as a separate
        // statement after the down SQL has run. If it fails, the record for
        // this version remains even though the down statements were executed;
        // confirm whether the two should share one transaction.
        self.delete_migration_record(ext_id, version).await
    }

    /// Deletes the `extension_migrations` row identifying `version` of the
    /// extension `ext_id`.
    ///
    /// # Errors
    ///
    /// Returns [`LoaderError::MigrationFailed`] if the `DELETE` statement
    /// fails.
    async fn delete_migration_record(&self, ext_id: &str, version: u32) -> Result<(), LoaderError> {
        self.db
            .execute(
                &"DELETE FROM extension_migrations WHERE extension_id = $1 AND version = $2",
                &[&ext_id, &version],
            )
            .await
            .map_err(|e| LoaderError::MigrationFailed {
                extension: ext_id.to_string(),
                message: format!("Failed to delete migration record {version}: {e}"),
            })?;

        Ok(())
    }
}