pg_tviews 0.1.0-beta.12

Transactional materialized views with incremental refresh for PostgreSQL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
use crate::catalog::entity_for_table;
use crate::queue::{enqueue_refresh, enqueue_refresh_bulk, enqueue_refresh_dedup};
use crate::utils::{IntExtraction, quote_identifier, tuple_get_i64};
use pgrx::prelude::*;
/// Trigger Handler: Change Detection and Queue Management
///
/// This module implements `PostgreSQL` triggers for TVIEW change tracking:
/// - **Row-level Triggers**: Detects INSERT/UPDATE/DELETE on base tables
/// - **Primary Key Extraction**: Identifies changed rows for selective refresh
/// - **Queue Enqueueing**: Adds refresh requests to transaction queue
/// - **Bulk Operations**: Handles multi-row changes efficiently
///
/// ## Trigger Lifecycle
///
/// 1. `PostgreSQL` calls trigger for each changed row
/// 2. Extract primary key of changed row
/// 3. Map table OID to entity name
/// 4. Enqueue `(entity, pk)` pair for refresh
/// 5. Transaction commit processes the queue
///
/// ## Performance Considerations
///
/// - Triggers run in critical path - must be fast
/// - Bulk enqueueing for multi-row operations
/// - Minimal database queries during trigger execution
/// - Queue processing deferred to commit time
use pgrx::spi;

/// Outcome of reading the DISTINCT ON key column from a trigger tuple.
///
/// Produced by `extract_distinct_on_key`. The row-level trigger handler
/// enqueues a refresh only for `Value`; the other two variants are reported
/// with a warning and the row is skipped.
enum KeyExtraction {
    /// The column held a non-NULL value, rendered as a `String`
    /// (integer keys are converted with `to_string()`).
    Value(String),
    /// The column was read without error, but its value is NULL.
    Null,
    /// Every typed read (`String`, `i64`, `i32`) returned an error, so the
    /// column could not be extracted (e.g. an unsupported column type).
    TypeMismatch,
}

/// Read the DISTINCT ON key column `key_col` from `tuple` as a string.
///
/// The column is read as `String`, then `i64`, then `i32`. The first read
/// that does not return an error decides the outcome:
///
/// - a non-NULL value becomes `KeyExtraction::Value` (integers are rendered
///   with `to_string()`),
/// - a NULL becomes `KeyExtraction::Null`.
///
/// When all three reads return an error the result is
/// `KeyExtraction::TypeMismatch`.
fn extract_distinct_on_key<'a>(
    tuple: &PgHeapTuple<'a, AllocatedByPostgres>,
    key_col: &str,
) -> KeyExtraction {
    // Text-like columns first.
    // NOTE(review): the original comment listed UUID here as well — confirm
    // that pgrx actually accepts a UUID column for a `String` read.
    if let Ok(text) = tuple.get_by_name::<String>(key_col) {
        return text.map_or(KeyExtraction::Null, KeyExtraction::Value);
    }

    // BIGINT
    if let Ok(wide) = tuple.get_by_name::<i64>(key_col) {
        return wide.map_or(KeyExtraction::Null, |n| KeyExtraction::Value(n.to_string()));
    }

    // INTEGER
    if let Ok(narrow) = tuple.get_by_name::<i32>(key_col) {
        return narrow.map_or(KeyExtraction::Null, |n| KeyExtraction::Value(n.to_string()));
    }

    // None of the typed reads succeeded.
    KeyExtraction::TypeMismatch
}

