disintegrate_postgres/
migrator.rs1use disintegrate::{DomainIdInfo, Event};
10use disintegrate_serde::Serde;
11
12use crate::PgEventStore;
13
14#[derive(thiserror::Error, Debug)]
16#[error(transparent)]
17pub struct Error(#[from] sqlx::Error);
18
19pub struct Migrator<E, S>
28where
29 E: Event + Clone,
30 S: Serde<E> + Send + Sync,
31{
32 event_store: PgEventStore<E, S>,
33}
34
35impl<E, S> Migrator<E, S>
36where
37 E: Event + Clone,
38 S: Serde<E> + Send + Sync,
39{
40 pub fn new(event_store: PgEventStore<E, S>) -> Self {
41 Self { event_store }
42 }
43
44 pub async fn init_event_store(&self) -> Result<(), Error> {
46 const RESERVED_NAMES: &[&str] = &["event_id", "payload", "event_type", "inserted_at"];
47
48 sqlx::query(include_str!("event_store/sql/seq_event_event_id.sql"))
49 .execute(&self.event_store.pool)
50 .await?;
51 sqlx::query(include_str!("event_store/sql/table_event.sql"))
52 .execute(&self.event_store.pool)
53 .await?;
54 sqlx::query(include_str!("event_store/sql/idx_event_type.sql"))
55 .execute(&self.event_store.pool)
56 .await?;
57 sqlx::query(include_str!(
58 "event_store/sql/fn_event_store_current_epoch.sql"
59 ))
60 .execute(&self.event_store.pool)
61 .await?;
62 sqlx::query(include_str!(
63 "event_store/sql/fn_event_store_begin_epoch.sql"
64 ))
65 .execute(&self.event_store.pool)
66 .await?;
67 for domain_id in E::SCHEMA.domain_ids {
68 if RESERVED_NAMES.contains(&domain_id.ident) {
69 panic!(
70 "Domain id name {domain_id} is reserved. Please use a different name.",
71 domain_id = domain_id.ident
72 );
73 }
74 self.add_domain_id_column("event", domain_id).await?;
75 }
76 Ok(())
77 }
78
79 pub async fn init_listener(&self) -> Result<(), Error> {
81 sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
82 .execute(&self.event_store.pool)
83 .await?;
84 sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
85 .execute(&self.event_store.pool)
86 .await?;
87 sqlx::query(include_str!(
88 "listener/sql/trigger_notify_event_listener.sql"
89 ))
90 .execute(&self.event_store.pool)
91 .await?;
92 Ok(())
93 }
94
95 pub async fn migrate_v2_1_0_to_v3_0_0(&self) -> Result<(), Error> {
100 self.migrate_hash_index_to_btree("idx_event_sequence_type", "event_sequence", "event_type")
101 .await?;
102
103 self.migrate_hash_index_to_btree("idx_events_type", "event", "event_type")
104 .await?;
105
106 for domain_id in E::SCHEMA.domain_ids {
107 let column_name = domain_id.ident;
108
109 self.migrate_hash_index_to_btree(
110 &format!("idx_event_{column_name}"),
111 "event",
112 &column_name,
113 )
114 .await?;
115 self.migrate_hash_index_to_btree(
116 &format!("idx_event_sequence_{column_name}"),
117 "event_sequence",
118 &column_name,
119 )
120 .await?;
121 }
122 Ok(())
123 }
124
125 pub async fn migrate_v3_x_x_to_v4_0_0(&self) -> Result<(), Error> {
127 sqlx::query(
128 "SELECT setval('seq_event_event_id', COALESCE((SELECT MAX(event_id) FROM event), 0))",
129 )
130 .execute(&self.event_store.pool)
131 .await?;
132 sqlx::query(
133 "ALTER TABLE event ALTER COLUMN event_id SET DEFAULT nextval('seq_event_event_id')",
134 )
135 .execute(&self.event_store.pool)
136 .await?;
137 Ok(())
138 }
139
140 async fn add_domain_id_column(
141 &self,
142 table: &str,
143 domain_id: &DomainIdInfo,
144 ) -> Result<(), Error> {
145 let column_name = domain_id.ident;
146 let sql_type = match domain_id.type_info {
147 disintegrate::IdentifierType::String => "TEXT",
148 disintegrate::IdentifierType::i64 => "BIGINT",
149 disintegrate::IdentifierType::Uuid => "UUID",
150 };
151 sqlx::query(&format!(
152 "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column_name} {sql_type}"
153 ))
154 .execute(&self.event_store.pool)
155 .await?;
156
157 sqlx::query(&format!(
158 "CREATE INDEX IF NOT EXISTS idx_{table}_{column_name} ON {table} ({column_name}) WHERE {column_name} IS NOT NULL"
159 ))
160 .execute(&self.event_store.pool)
161 .await?;
162 Ok(())
163 }
164
165 async fn migrate_hash_index_to_btree(
166 &self,
167 index_name: &str,
168 table: &str,
169 column: &str,
170 ) -> Result<(), Error> {
171 let index_type: Option<String> = sqlx::query_scalar(&format!("SELECT am.amname AS index_type FROM pg_class c JOIN pg_am am ON am.oid = c.relam WHERE c.relname = '{index_name}'"))
172 .fetch_optional(&self.event_store.pool)
173 .await?;
174
175 if index_type.is_some_and(|ty| ty == "hash") {
176 sqlx::query(&format!(
177 "CREATE INDEX IF NOT EXISTS {index_name}_tmp ON {table} ({column})"
178 ))
179 .execute(&self.event_store.pool)
180 .await?;
181
182 sqlx::query(&format!("DROP INDEX CONCURRENTLY {index_name}"))
183 .execute(&self.event_store.pool)
184 .await?;
185
186 sqlx::query(&format!(
187 "ALTER INDEX {index_name}_tmp RENAME TO {index_name}"
188 ))
189 .execute(&self.event_store.pool)
190 .await?;
191 }
192
193 Ok(())
194 }
195}