1use std::collections::BTreeMap;
2
3use sqlx::postgres::PgConnection;
4
5use crate::MigrationId;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct MigrationRecord {
9 pub id: MigrationId,
10 pub name: String,
11 pub run_at: time::PrimitiveDateTime,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct MigrationLog {
16 pub(crate) log: BTreeMap<MigrationId, MigrationRecord>,
17}
18
19impl MigrationLog {
20 pub async fn new(conn: &mut PgConnection) -> Result<Self, QueryError> {
21 let applied = applied_migrations(conn).await?;
22
23 let index = applied
24 .into_iter()
25 .map(|row| {
26 (
27 MigrationId(row.id),
28 MigrationRecord {
29 id: MigrationId(row.id),
30 name: row.name,
31 run_at: row.run_at,
32 },
33 )
34 })
35 .collect();
36
37 Ok(Self { log: index })
38 }
39
40 pub fn iter(&self) -> impl Iterator<Item = &MigrationRecord> {
41 self.log.values()
42 }
43
44 pub fn last(&self) -> Option<MigrationRecord> {
45 self.iter().cloned().max_by_key(|row| (row.run_at, row.id))
46 }
47}
48
49#[derive(sqlx::FromRow, Debug, Clone, PartialEq, Eq)]
50struct MigrationRow {
51 pub id: i64,
52 pub name: String,
53 pub run_at: time::PrimitiveDateTime,
54}
55
56async fn applied_migrations(conn: &mut PgConnection) -> Result<Vec<MigrationRow>, QueryError> {
57 let query = sqlx::query_as("select * from schema_migrations order by id asc");
58 match query.fetch_all(conn).await {
59 Ok(res) => Ok(res),
60 Err(err) => {
61 if let sqlx::Error::Database(ref db_err) = err {
62 if let Some(code) = db_err.code() {
63 if code == "42P01" {
65 return Ok(Vec::new());
68 }
69 }
70 }
71 Err(QueryError(err))
72 }
73 }
74}
75
76#[derive(thiserror::Error, Debug)]
77#[error("failed to query applied migrations: {0}")]
78pub struct QueryError(sqlx::Error);
79
80#[cfg(test)]
81mod tests {
82 use sqlx::Executor;
83
84 use crate::testing::*;
85 use crate::MigrationIndex;
86
87 use super::*;
88
89 #[tokio::test]
90 async fn missing_table() {
91 let env = TestEnv::new().await.unwrap();
92
93 let config = env.config();
94 let mut conn = config.connect().await.unwrap();
95
96 conn.execute("drop table if exists schema_migrations")
97 .await
98 .unwrap();
99
100 let log = MigrationLog::new(&mut conn).await.unwrap();
101 assert!(log.log.is_empty(), "{:?}", log);
102 }
103
104 #[tokio::test]
105 async fn last_applied_uninit() {
106 let env = TestEnv::new().await.unwrap();
107
108 let config = env.config();
109 let mut conn = config.connect().await.unwrap();
110
111 let last = MigrationLog::new(&mut conn).await.unwrap().last();
112 assert_eq!(None, last);
113 }
114
115 #[tokio::test]
116 async fn last_applied_init() {
117 let env = TestEnv::initialized().await.unwrap();
118
119 let config = env.config();
120 let mut conn = config.connect().await.unwrap();
121
122 let last = MigrationLog::new(&mut conn).await.unwrap().last();
123 assert!(last.is_some());
124
125 let last = last.unwrap();
126 assert_eq!(MigrationId(0), last.id);
127 assert_eq!("init", &last.name);
128 }
129
130 #[tokio::test]
131 async fn last_applied_out_of_order() {
132 let env = TestEnv::initialized().await.unwrap();
133 let config = env.config();
134
135 let mut index = MigrationIndex::new(&config.migrations_dir).unwrap();
136
137 let one = index.create(fake_migration(1, "one")).unwrap();
138 let two = index.create(fake_migration(2, "two")).unwrap();
139 let _ = index.create(fake_migration(3, "three")).unwrap();
140
141 let mut conn = config.connect().await.unwrap();
143 two.up(&mut conn).await.unwrap();
144 one.up(&mut conn).await.unwrap();
145
146 let last = MigrationLog::new(&mut conn).await.unwrap().last();
147 assert!(last.is_some());
148
149 let last = last.unwrap();
150 assert_eq!(MigrationId(1), last.id);
151 assert_eq!("one", &last.name);
152 }
153}