/// Trigger handler function for TVIEW cascades.
///
/// This is called by triggers installed on base tables when rows change.
/// Processing order:
///
/// 1. Resolve the OID of the table the trigger fired on.
/// 2. If `crate::config::suspend_triggers()` is set, record the affected
///    entities (the direct entity and every cascade-path target) through
///    `crate::suspend::record_change` and enqueue nothing.
/// 3. If the table is itself a TVIEW source, enqueue a refresh for the
///    changed row: by DISTINCT ON key when the entity declares one,
///    otherwise by primary key.
/// 4. Otherwise follow the cascade paths via `enqueue_cascade_parents`.
///
/// For DISTINCT ON entities, when the trigger carries two row images (both
/// `new()` and `old()` are `Some`, as for an UPDATE) the key of each image is
/// enqueued if they differ, so the group a row moved away from is refreshed
/// as well as the group it moved into.
///
/// Every failure is reported with `warning!`; the handler always returns
/// `Ok(None)`.
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)] // Reason: pgrx #[pg_trigger] requires Result return type
fn pg_tview_trigger_handler<'a>(
    trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    // Extract table OID
    let table_oid = match trigger.relation() {
        Ok(rel) => rel.oid(),
        Err(e) => {
            warning!("Failed to get trigger relation: {:?}", e);
            return Ok(None);
        }
    };

    // If triggers are suspended, record the change instead of enqueuing
    if crate::config::suspend_triggers() {
        // Record direct entity if any
        if let Ok(Some(entity_info)) = crate::queue::cache::table_cache::entity_info_cached(table_oid) {
            crate::suspend::record_change(&entity_info.name);
        }

        // Record entities from cascade paths (indirect dependencies)
        let paths: Vec<crate::cascade_path::CascadePath> =
            match crate::queue::cache::cascade_cache::cascade_paths_for_table(table_oid) {
                Ok(p) => p,
                Err(e) => {
                    warning!("Failed to load cascade paths for suspended trigger: {:?}", e);
                    vec![]
                }
            };
        for path in paths {
            crate::suspend::record_change(&path.entity_name);
        }

        return Ok(None);
    }

    // 1. Direct entity: this table IS a TVIEW source (e.g. tb_user → entity "user")
    match crate::queue::cache::table_cache::entity_info_cached(table_oid) {
        Ok(Some(entity_info)) => {
            let entity = &entity_info.name;
            // Check if this is a DISTINCT ON TVIEW using cached distinct_on_key
            if let Some(key_col) = &entity_info.distinct_on_key {
                // DISTINCT ON TVIEW: enqueue dedup key value instead of base PK.
                // `tuple` is chosen as `new()` falling back to `old()`;
                // `other` is the second row image, present only when the
                // trigger carries both.
                let (tuple, other) = match (trigger.new(), trigger.old()) {
                    (Some(first), second) => (first, second),
                    (None, Some(only)) => (only, None),
                    (None, None) => {
                        warning!("No tuple in trigger context for DISTINCT ON TVIEW '{entity}'");
                        return Ok(None);
                    }
                };

                // Remember which key (if any) was enqueued so the second row
                // image is only enqueued when its key is different.
                let enqueued_key = match extract_distinct_on_key(&tuple, key_col) {
                    KeyExtraction::Value(key_val) => {
                        enqueue_refresh_dedup(entity, &key_val);
                        Some(key_val)
                    }
                    KeyExtraction::Null => {
                        warning!(
                            "DISTINCT ON key '{key_col}' is NULL for entity '{entity}' — skipping refresh"
                        );
                        None
                    }
                    KeyExtraction::TypeMismatch => {
                        warning!(
                            "Cannot extract DISTINCT ON key '{key_col}' for '{entity}': \
                             unsupported column type — skipping refresh for this row"
                        );
                        None
                    }
                };

                // A row whose key changed leaves one DISTINCT ON group and
                // joins another; both groups need a refresh. NULL or
                // unreadable keys on the second image are skipped silently
                // (the primary image already produced any warning).
                if let Some(other_tuple) = &other {
                    if let KeyExtraction::Value(other_key) =
                        extract_distinct_on_key(other_tuple, key_col)
                    {
                        if enqueued_key.as_deref() != Some(other_key.as_str()) {
                            enqueue_refresh_dedup(entity, &other_key);
                        }
                    }
                }
            } else {
                // Standard PK-based TVIEW: extract pk_<entity>
                let pk_value = match crate::utils::extract_pk(trigger) {
                    Ok(pk) => pk,
                    Err(e) => {
                        warning!("Failed to extract primary key from trigger: {:?}", e);
                        return Ok(None);
                    }
                };
                enqueue_refresh(entity, pk_value);
            }
            return Ok(None);
        }
        Ok(None) => { /* fall through to indirect lookup */ }
        Err(e) => {
            warning!(
                "Failed to resolve entity for table OID {:?}: {:?}",
                table_oid,
                e
            );
            return Ok(None);
        }
    }

    // 2. Indirect: this table is a dependency of one or more TVIEWs
    //    Follow cascade paths to determine which TVIEW rows need refreshing
    enqueue_cascade_parents(trigger, table_oid);

    Ok(None)
}

