1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
use super::{Action, MigrationContext};
use crate::{
    db::{Conn, Transaction},
    migrations::common,
    schema::Schema,
};
use anyhow::{anyhow, Context};
use serde::{Deserialize, Serialize};

/// Migration action that alters an existing column of a table.
///
/// Unless the action is a pure rename, the change is carried out by adding a
/// temporary column next to the existing one and keeping the two in sync with
/// triggers until the migration is completed or aborted.
#[derive(Serialize, Deserialize, Debug)]
pub struct AlterColumn {
    /// Name of the table that contains the column.
    pub table: String,
    /// Current name of the column to alter.
    pub column: String,
    /// SQL expression assigned to the temporary (new) column by the "up"
    /// trigger. When omitted, the existing column's name is used as the
    /// expression, i.e. the value is copied over unchanged.
    pub up: Option<String>,
    /// SQL expression assigned to the existing (old) column by the "down"
    /// trigger. When omitted, the column's name is used as the expression,
    /// i.e. the value is copied over unchanged.
    pub down: Option<String>,
    /// The set of changes to apply to the column.
    pub changes: ColumnChanges,
}

/// The individual changes an [`AlterColumn`] action can request.
///
/// Every field is optional; `None` means "leave this aspect as it is".
#[derive(Serialize, Deserialize, Debug)]
pub struct ColumnChanges {
    /// New name for the column. If this is the only field set, the action is
    /// handled as a plain `RENAME COLUMN` when the migration completes.
    pub name: Option<String>,
    /// New data type for the column (serialized under the key `type`).
    /// Falls back to the column's existing type when omitted.
    #[serde(rename = "type")]
    pub data_type: Option<String>,
    /// Requested nullability of the column. Setting it (to any value)
    /// disables the rename-only fast path.
    pub nullable: Option<bool>,
    /// New default value, inserted verbatim after `DEFAULT` in the temporary
    /// column's definition. Falls back to the existing default, if any.
    pub default: Option<String>,
}

#[typetag::serde(name = "alter_column")]
impl Action for AlterColumn {
    /// Returns a one-line, human-readable summary of this action.
    fn describe(&self) -> String {
        format!("Altering column \"{}\" on \"{}\"", self.column, self.table)
    }

