//! kinetics 0.15.1
//!
//! Kinetics is a hosting platform for Rust applications that allows you to deploy all types of workloads by writing **only Rust code**.
//! See the project documentation for details.
use crate::writer::Writer;
use color_eyre::owo_colors::OwoColorize;
use eyre::Context;
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};
use sqlx::Row;
use std::path::Path;

/// A struct representing a set of database migration files.
///
/// The `Migrations` struct is used to manage and refer to a collection of database
/// migration files stored in a specific directory.
///
/// Both fields are borrows, so a `Migrations` value cannot outlive the path or
/// writer it was created from.
pub struct Migrations<'a> {
    /// Directory where migration files are stored; existence is verified by `Migrations::new`
    path: &'a Path,

    /// Global output writer used for user-facing progress messages
    writer: &'a Writer,
}

/// Methods for managing database migrations
impl<'a> Migrations<'a> {
    /// Creates a new `Migrations` instance from a directory path
    ///
    /// `path` is a path to the migrations directory; it must exist in the filesystem.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory does not exist, or if its existence
    /// cannot be determined (e.g. a permission error from `try_exists`).
    pub fn new(path: &'a Path, writer: &'a Writer) -> eyre::Result<Self> {
        if !path.try_exists()? {
            return Err(eyre::eyre!(
                "Migrations directory does not exist: {}",
                path.display()
            ));
        }

        Ok(Self { path, writer })
    }

    /// Applies database migrations based on the stored migration files and the current state
    /// of the database.
    ///
    /// This function retrieves the latest applied migration ID, and determines which migrations
    /// (if any) need to be applied. It then applies the pending migrations sequentially
    /// and updates the `schema_migrations` table to record each migration.
    ///
    /// Apply DDL statements separately because DSQL does not support mixing DDL and DML
    /// statements within a transaction.
    pub async fn apply(&self, connection_string: String) -> eyre::Result<()> {
        let connection = sqlx::PgPool::connect(&connection_string)
            .await
            .wrap_err("Failed to connect to database")?;

        // Get the latest applied migration id. `MAX` over the text ids works because
        // filenames are timestamp-prefixed, so lexicographic order matches creation order.
        let result = sqlx::query("SELECT MAX(id) FROM schema_migrations")
            .fetch_one(&connection)
            .await?;

        // Propagate decode errors instead of silently falling back to "0":
        // swallowing the error here (as `unwrap_or_default` previously did) would
        // make us re-apply every migration against a non-empty database.
        let last_db_id: String = result
            .try_get::<Option<String>, _>(0)
            .wrap_err("Failed to decode the latest applied migration id")?
            // `MAX` returns NULL on an empty table; "0" sorts before any
            // timestamp-prefixed filename, so every migration counts as pending.
            .unwrap_or_else(|| "0".to_string());

        let migrations = self.migrations(&last_db_id).await?;

        if migrations.is_empty() {
            self.writer.text(&format!(
                "{}",
                console::style("No migrations to apply...").yellow()
            ))?;

            return Ok(());
        }

        self.validate(&migrations)?;

        for (filename, content) in migrations {
            // `raw_sql` executes the file verbatim, allowing multiple statements per file
            sqlx::raw_sql(&content)
                .execute(&connection)
                .await
                .inspect_err(|e| log::error!("Error: {e:?}"))
                .wrap_err("Failed to apply migration")?;

            // Record the migration so it is skipped on subsequent runs
            sqlx::query(r#"INSERT INTO "schema_migrations" (id) VALUES ($1)"#)
                .bind(&filename)
                .execute(&connection)
                .await?;

            self.writer.text(&format!(
                "{} {}\n",
                // NOTE(review): the original styled an empty string here, which renders
                // no visible marker — presumably a check mark lost in transit; restored.
                console::style("✓").green(),
                console::style(&filename).dimmed()
            ))?;
        }

        Ok(())
    }

    /// Creates a new migration file with a unique filename based on the current
    /// timestamp and an optional user-provided name.
    ///
    /// The migration file is created within a specified directory and is initially empty.
    pub async fn create(&self, name: Option<&str>) -> eyre::Result<()> {
        let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S");

        // Allow only alphanumeric characters and underscores, capped at 100 chars
        let name = name
            .unwrap_or_default()
            .replace(' ', "_")
            .chars()
            .filter(|c| c.is_alphanumeric() || *c == '_')
            .take(100)
            .collect::<String>();

        // Generate a unique filename: "<timestamp>" or "<timestamp>_<name>"
        let filename = [timestamp.to_string(), name]
            .into_iter()
            .filter(|part| !part.is_empty())
            .collect::<Vec<_>>()
            .join("_");

        let filepath = self.path.join(format!("{filename}.up.sql"));

        // TODO Add some helpful comments to the migration file
        tokio::fs::write(&filepath, "")
            .await
            .wrap_err("Failed to create a migration file")?;

        self.writer.text(&format!(
            "{} {} {}\n",
            console::style("Created migration").green().bold(),
            console::style("at").dim(),
            console::style(filepath.to_string_lossy())
                .underlined()
                .bold(),
        ))?;

        Ok(())
    }

    /// Retrieves a sorted (ASC) list of database migration files and their contents to be applied.
    ///
    /// `last_applied_id`: representing the identifier of the last applied migration.
    /// Only migration files with names greater than this identifier (in lexicographical order)
    /// will be included.
    ///
    /// Returns a vector of tuples. Each tuple contains:
    ///   - `String`: The name of the migration file
    ///   - `String`: The content of the migration file
    async fn migrations(&self, last_applied_id: &str) -> eyre::Result<Vec<(String, String)>> {
        let mut read_dir = tokio::fs::read_dir(self.path)
            .await
            .wrap_err("Failed to read migrations dir")?;

        let mut paths = Vec::new();

        // Collect all valid migration files
        while let Some(entry) = read_dir.next_entry().await? {
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            // Non-UTF-8 filenames cannot be compared against the applied id; skip them
            let filename = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name.to_owned(),
                None => {
                    log::warn!("Invalid filename: {:?}. Skipping...", path);
                    continue;
                }
            };

            if filename.ends_with(".up.sql") {
                paths.push((filename, path));
            }
        }

        // Sort migrations by name in ASC (the oldest first) order
        paths.sort_by(|(name1, _), (name2, _)| name1.cmp(name2));

        let mut result = Vec::new();

        // Filter out migrations that have already been applied and read the
        // content of the remaining ones
        for (filename, path) in paths {
            if filename.as_str() > last_applied_id {
                let content = tokio::fs::read_to_string(path)
                    .await
                    .wrap_err("Failed to read file")?;

                result.push((filename, content));
            }
        }

        Ok(result)
    }

