flowfine/
runner.rs

1use crate::migration::Migration;
2use crate::runner::MigrationExecutionError::*;
3use async_trait::async_trait;
4use chrono::{Duration, Utc};
5use lazy_static::lazy_static;
6use nanoid::nanoid;
7use scylla::frame::value::Timestamp;
8use scylla::transport::errors::QueryError;
9use scylla::{FromRow, QueryResult, Session};
10use sha2::{Digest, Sha256};
11use std::sync::Arc;
12use thiserror::Error;
13
14lazy_static! {
15    static ref NANOID_LENGTH: usize = 15;
16    static ref HISTORY_TABLE_NAME: String = "flowfine_history".to_string();
17}
18
19#[derive(Error, Debug)]
20pub enum MigrationExecutionError {
21    #[error("Migration table was not created: {0}")]
22    CreateHistoryTableError(QueryError),
23
24    #[error("Migration {0} failed: {1}")]
25    RunMigrationError(String, QueryError),
26
27    #[error("")]
28    MigrationError(QueryError),
29
30    #[error("Migration history could not be applied: {0}")]
31    ApplyHistoryError(QueryError),
32}
33
34#[derive(FromRow)]
35pub struct AppliedMigration {
36    pub id: String,
37    pub version: String,
38    pub name: String,
39    pub filename: String,
40    pub checksum: String,
41    pub applied_at: Duration,
42    pub success: bool,
43}
44
45#[async_trait]
46pub trait MigrationRunner {
47    async fn run(
48        &self,
49        migrations: Vec<Migration>,
50    ) -> Result<Vec<AppliedMigration>, MigrationExecutionError>;
51}
52
53pub struct ScyllaMigrationRunner {
54    session: Arc<Session>,
55    keyspace: String,
56}
57
58impl<'a> ScyllaMigrationRunner {
59    pub fn new(session: Arc<Session>, keyspace: &str) -> Self {
60        Self {
61            session,
62            keyspace: keyspace.to_string(),
63        }
64    }
65
66    async fn apply_migration(&self, migration: &Migration) -> Result<(), MigrationExecutionError> {
67        for query in &migration.queries {
68            self.session
69                .query(query.clone(), &[])
70                .await
71                .map_err(|err| RunMigrationError(migration.filename.clone(), err.clone()))?;
72        }
73
74        Ok(())
75    }
76
77    async fn apply_history(
78        &self,
79        success: bool,
80        migration: &Migration,
81    ) -> Result<AppliedMigration, MigrationExecutionError> {
82        let query = format!(
83            "INSERT INTO {keyspace}.{history_table} (id, version, name, filename, checksum, applied_at, success) VALUES (?, ?, ?, ?, ?, ?, ?);",
84            keyspace = self.keyspace,
85            history_table = *HISTORY_TABLE_NAME
86        );
87
88        let nanoid_len = *NANOID_LENGTH;
89        let applied_migration = AppliedMigration {
90            id: nanoid!(nanoid_len).to_string(),
91            version: migration.version.clone(),
92            name: migration.name.clone(),
93            filename: migration.filename.clone(),
94            checksum: self.create_checksum(&migration),
95            applied_at: Duration::nanoseconds(Utc::now().timestamp_nanos()), //todo: move the code to date utils
96            success,
97        };
98
99        self.session
100            .query(
101                query,
102                (
103                    &applied_migration.id,
104                    &applied_migration.version,
105                    &applied_migration.name,
106                    &applied_migration.filename,
107                    &applied_migration.checksum,
108                    Timestamp(applied_migration.applied_at),
109                    &applied_migration.success,
110                ),
111            )
112            .await
113            .map(|_| applied_migration)
114            .map_err(|err| ApplyHistoryError(err.clone()))
115    }
116
117    async fn find_latest_applied_migration(
118        &self,
119    ) -> Result<Option<AppliedMigration>, MigrationExecutionError> {
120        let query = format!(
121            "SELECT id, version, name, filename, checksum, applied_at, success
122                FROM {keyspace}.{history_table} 
123                WHERE success = true LIMIT 1;
124             ",
125            keyspace = self.keyspace,
126            history_table = *HISTORY_TABLE_NAME
127        );
128
129        self.session
130            .query(query, &[])
131            .await
132            .map(|query_result| {
133                query_result
134                    .maybe_first_row_typed::<AppliedMigration>()
135                    .unwrap()
136            })
137            .map_err(|err| MigrationError(err.clone()))
138    }
139
140    fn create_checksum(&self, migration: &Migration) -> String {
141        let checksum = Sha256::new()
142            .chain_update(migration.version.as_bytes())
143            .chain_update(migration.name.as_bytes())
144            .chain_update(migration.content.as_bytes())
145            .finalize();
146
147        format!("{:x}", checksum)
148    }
149
150    async fn create_history_table(&self) -> Result<QueryResult, MigrationExecutionError> {
151        let query = format!(
152            "CREATE TABLE IF NOT EXISTS {keyspace}.{history_table} (
153                id         TEXT,
154                version    TEXT,
155                name       TEXT, 
156                filename   TEXT,
157                checksum   TEXT,
158                success    BOOLEAN,
159                applied_at TIMESTAMP,
160                PRIMARY KEY (success, applied_at)
161            ) WITH CLUSTERING ORDER BY (applied_at DESC);
162            ",
163            keyspace = self.keyspace,
164            history_table = *HISTORY_TABLE_NAME
165        );
166
167        self.session
168            .query(query, &[])
169            .await
170            .map_err(|err| CreateHistoryTableError(err))
171    }
172}
173
174#[async_trait]
175impl MigrationRunner for ScyllaMigrationRunner {
176    async fn run(
177        &self,
178        migrations: Vec<Migration>,
179    ) -> Result<Vec<AppliedMigration>, MigrationExecutionError> {
180        let mut applied_migrations = Vec::new();
181
182        self.create_history_table().await?;
183        let latest_migration = self.find_latest_applied_migration().await?;
184
185        for migration in migrations {
186            //todo: add property in config to verify integrity of all migrations
187            if let Some(latest_migration) = &latest_migration {
188                if latest_migration.version >= migration.version {
189                    continue;
190                }
191            }
192
193            match self.apply_migration(&migration).await {
194                Ok(_) => {
195                    let applied_migration = self.apply_history(true, &migration).await?;
196                    applied_migrations.push(applied_migration);
197                    println!("Applied migration.rs {}", migration.filename)
198                }
199                Err(err) => {
200                    self.apply_history(false, &migration).await?;
201                    println!("Failed to apply migration.rs {}", migration.filename);
202                    return Err(err);
203                }
204            };
205        }
206
207        Ok(applied_migrations)
208    }
209}