    /// Starts the column alteration.
    ///
    /// For a pure rename nothing is done here. Otherwise this:
    /// 1. adds a temporary column with the new (or existing) type and default,
    /// 2. installs an "up" trigger (old schema writes -> temporary column) and
    ///    a "down" trigger (new schema writes -> existing column),
    /// 3. backfills existing rows via `common::batch_touch_rows`,
    /// 4. adds a `NOT VALID` check constraint on the temporary column if the
    ///    resulting column should not be nullable.
    ///
    /// # Errors
    ///
    /// Fails if the table cannot be loaded from the schema, if the column does
    /// not exist on the table, or if any of the executed statements fails.
    fn run(
        &self,
        ctx: &MigrationContext,
        db: &mut dyn Conn,
        schema: &Schema,
    ) -> anyhow::Result<()> {
        // If we are only changing the name of a column, we don't have to do anything at this stage
        // We'll set the new schema to point to the old column. When the migration is completed,
        // we rename the actual column.
        if self.can_short_circuit() {
            return Ok(());
        }

        let table = schema.get_table(db, &self.table)?;

        let column = table
            .columns
            .iter()
            .find(|column| column.name == self.column)
            .ok_or_else(|| anyhow!("no such column {} exists", self.column))?;

        let temporary_column_name = self.temporary_column_name(ctx);
        let temporary_column_type = self.changes.data_type.as_ref().unwrap_or(&column.data_type);

        // Add temporary, nullable column
        let mut temp_column_definition_parts: Vec<&str> = vec![
            temporary_column_name.as_str(),
            temporary_column_type.as_str(),
        ];

        // Use either new default value or existing one if one exists
        let default_value = self.changes.default.as_ref().or(column.default.as_ref());
        if let Some(default) = default_value {
            temp_column_definition_parts.push("DEFAULT");
            temp_column_definition_parts.push(default);
        }

        let query = format!(
            "
			ALTER TABLE {table}
            ADD COLUMN IF NOT EXISTS {temp_column_definition}
			",
            table = self.table,
            temp_column_definition = temp_column_definition_parts.join(" "),
        );
        db.run(&query).context("failed to add temporary column")?;

        // If up or down wasn't provided, we default to simply moving the value over.
        // This is the correct behaviour for example when only changing the default value.
        let up = self.up.as_ref().unwrap_or(&self.column);
        let down = self.down.as_ref().unwrap_or(&self.column);

        // Expose every other column to the trigger bodies as a local variable named
        // after its schema name and initialised from the underlying real column, so
        // the `up`/`down` expressions can reference columns by name.
        let declarations: Vec<String> = table
            .columns
            .iter()
            .filter(|column| column.name != self.column)
            .map(|column| {
                format!(
                    "{alias} public.{table}.{real_name}%TYPE := NEW.{real_name};",
                    table = table.real_name,
                    alias = column.name,
                    real_name = column.real_name,
                )
            })
            .collect();

        let query = format!(
                "
                CREATE OR REPLACE FUNCTION {up_trigger}()
                RETURNS TRIGGER AS $$
                BEGIN
                    IF reshape.is_old_schema() THEN
                        DECLARE
                            {declarations}
                            {existing_column} public.{table}.{existing_column_real}%TYPE := NEW.{existing_column_real};
                        BEGIN
                            NEW.{temp_column} = {up};
                        END;
                    END IF;
                    RETURN NEW;
                END
                $$ language 'plpgsql';

                DROP TRIGGER IF EXISTS {up_trigger} ON {table};
                CREATE TRIGGER {up_trigger} BEFORE INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE PROCEDURE {up_trigger}();

                CREATE OR REPLACE FUNCTION {down_trigger}()
                RETURNS TRIGGER AS $$
                BEGIN
                    IF NOT reshape.is_old_schema() THEN
                        DECLARE
                            {declarations}
                            {existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
                        BEGIN
                            NEW.{existing_column_real} = {down};
                        END;
                    END IF;
                    RETURN NEW;
                END
                $$ language 'plpgsql';

                DROP TRIGGER IF EXISTS {down_trigger} ON {table};
                CREATE TRIGGER {down_trigger} BEFORE INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE PROCEDURE {down_trigger}();
                ",
            existing_column = &self.column,
            existing_column_real = column.real_name,
            temp_column = temporary_column_name,
            up = up,
            down = down,
            table = self.table,
            up_trigger = self.up_trigger_name(ctx),
            down_trigger = self.down_trigger_name(ctx),
            declarations = declarations.join("\n"),
        );
        db.run(&query)
            .context("failed to create up and down triggers")?;

        // Backfill values in batches by touching the previous column
        common::batch_touch_rows(db, &table.real_name, &column.real_name)
            .context("failed to batch update existing rows")?;

        // Add a temporary NOT NULL constraint if the column shouldn't be nullable.
        // This constraint is set as NOT VALID so it doesn't apply to existing rows and
        // the existing rows don't need to be scanned under an exclusive lock.
        // Thanks to this, we can set the full column as NOT NULL later with minimal locking.
        //
        // An explicitly requested nullability takes precedence over the existing
        // column's; otherwise the existing nullability is carried over.
        let should_be_nullable = self.changes.nullable.unwrap_or(column.nullable);
        if !should_be_nullable {
            let query = format!(
                "
                ALTER TABLE {table}
                ADD CONSTRAINT {constraint_name}
                CHECK ({column} IS NOT NULL) NOT VALID
                ",
                table = self.table,
                constraint_name = self.not_null_constraint_name(ctx),
                column = temporary_column_name,
            );
            db.run(&query)
                .context("failed to add NOT NULL constraint")?;
        }

        Ok(())
    }

    /// Finalises the column alteration.
    ///
    /// For a pure rename, the column is renamed in place. Otherwise, if the
    /// temporary NOT NULL check constraint exists it is validated, the
    /// temporary column is set NOT NULL and the constraint is dropped; then
    /// the old column is dropped, the temporary column is renamed to the
    /// final name, and the triggers and their functions are removed.
    ///
    /// Always returns `Ok(None)` on success (no transaction is handed back).
    ///
    /// # Errors
    ///
    /// Fails if any of the executed statements fails.
    fn complete<'a>(
        &self,
        ctx: &MigrationContext,
        db: &'a mut dyn Conn,
    ) -> anyhow::Result<Option<Transaction<'a>>> {
        if self.can_short_circuit() {
            if let Some(new_name) = &self.changes.name {
                let query = format!(
                    "
			        ALTER TABLE {table}
			        RENAME COLUMN {existing_name} TO {new_name}
			        ",
                    table = self.table,
                    existing_name = self.column,
                    new_name = new_name,
                );
                db.run(&query).context("failed to rename column")?;
            }
            return Ok(None);
        }

        // Update column to be NOT NULL if necessary
        let has_not_null_constraint = !db
            .query_with_params(
                "
                SELECT constraint_name
                FROM information_schema.constraint_column_usage
                WHERE constraint_name = $1
                ",
                &[&self.not_null_constraint_name(ctx)],
            )
            .context("failed to get any NOT NULL constraint")?
            .is_empty();
        if has_not_null_constraint {
            // Validate the temporary constraint (should always be valid).
            // This performs a sequential scan but does not take an exclusive lock.
            let query = format!(
                "
                ALTER TABLE {table}
                VALIDATE CONSTRAINT {constraint_name}
                ",
                table = self.table,
                constraint_name = self.not_null_constraint_name(ctx),
            );
            db.run(&query)
                .context("failed to validate NOT NULL constraint")?;

            // Update the column to be NOT NULL.
            // This requires an exclusive lock but since PG 12 it can check
            // the existing constraint for correctness which makes the lock short-lived.
            // Source: https://dba.stackexchange.com/a/268128
            let query = format!(
                "
                ALTER TABLE {table}
                ALTER COLUMN {column} SET NOT NULL
                ",
                table = self.table,
                column = self.temporary_column_name(ctx),
            );
            db.run(&query).context("failed to set column as NOT NULL")?;

            // Drop the temporary constraint
            let query = format!(
                "
                ALTER TABLE {table}
                DROP CONSTRAINT {constraint_name}
                ",
                table = self.table,
                constraint_name = self.not_null_constraint_name(ctx),
            );
            db.run(&query)
                .context("failed to drop NOT NULL constraint")?;
        }

