Skip to main content

disintegrate_postgres/
migrator.rs

1//! Database initialization and migration utilities for the `PgEventStore`.
2//!
3//! This module provides helpers to:
4//! - initialize the PostgreSQL schema for a fresh deployment
5//! - apply schema migrations when upgrading versions
6//!
7//! The migrator is typically executed during application startup or via
8//! dedicated administrative tooling.
9use disintegrate::{DomainIdInfo, Event};
10use disintegrate_serde::Serde;
11
12use crate::PgEventStore;
13
14/// PostgreSQL Migrator error.
15#[derive(thiserror::Error, Debug)]
16#[error(transparent)]
17pub struct Error(#[from] sqlx::Error);
18
19/// Helper for initializing and migrating the `PgEventStore` database schema.
20///
21/// `Migrator` encapsulates the logic required to:
22/// - initialize the database for a fresh deployment
23/// - apply schema migrations when upgrading between versions
24///
25/// It is intended to be used during application startup or in dedicated
26/// maintenance / administration workflows.
27pub struct Migrator<E, S>
28where
29    E: Event + Clone,
30    S: Serde<E> + Send + Sync,
31{
32    event_store: PgEventStore<E, S>,
33}
34
35impl<E, S> Migrator<E, S>
36where
37    E: Event + Clone,
38    S: Serde<E> + Send + Sync,
39{
40    pub fn new(event_store: PgEventStore<E, S>) -> Self {
41        Self { event_store }
42    }
43
44    /// Init `PgEventStore` database
45    pub async fn init_event_store(&self) -> Result<(), Error> {
46        const RESERVED_NAMES: &[&str] = &["event_id", "payload", "event_type", "inserted_at"];
47
48        sqlx::query(include_str!("event_store/sql/seq_event_event_id.sql"))
49            .execute(&self.event_store.pool)
50            .await?;
51        sqlx::query(include_str!("event_store/sql/table_event.sql"))
52            .execute(&self.event_store.pool)
53            .await?;
54        sqlx::query(include_str!("event_store/sql/idx_event_type.sql"))
55            .execute(&self.event_store.pool)
56            .await?;
57        for domain_id in E::SCHEMA.domain_ids {
58            if RESERVED_NAMES.contains(&domain_id.ident) {
59                panic!(
60                    "Domain id name {domain_id} is reserved. Please use a different name.",
61                    domain_id = domain_id.ident
62                );
63            }
64            self.add_domain_id_column("event", domain_id).await?;
65        }
66        Ok(())
67    }
68
69    /// Init `PgEventListener` database
70    pub async fn init_listener(&self) -> Result<(), Error> {
71        sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
72            .execute(&self.event_store.pool)
73            .await?;
74        sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
75            .execute(&self.event_store.pool)
76            .await?;
77        sqlx::query(include_str!(
78            "listener/sql/trigger_notify_event_listener.sql"
79        ))
80        .execute(&self.event_store.pool)
81        .await?;
82        Ok(())
83    }
84
85    /// Migrate the database from version 2.1.0 to version 3.0.0
86    ///
87    /// Backward compatible migration replacing `HASH` indexes with `BTREE` indexes
88    /// to improve performance.
89    pub async fn migrate_v2_1_0_to_v3_0_0(&self) -> Result<(), Error> {
90        self.migrate_hash_index_to_btree("idx_event_sequence_type", "event_sequence", "event_type")
91            .await?;
92
93        self.migrate_hash_index_to_btree("idx_events_type", "event", "event_type")
94            .await?;
95
96        for domain_id in E::SCHEMA.domain_ids {
97            let column_name = domain_id.ident;
98
99            self.migrate_hash_index_to_btree(
100                &format!("idx_event_{column_name}"),
101                "event",
102                &column_name,
103            )
104            .await?;
105            self.migrate_hash_index_to_btree(
106                &format!("idx_event_sequence_{column_name}"),
107                "event_sequence",
108                &column_name,
109            )
110            .await?;
111        }
112        Ok(())
113    }
114
115    /// Migrate the database from version 3.x.x to version 4.0.0
116    pub async fn migrate_v3_x_x_to_v4_0_0(&self) -> Result<(), Error> {
117        sqlx::query(
118            "SELECT setval('seq_event_event_id', COALESCE((SELECT MAX(event_id) FROM event), 0))",
119        )
120        .execute(&self.event_store.pool)
121        .await?;
122        sqlx::query(
123            "ALTER TABLE event ALTER COLUMN event_id SET DEFAULT nextval('seq_event_event_id')",
124        )
125        .execute(&self.event_store.pool)
126        .await?;
127        Ok(())
128    }
129
130    async fn add_domain_id_column(
131        &self,
132        table: &str,
133        domain_id: &DomainIdInfo,
134    ) -> Result<(), Error> {
135        let column_name = domain_id.ident;
136        let sql_type = match domain_id.type_info {
137            disintegrate::IdentifierType::String => "TEXT",
138            disintegrate::IdentifierType::i64 => "BIGINT",
139            disintegrate::IdentifierType::Uuid => "UUID",
140        };
141        sqlx::query(&format!(
142            "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column_name} {sql_type}"
143        ))
144        .execute(&self.event_store.pool)
145        .await?;
146
147        sqlx::query(&format!(
148            "CREATE INDEX IF NOT EXISTS idx_{table}_{column_name} ON {table} ({column_name}) WHERE {column_name} IS NOT NULL"
149        ))
150        .execute(&self.event_store.pool)
151        .await?;
152        Ok(())
153    }
154
155    async fn migrate_hash_index_to_btree(
156        &self,
157        index_name: &str,
158        table: &str,
159        column: &str,
160    ) -> Result<(), Error> {
161        let index_type: Option<String> = sqlx::query_scalar(&format!("SELECT am.amname AS index_type FROM pg_class c JOIN pg_am am ON am.oid = c.relam WHERE c.relname = '{index_name}'"))
162        .fetch_optional(&self.event_store.pool)
163        .await?;
164
165        if index_type.is_some_and(|ty| ty == "hash") {
166            sqlx::query(&format!(
167                "CREATE INDEX IF NOT EXISTS {index_name}_tmp ON {table} ({column})"
168            ))
169            .execute(&self.event_store.pool)
170            .await?;
171
172            sqlx::query(&format!("DROP INDEX CONCURRENTLY {index_name}"))
173                .execute(&self.event_store.pool)
174                .await?;
175
176            sqlx::query(&format!(
177                "ALTER INDEX {index_name}_tmp RENAME TO  {index_name}"
178            ))
179            .execute(&self.event_store.pool)
180            .await?;
181        }
182
183        Ok(())
184    }
185}