disintegrate_postgres/
migrator.rs

1//! Database initialization and migration utilities for the `PgEventStore`.
2//!
3//! This module provides helpers to:
4//! - initialize the PostgreSQL schema for a fresh deployment
5//! - apply schema migrations when upgrading versions
6//!
7//! The migrator is typically executed during application startup or via
8//! dedicated administrative tooling.
9use disintegrate::{DomainIdentifierInfo, Event};
10use disintegrate_serde::Serde;
11
12use crate::PgEventStore;
13
14/// PostgreSQL Migrator error.
15#[derive(thiserror::Error, Debug)]
16#[error(transparent)]
17pub struct Error(#[from] sqlx::Error);
18
19/// Helper for initializing and migrating the `PgEventStore` database schema.
20///
21/// `Migrator` encapsulates the logic required to:
22/// - initialize the database for a fresh deployment
23/// - apply schema migrations when upgrading between versions
24///
25/// It is intended to be used during application startup or in dedicated
26/// maintenance / administration workflows.
27pub struct Migrator<E, S>
28where
29    E: Event + Clone,
30    S: Serde<E> + Send + Sync,
31{
32    event_store: PgEventStore<E, S>,
33}
34
35impl<E, S> Migrator<E, S>
36where
37    E: Event + Clone,
38    S: Serde<E> + Send + Sync,
39{
40    pub fn new(event_store: PgEventStore<E, S>) -> Self {
41        Self { event_store }
42    }
43
44    /// Init `PgEventStore` database
45    pub async fn init_event_store(&self) -> Result<(), Error> {
46        const RESERVED_NAMES: &[&str] = &[
47            "event_id",
48            "payload",
49            "event_type",
50            "inserted_at",
51            "__epoch_id",
52        ];
53
54        sqlx::query(include_str!("event_store/sql/table_event.sql"))
55            .execute(&self.event_store.pool)
56            .await?;
57        sqlx::query(include_str!("event_store/sql/idx_event_type.sql"))
58            .execute(&self.event_store.pool)
59            .await?;
60        sqlx::query(include_str!("event_store/sql/table_event_sequence.sql"))
61            .execute(&self.event_store.pool)
62            .await?;
63        sqlx::query(include_str!("event_store/sql/idx_event_sequence_type.sql"))
64            .execute(&self.event_store.pool)
65            .await?;
66        sqlx::query(include_str!(
67            "event_store/sql/idx_event_sequence_committed.sql"
68        ))
69        .execute(&self.event_store.pool)
70        .await?;
71        sqlx::query(include_str!(
72            "event_store/sql/fn_event_store_current_epoch.sql"
73        ))
74        .execute(&self.event_store.pool)
75        .await?;
76        sqlx::query(include_str!(
77            "event_store/sql/fn_event_store_begin_epoch.sql"
78        ))
79        .execute(&self.event_store.pool)
80        .await?;
81
82        for domain_identifier in E::SCHEMA.domain_identifiers {
83            if RESERVED_NAMES.contains(&domain_identifier.ident) {
84                panic!("Domain identifier name {domain_identifier} is reserved. Please use a different name.", domain_identifier = domain_identifier.ident);
85            }
86            self.add_domain_identifier_column("event", domain_identifier)
87                .await?;
88            self.add_domain_identifier_column("event_sequence", domain_identifier)
89                .await?;
90        }
91        Ok(())
92    }
93
94    /// Init `PgEventListener` database
95    pub async fn init_listener(&self) -> Result<(), Error> {
96        sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
97            .execute(&self.event_store.pool)
98            .await?;
99        sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
100            .execute(&self.event_store.pool)
101            .await?;
102        sqlx::query(include_str!(
103            "listener/sql/trigger_notify_event_listener.sql"
104        ))
105        .execute(&self.event_store.pool)
106        .await?;
107        Ok(())
108    }
109
110    /// Migrate the database from version 2.1.0 to version 3.0.0
111    ///
112    /// Backward compatible migration replacing `HASH` indexes with `BTREE` indexes
113    /// to improve performance.
114    pub async fn migrate_v2_1_0_to_v3_0_0(&self) -> Result<(), Error> {
115        self.migrate_hash_index_to_btree("idx_event_sequence_type", "event_sequence", "event_type")
116            .await?;
117
118        self.migrate_hash_index_to_btree("idx_events_type", "event", "event_type")
119            .await?;
120
121        for domain_identifier in E::SCHEMA.domain_identifiers {
122            let column_name = domain_identifier.ident;
123
124            self.migrate_hash_index_to_btree(
125                &format!("idx_event_{column_name}"),
126                "event",
127                &column_name,
128            )
129            .await?;
130            self.migrate_hash_index_to_btree(
131                &format!("idx_event_sequence_{column_name}"),
132                "event_sequence",
133                &column_name,
134            )
135            .await?;
136        }
137        Ok(())
138    }
139
140    async fn add_domain_identifier_column(
141        &self,
142        table: &str,
143        domain_identifier: &DomainIdentifierInfo,
144    ) -> Result<(), Error> {
145        let column_name = domain_identifier.ident;
146        let sql_type = match domain_identifier.type_info {
147            disintegrate::IdentifierType::String => "TEXT",
148            disintegrate::IdentifierType::i64 => "BIGINT",
149            disintegrate::IdentifierType::Uuid => "UUID",
150        };
151        sqlx::query(&format!(
152            "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column_name} {sql_type}"
153        ))
154        .execute(&self.event_store.pool)
155        .await?;
156
157        sqlx::query(&format!(
158            "CREATE INDEX IF NOT EXISTS idx_{table}_{column_name} ON {table} ({column_name}) WHERE {column_name} IS NOT NULL"
159        ))
160        .execute(&self.event_store.pool)
161        .await?;
162        Ok(())
163    }
164
165    async fn migrate_hash_index_to_btree(
166        &self,
167        index_name: &str,
168        table: &str,
169        column: &str,
170    ) -> Result<(), Error> {
171        let index_type: Option<String> = sqlx::query_scalar(&format!("SELECT am.amname AS index_type FROM pg_class c JOIN pg_am am ON am.oid = c.relam WHERE c.relname = '{index_name}'"))
172        .fetch_optional(&self.event_store.pool)
173        .await?;
174
175        if index_type.is_some_and(|ty| ty == "hash") {
176            sqlx::query(&format!(
177                "CREATE INDEX IF NOT EXISTS {index_name}_tmp ON {table} ({column})"
178            ))
179            .execute(&self.event_store.pool)
180            .await?;
181
182            sqlx::query(&format!("DROP INDEX CONCURRENTLY {index_name}"))
183                .execute(&self.event_store.pool)
184                .await?;
185
186            sqlx::query(&format!(
187                "ALTER INDEX {index_name}_tmp RENAME TO  {index_name}"
188            ))
189            .execute(&self.event_store.pool)
190            .await?;
191        }
192
193        Ok(())
194    }
195}