disintegrate_postgres/
migrator.rs1use disintegrate::{DomainIdentifierInfo, Event};
10use disintegrate_serde::Serde;
11
12use crate::PgEventStore;
13
14#[derive(thiserror::Error, Debug)]
16#[error(transparent)]
17pub struct Error(#[from] sqlx::Error);
18
19pub struct Migrator<E, S>
28where
29 E: Event + Clone,
30 S: Serde<E> + Send + Sync,
31{
32 event_store: PgEventStore<E, S>,
33}
34
35impl<E, S> Migrator<E, S>
36where
37 E: Event + Clone,
38 S: Serde<E> + Send + Sync,
39{
40 pub fn new(event_store: PgEventStore<E, S>) -> Self {
41 Self { event_store }
42 }
43
44 pub async fn init_event_store(&self) -> Result<(), Error> {
46 const RESERVED_NAMES: &[&str] = &[
47 "event_id",
48 "payload",
49 "event_type",
50 "inserted_at",
51 "__epoch_id",
52 ];
53
54 sqlx::query(include_str!("event_store/sql/table_event.sql"))
55 .execute(&self.event_store.pool)
56 .await?;
57 sqlx::query(include_str!("event_store/sql/idx_event_type.sql"))
58 .execute(&self.event_store.pool)
59 .await?;
60 sqlx::query(include_str!("event_store/sql/table_event_sequence.sql"))
61 .execute(&self.event_store.pool)
62 .await?;
63 sqlx::query(include_str!("event_store/sql/idx_event_sequence_type.sql"))
64 .execute(&self.event_store.pool)
65 .await?;
66 sqlx::query(include_str!(
67 "event_store/sql/idx_event_sequence_committed.sql"
68 ))
69 .execute(&self.event_store.pool)
70 .await?;
71 sqlx::query(include_str!(
72 "event_store/sql/fn_event_store_current_epoch.sql"
73 ))
74 .execute(&self.event_store.pool)
75 .await?;
76 sqlx::query(include_str!(
77 "event_store/sql/fn_event_store_begin_epoch.sql"
78 ))
79 .execute(&self.event_store.pool)
80 .await?;
81
82 for domain_identifier in E::SCHEMA.domain_identifiers {
83 if RESERVED_NAMES.contains(&domain_identifier.ident) {
84 panic!("Domain identifier name {domain_identifier} is reserved. Please use a different name.", domain_identifier = domain_identifier.ident);
85 }
86 self.add_domain_identifier_column("event", domain_identifier)
87 .await?;
88 self.add_domain_identifier_column("event_sequence", domain_identifier)
89 .await?;
90 }
91 Ok(())
92 }
93
94 pub async fn init_listener(&self) -> Result<(), Error> {
96 sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
97 .execute(&self.event_store.pool)
98 .await?;
99 sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
100 .execute(&self.event_store.pool)
101 .await?;
102 sqlx::query(include_str!(
103 "listener/sql/trigger_notify_event_listener.sql"
104 ))
105 .execute(&self.event_store.pool)
106 .await?;
107 Ok(())
108 }
109
110 pub async fn migrate_v2_1_0_to_v3_0_0(&self) -> Result<(), Error> {
115 self.migrate_hash_index_to_btree("idx_event_sequence_type", "event_sequence", "event_type")
116 .await?;
117
118 self.migrate_hash_index_to_btree("idx_events_type", "event", "event_type")
119 .await?;
120
121 for domain_identifier in E::SCHEMA.domain_identifiers {
122 let column_name = domain_identifier.ident;
123
124 self.migrate_hash_index_to_btree(
125 &format!("idx_event_{column_name}"),
126 "event",
127 &column_name,
128 )
129 .await?;
130 self.migrate_hash_index_to_btree(
131 &format!("idx_event_sequence_{column_name}"),
132 "event_sequence",
133 &column_name,
134 )
135 .await?;
136 }
137 Ok(())
138 }
139
140 async fn add_domain_identifier_column(
141 &self,
142 table: &str,
143 domain_identifier: &DomainIdentifierInfo,
144 ) -> Result<(), Error> {
145 let column_name = domain_identifier.ident;
146 let sql_type = match domain_identifier.type_info {
147 disintegrate::IdentifierType::String => "TEXT",
148 disintegrate::IdentifierType::i64 => "BIGINT",
149 disintegrate::IdentifierType::Uuid => "UUID",
150 };
151 sqlx::query(&format!(
152 "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column_name} {sql_type}"
153 ))
154 .execute(&self.event_store.pool)
155 .await?;
156
157 sqlx::query(&format!(
158 "CREATE INDEX IF NOT EXISTS idx_{table}_{column_name} ON {table} ({column_name}) WHERE {column_name} IS NOT NULL"
159 ))
160 .execute(&self.event_store.pool)
161 .await?;
162 Ok(())
163 }
164
165 async fn migrate_hash_index_to_btree(
166 &self,
167 index_name: &str,
168 table: &str,
169 column: &str,
170 ) -> Result<(), Error> {
171 let index_type: Option<String> = sqlx::query_scalar(&format!("SELECT am.amname AS index_type FROM pg_class c JOIN pg_am am ON am.oid = c.relam WHERE c.relname = '{index_name}'"))
172 .fetch_optional(&self.event_store.pool)
173 .await?;
174
175 if index_type.is_some_and(|ty| ty == "hash") {
176 sqlx::query(&format!(
177 "CREATE INDEX IF NOT EXISTS {index_name}_tmp ON {table} ({column})"
178 ))
179 .execute(&self.event_store.pool)
180 .await?;
181
182 sqlx::query(&format!("DROP INDEX CONCURRENTLY {index_name}"))
183 .execute(&self.event_store.pool)
184 .await?;
185
186 sqlx::query(&format!(
187 "ALTER INDEX {index_name}_tmp RENAME TO {index_name}"
188 ))
189 .execute(&self.event_store.pool)
190 .await?;
191 }
192
193 Ok(())
194 }
195}