1use crate::migration::Migration;
2use crate::runner::MigrationExecutionError::*;
3use async_trait::async_trait;
4use chrono::{Duration, Utc};
5use lazy_static::lazy_static;
6use nanoid::nanoid;
7use scylla::frame::value::Timestamp;
8use scylla::transport::errors::QueryError;
9use scylla::{FromRow, QueryResult, Session};
10use sha2::{Digest, Sha256};
11use std::sync::Arc;
12use thiserror::Error;
13
14lazy_static! {
15 static ref NANOID_LENGTH: usize = 15;
16 static ref HISTORY_TABLE_NAME: String = "flowfine_history".to_string();
17}
18
19#[derive(Error, Debug)]
20pub enum MigrationExecutionError {
21 #[error("Migration table was not created: {0}")]
22 CreateHistoryTableError(QueryError),
23
24 #[error("Migration {0} failed: {1}")]
25 RunMigrationError(String, QueryError),
26
27 #[error("")]
28 MigrationError(QueryError),
29
30 #[error("Migration history could not be applied: {0}")]
31 ApplyHistoryError(QueryError),
32}
33
34#[derive(FromRow)]
35pub struct AppliedMigration {
36 pub id: String,
37 pub version: String,
38 pub name: String,
39 pub filename: String,
40 pub checksum: String,
41 pub applied_at: Duration,
42 pub success: bool,
43}
44
45#[async_trait]
46pub trait MigrationRunner {
47 async fn run(
48 &self,
49 migrations: Vec<Migration>,
50 ) -> Result<Vec<AppliedMigration>, MigrationExecutionError>;
51}
52
53pub struct ScyllaMigrationRunner {
54 session: Arc<Session>,
55 keyspace: String,
56}
57
58impl<'a> ScyllaMigrationRunner {
59 pub fn new(session: Arc<Session>, keyspace: &str) -> Self {
60 Self {
61 session,
62 keyspace: keyspace.to_string(),
63 }
64 }
65
66 async fn apply_migration(&self, migration: &Migration) -> Result<(), MigrationExecutionError> {
67 for query in &migration.queries {
68 self.session
69 .query(query.clone(), &[])
70 .await
71 .map_err(|err| RunMigrationError(migration.filename.clone(), err.clone()))?;
72 }
73
74 Ok(())
75 }
76
77 async fn apply_history(
78 &self,
79 success: bool,
80 migration: &Migration,
81 ) -> Result<AppliedMigration, MigrationExecutionError> {
82 let query = format!(
83 "INSERT INTO {keyspace}.{history_table} (id, version, name, filename, checksum, applied_at, success) VALUES (?, ?, ?, ?, ?, ?, ?);",
84 keyspace = self.keyspace,
85 history_table = *HISTORY_TABLE_NAME
86 );
87
88 let nanoid_len = *NANOID_LENGTH;
89 let applied_migration = AppliedMigration {
90 id: nanoid!(nanoid_len).to_string(),
91 version: migration.version.clone(),
92 name: migration.name.clone(),
93 filename: migration.filename.clone(),
94 checksum: self.create_checksum(&migration),
95 applied_at: Duration::nanoseconds(Utc::now().timestamp_nanos()), success,
97 };
98
99 self.session
100 .query(
101 query,
102 (
103 &applied_migration.id,
104 &applied_migration.version,
105 &applied_migration.name,
106 &applied_migration.filename,
107 &applied_migration.checksum,
108 Timestamp(applied_migration.applied_at),
109 &applied_migration.success,
110 ),
111 )
112 .await
113 .map(|_| applied_migration)
114 .map_err(|err| ApplyHistoryError(err.clone()))
115 }
116
117 async fn find_latest_applied_migration(
118 &self,
119 ) -> Result<Option<AppliedMigration>, MigrationExecutionError> {
120 let query = format!(
121 "SELECT id, version, name, filename, checksum, applied_at, success
122 FROM {keyspace}.{history_table}
123 WHERE success = true LIMIT 1;
124 ",
125 keyspace = self.keyspace,
126 history_table = *HISTORY_TABLE_NAME
127 );
128
129 self.session
130 .query(query, &[])
131 .await
132 .map(|query_result| {
133 query_result
134 .maybe_first_row_typed::<AppliedMigration>()
135 .unwrap()
136 })
137 .map_err(|err| MigrationError(err.clone()))
138 }
139
140 fn create_checksum(&self, migration: &Migration) -> String {
141 let checksum = Sha256::new()
142 .chain_update(migration.version.as_bytes())
143 .chain_update(migration.name.as_bytes())
144 .chain_update(migration.content.as_bytes())
145 .finalize();
146
147 format!("{:x}", checksum)
148 }
149
150 async fn create_history_table(&self) -> Result<QueryResult, MigrationExecutionError> {
151 let query = format!(
152 "CREATE TABLE IF NOT EXISTS {keyspace}.{history_table} (
153 id TEXT,
154 version TEXT,
155 name TEXT,
156 filename TEXT,
157 checksum TEXT,
158 success BOOLEAN,
159 applied_at TIMESTAMP,
160 PRIMARY KEY (success, applied_at)
161 ) WITH CLUSTERING ORDER BY (applied_at DESC);
162 ",
163 keyspace = self.keyspace,
164 history_table = *HISTORY_TABLE_NAME
165 );
166
167 self.session
168 .query(query, &[])
169 .await
170 .map_err(|err| CreateHistoryTableError(err))
171 }
172}
173
174#[async_trait]
175impl MigrationRunner for ScyllaMigrationRunner {
176 async fn run(
177 &self,
178 migrations: Vec<Migration>,
179 ) -> Result<Vec<AppliedMigration>, MigrationExecutionError> {
180 let mut applied_migrations = Vec::new();
181
182 self.create_history_table().await?;
183 let latest_migration = self.find_latest_applied_migration().await?;
184
185 for migration in migrations {
186 if let Some(latest_migration) = &latest_migration {
188 if latest_migration.version >= migration.version {
189 continue;
190 }
191 }
192
193 match self.apply_migration(&migration).await {
194 Ok(_) => {
195 let applied_migration = self.apply_history(true, &migration).await?;
196 applied_migrations.push(applied_migration);
197 println!("Applied migration.rs {}", migration.filename)
198 }
199 Err(err) => {
200 self.apply_history(false, &migration).await?;
201 println!("Failed to apply migration.rs {}", migration.filename);
202 return Err(err);
203 }
204 };
205 }
206
207 Ok(applied_migrations)
208 }
209}