/// Follow cascade paths from a base table change to enqueue parent TVIEW refreshes.
///
/// When a base table (e.g. `tb_item`) changes, loads cascade paths from the
/// transaction-scoped cache and follows each path hop-by-hop via SPI to
/// discover which TVIEW entity rows need refreshing.
fn enqueue_cascade_parents(trigger: &PgTrigger, table_oid: pg_sys::Oid) {
    let paths: Vec<crate::cascade_path::CascadePath> =
        match crate::queue::cache::cascade_cache::cascade_paths_for_table(table_oid) {
            Ok(p) => p,
            Err(e) => {
                warning!(
                    "Failed to load cascade paths for table {:?}: {:?}",
                    table_oid,
                    e
                );
                return;
            }
        };

    if paths.is_empty() {
        return;
    }

    let Some(tuple) = trigger.new().or_else(|| trigger.old()) else {
        warning!("No tuple available in trigger context");
        return;
    };

    for path in &paths {
        if let Err(e) = follow_cascade_path(path, &tuple) {
            warning!(
                "Cascade refresh failed for path {} → {}: {:?}",
                path.source_table,
                path.entity_name,
                e
            );
        }
    }
}

/// Follow a single cascade path to enqueue refresh(es) for the target entity.
///
/// Starting from the value of `path.initial_col` on `tuple`, each hop in
/// `path.hops` maps the current set of ids to the next set through
/// `crate::queue::spi_batch_lookup`. Every id left after the last hop is
/// enqueued for `path.entity_name`.
///
/// Returns `Ok(())` without enqueueing anything when the path is marked
/// unresolvable, when the initial column is NULL or missing, or when a hop
/// yields no ids. An error from `spi_batch_lookup` is propagated.
fn follow_cascade_path(
    path: &crate::cascade_path::CascadePath,
    tuple: &PgHeapTuple<AllocatedByPostgres>,
) -> crate::TViewResult<()> {
    if path.unresolvable {
        // Only a warning is emitted here; no refresh is enqueued.
        // NOTE(review): the original comment described enqueueing a pk=0
        // "refresh all rows" sentinel, which this code does not do — confirm
        // whether a full-refresh fallback is still intended.
        warning!(
            "Unresolvable cascade path for entity '{}' — full refresh needed",
            path.entity_name
        );
        return Ok(());
    }

    // Seed id: the initial column of the changed tuple.
    let seed = match tuple_get_i64(tuple, &path.initial_col) {
        IntExtraction::Null => return Ok(()), // FK is NULL, cascade stops
        IntExtraction::Missing => {
            warning!(
                "Initial column '{}' not found on tuple for cascade to '{}'",
                path.initial_col,
                path.entity_name
            );
            return Ok(());
        }
        IntExtraction::Value(pk) => pk,
    };

    // Walk the intermediate hops, replacing the id set at every step.
    let mut frontier = vec![seed];
    for hop in &path.hops {
        if frontier.is_empty() {
            return Ok(());
        }
        frontier = crate::queue::spi_batch_lookup(
            hop.table_oid,
            &hop.lookup_col,
            &hop.carry_col,
            &frontier,
        )?;
    }

    // Whatever ids remain identify the target entity rows to refresh.
    for target_pk in frontier {
        enqueue_refresh(&path.entity_name, target_pk);
    }

    Ok(())
}

/// Statement-level AFTER trigger that flushes the refresh queue.
///
/// Fires once per statement (not per row) and processes all queued refresh
/// requests, then flushes the audit buffer. This lets auto-commit (implicit)
/// transactions get their TVIEWs refreshed, since the `ProcessUtility` hook
/// only intercepts explicit COMMIT statements.
///
/// For explicit transactions (BEGIN...COMMIT), both this trigger and the
/// `ProcessUtility` hook may run. The flush is idempotent — the second call
/// finds an empty queue and returns immediately.
///
/// When triggers are suspended, neither flush is attempted. A failing flush
/// is reported with `warning!`; the trigger always returns `Ok(None)`.
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)] // Reason: pgrx #[pg_trigger] requires Result return type
fn pg_tview_flush_trigger<'a>(
    _trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    let suspended = crate::config::suspend_triggers();

    if !suspended {
        // Refresh queue first, then the audit buffer; each failure is
        // reported independently so one does not prevent the other.
        if let Err(refresh_err) = crate::queue::flush_refresh_queue() {
            warning!("TVIEW refresh failed in statement trigger: {:?}", refresh_err);
        }
        if let Err(audit_err) = crate::audit::flush_audit_buffer() {
            warning!("Audit flush failed in statement trigger: {:?}", audit_err);
        }
    }

    Ok(None)
}