    /// Validates SQL statements of migrations
    ///
    /// Returns an error if DDL and DML are mixed in the same file.
    ///
    /// This is a purely CPU-bound check, so it is a plain (non-async) method.
    fn validate(&self, migrations: &[(String, String)]) -> eyre::Result<()> {
        for (filename, content) in migrations {
            // Strip the ASYNC keyword before parsing — sqlparser doesn't support
            // CREATE ASYNC INDEX (DSQL syntax), and the statements are only needed
            // for DDL/DML classification.
            let sanitized = content.replace("CREATE ASYNC INDEX", "CREATE INDEX");
            let statements = Parser::parse_sql(&PostgreSqlDialect {}, &sanitized)
                .wrap_err("Failed to parse migration SQL")?;

            // Only emptiness matters, so count statements instead of collecting
            // their rendered text (the strings were never used).
            let mut ddl = 0usize;
            let mut dml = 0usize;

            for stmt in statements {
                if matches!(
                    stmt,
                    Statement::CreateTable { .. }
                        | Statement::AlterTable { .. }
                        | Statement::Drop { .. }
                        | Statement::CreateIndex { .. }
                        | Statement::CreateView { .. }
                ) {
                    ddl += 1;
                } else {
                    dml += 1;
                }
            }

            if ddl > 0 && dml > 0 {
                eyre::bail!(
                    "Migration: {filename} contains both DDL and DML statements. \
                    Please split them into separate migration files."
                )
            }
        }

        Ok(())
    }
}