        // Remove old column
        let query = format!(
            "
            ALTER TABLE {} DROP COLUMN {} CASCADE
			",
            self.table, self.column
        );
        db.run(&query).context("failed to drop old column")?;

        // Rename temporary column
        let column_name = self.changes.name.as_deref().unwrap_or(&self.column);
        let query = format!(
            "
            ALTER TABLE {table} RENAME COLUMN {temp_column} TO {name}
			",
            table = self.table,
            temp_column = self.temporary_column_name(ctx),
            name = column_name,
        );
        db.run(&query)
            .context("failed to rename temporary column")?;

        // Remove triggers and procedures
        let query = format!(
            "
            DROP TRIGGER IF EXISTS {up_trigger} ON {table};
            DROP FUNCTION IF EXISTS {up_trigger};

            DROP TRIGGER IF EXISTS {down_trigger} ON {table};
            DROP FUNCTION IF EXISTS {down_trigger};
            ",
            table = self.table,
            up_trigger = self.up_trigger_name(ctx),
            down_trigger = self.down_trigger_name(ctx),
        );
        db.run(&query)
            .context("failed to drop up and down triggers")?;

        Ok(None)
    }

    /// Applies this action's effect to the in-memory schema.
    ///
    /// For a pure rename the schema column gets the new name while still
    /// referring to the old column; otherwise the schema column is pointed at
    /// the temporary column.
    fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema) {
        // If we are only changing the name of a column, we haven't created a temporary column
        // Instead, we rename the schema column but point it to the old column
        if self.can_short_circuit() {
            if let Some(new_name) = &self.changes.name {
                schema.change_table(&self.table, |table_changes| {
                    table_changes.change_column(&self.column, |column_changes| {
                        column_changes.set_name(new_name);
                    });
                });
            }

            return;
        }

        schema.change_table(&self.table, |table_changes| {
            table_changes.change_column(&self.column, |column_changes| {
                column_changes.set_column(&self.temporary_column_name(ctx));
            });
        });
    }

    /// Rolls back whatever `run` created: the temporary column and the up and
    /// down triggers with their functions. Every statement uses `IF EXISTS`,
    /// so objects that were never created are skipped.
    ///
    /// # Errors
    ///
    /// Fails if any of the executed statements fails.
    fn abort(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
        // Drop temporary column
        let query = format!(
            "
			ALTER TABLE {table}
            DROP COLUMN IF EXISTS {temp_column};
			",
            table = self.table,
            temp_column = self.temporary_column_name(ctx),
        );
        db.run(&query).context("failed to drop temporary column")?;

        // Remove triggers and procedures
        let query = format!(
            "
            DROP TRIGGER IF EXISTS {up_trigger} ON {table};
            DROP FUNCTION IF EXISTS {up_trigger};

            DROP TRIGGER IF EXISTS {down_trigger} ON {table};
            DROP FUNCTION IF EXISTS {down_trigger};
            ",
            table = self.table,
            up_trigger = self.up_trigger_name(ctx),
            down_trigger = self.down_trigger_name(ctx),
        );
        db.run(&query)
            .context("failed to drop up and down triggers")?;

        Ok(())
    }
}

impl AlterColumn {
    /// Name of the temporary column that shadows `self.column` while the
    /// migration is in progress: `<prefix>_new_<column>`.
    fn temporary_column_name(&self, ctx: &MigrationContext) -> String {
        let prefix = ctx.prefix();
        format!("{}_new_{}", prefix, self.column)
    }

    /// Name shared by the "up" trigger and its function, built from the
    /// context's prefix.
    fn up_trigger_name(&self, ctx: &MigrationContext) -> String {
        let prefix = ctx.prefix();
        format!("{}_alter_column_up_trigger", prefix)
    }

    /// Name shared by the "down" trigger and its function, built from the
    /// context's inverse prefix.
    // NOTE(review): presumably the inverse prefix is used so the up and down
    // triggers sort differently by name — confirm against `MigrationContext`.
    fn down_trigger_name(&self, ctx: &MigrationContext) -> String {
        let inverse_prefix = ctx.prefix_inverse();
        format!("{}_alter_column_down_trigger", inverse_prefix)
    }

    /// Name of the temporary check constraint that enforces NOT NULL on the
    /// temporary column.
    fn not_null_constraint_name(&self, ctx: &MigrationContext) -> String {
        let prefix = ctx.prefix();
        format!("{}_alter_column_temporary", prefix)
    }

    /// Returns `true` when the action is a pure rename: a new name is given
    /// and neither type, nullability nor default is being changed.
    fn can_short_circuit(&self) -> bool {
        let changes = &self.changes;
        let alters_definition = changes.data_type.is_some()
            || changes.nullable.is_some()
            || changes.default.is_some();

        changes.name.is_some() && !alters_definition
    }
}