/// Statement-level trigger handler for bulk operations
/// This is called once per statement instead of once per row
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)] // Reason: pgrx #[pg_trigger] requires Result return type
fn pg_tview_stmt_trigger_handler<'a>(
    trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    // Extract table OID
    let table_oid = match trigger.relation() {
        Ok(rel) => rel.oid(),
        Err(e) => {
            warning!("Failed to get trigger relation: {:?}", e);
            return Ok(None);
        }
    };

    // Map table OID → entity name
    let entity = match entity_for_table(table_oid) {
        Ok(Some(e)) => e,
        Ok(None) => {
            // Table not in pg_tview_meta, skip
            return Ok(None);
        }
        Err(e) => {
            warning!(
                "Failed to resolve entity for table OID {:?}: {:?}",
                table_oid,
                e
            );
            return Ok(None);
        }
    };

    // Extract all changed PKs from transition table
    let changed_pks = match extract_pks_from_transition_table(trigger) {
        Ok(pks) => pks,
        Err(e) => {
            warning!("Failed to extract PKs from transition table: {:?}", e);
            return Ok(None);
        }
    };

    if changed_pks.is_empty() {
        // No rows changed, nothing to do
        return Ok(None);
    }

    // Bulk enqueue all changed PKs
    enqueue_refresh_bulk(&entity, changed_pks);

    Ok(None)
}

/// Extract primary keys from `PostgreSQL` transition tables.
///
/// Picks a transition table name from which row images the trigger exposes,
/// resolves the PK column through `get_pk_column_name`, and runs
/// `SELECT DISTINCT <pk> FROM <transition table>` over SPI. NULL PK values
/// are dropped from the result.
///
/// Returns an empty list when the trigger exposes neither row image.
///
/// NOTE(review): this is called from a statement-level handler. Confirm that
/// `trigger.new()` / `trigger.old()` are ever `Some` in that context — if
/// both are always `None`, this function always returns an empty list.
/// Also confirm the transition tables are visible to the SPI query (C-level
/// triggers normally need `SPI_register_trigger_data`).
fn extract_pks_from_transition_table(trigger: &PgTrigger) -> spi::Result<Vec<i64>> {
    // Intended mapping: only NEW → INSERT, only OLD → DELETE,
    // both → UPDATE (NEW is used for consistency).
    let has_new = trigger.new().is_some();
    let has_old = trigger.old().is_some();
    let transition_table_name = match (has_new, has_old) {
        (true, _) => "new_table",
        (false, true) => "old_table",
        (false, false) => return Ok(Vec::new()), // Unsupported event
    };

    // PK column follows the pk_<entity> convention.
    let relation = trigger
        .relation()
        .map_err(|_| crate::TViewError::SpiError {
            query: "get relation".to_string(),
            error: "Failed to get trigger relation".to_string(),
        })?;
    let pk_column = get_pk_column_name(relation.oid())?;

    // The column name is quoted; the transition table name is one of the two
    // fixed literals above and is interpolated unquoted.
    // NOTE(review): presumably these names match the trigger's
    // REFERENCING ... TABLE AS clause — verify against the trigger DDL.
    let query = format!(
        "SELECT DISTINCT {} FROM {}",
        quote_identifier(&pk_column),
        transition_table_name
    );

    Spi::connect(|client| {
        let mut collected = Vec::new();

        for row in client.select(&query, None, &[])? {
            // Rows whose PK is NULL are skipped.
            if let Some(pk) = row[pk_column.as_str()].value::<i64>()? {
                collected.push(pk);
            }
        }

        Ok(collected)
    })
}

/// Get primary key column name for a table.
///
/// Resolves the entity for `table_oid` with `entity_for_table` and applies
/// the naming convention `pk_<entity>`.
///
/// # Errors
///
/// Returns a `crate::TViewError::SpiError` (converted into the SPI error
/// type) when the table has no entity or when the entity lookup itself fails.
fn get_pk_column_name(table_oid: pg_sys::Oid) -> spi::Result<String> {
    match entity_for_table(table_oid) {
        // Convention: pk_<entity>
        Ok(Some(entity)) => Ok(format!("pk_{entity}")),
        Ok(None) => Err(crate::TViewError::SpiError {
            query: "entity_for_table".to_string(),
            error: "Table not managed by pg_tviews".to_string(),
        }
        .into()),
        Err(cause) => Err(crate::TViewError::SpiError {
            query: "entity_for_table".to_string(),
            error: format!("Failed to get entity: {cause:?}"),
        }
        .into()),
    }
}