pg_tviews 0.1.0-beta.11

Transactional materialized views with incremental refresh for PostgreSQL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
use super::ops::{
    clear_queue, is_crash_recovery_checked, mark_crash_recovery_checked, take_queue_snapshot,
};
use crate::TViewResult;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys;
use pgrx::prelude::*;
use std::collections::HashSet;
use std::os::raw::c_void;
use std::panic::AssertUnwindSafe;

// Thread-local storage for savepoint support.
//
// Both cells are maintained by the subtransaction callback in this file and
// seeded by `register_subxact_callback`.
thread_local! {
    /// Current savepoint depth (0 = no savepoints).
    ///
    /// Incremented on `SUBXACT_EVENT_START_SUB`, decremented (saturating at 0)
    /// on `SUBXACT_EVENT_ABORT_SUB` and `SUBXACT_EVENT_COMMIT_SUB`.
    static SAVEPOINT_DEPTH: std::cell::RefCell<usize> = const { std::cell::RefCell::new(0) };

    /// Queue snapshots for each savepoint level, used as a stack.
    ///
    /// One entry is pushed when a subtransaction starts and popped when it is
    /// rolled back (the popped snapshot replaces the refresh queue) or
    /// released (the popped snapshot is dropped).
    static QUEUE_SNAPSHOTS: std::cell::RefCell<Vec<HashSet<super::key::RefreshKey>>> =
        const { std::cell::RefCell::new(Vec::new()) };
}

/// Transaction event types
///
/// Rust-side view of the subset of `PostgreSQL` `XactEvent` values that
/// `tview_xact_callback` reacts to; every other event is ignored there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum XactEvent {
    /// Mapped from `XACT_EVENT_COMMIT`.
    Commit,
    /// Mapped from `XACT_EVENT_ABORT`.
    Abort,
    /// Mapped from `XACT_EVENT_PRE_COMMIT`.
    PreCommit,
    /// Mapped from `XACT_EVENT_PREPARE`.
    Prepare, // XACT_EVENT_PREPARE
}

/// Register the transaction callback (called from enqueue logic)
///
/// This uses `PostgreSQL`'s `RegisterXactCallback` FFI to install our handler.
/// The callback will be invoked at transaction commit/abort.
pub unsafe fn register_xact_callback() {
    // SAFETY: Called from PostgreSQL backend context. RegisterXactCallback
    // registers a valid extern "C" callback function pointer.
    unsafe {
        pg_sys::RegisterXactCallback(Some(tview_xact_callback), std::ptr::null_mut());
    }
}

/// Register the subtransaction callback for savepoint support
///
/// This uses `PostgreSQL`'s `RegisterSubXactCallback` FFI to handle savepoints.
/// The callback will be invoked when savepoints are created/released/rolled back.
///
/// After registering, the savepoint bookkeeping is seeded from the current
/// transaction nest level: when loaded inside a DO block, subtransactions may
/// already be open, and each of them needs a depth count and a (placeholder,
/// empty) queue snapshot so later release/rollback events have something to pop.
///
/// # Safety
/// Must be called from a `PostgreSQL` backend context, where calling into
/// `pg_sys` is valid.
pub unsafe fn register_subxact_callback() {
    // SAFETY: Called from PostgreSQL backend context. RegisterSubXactCallback
    // registers a valid extern "C" callback function pointer.
    unsafe {
        pg_sys::RegisterSubXactCallback(Some(tview_subxact_callback), std::ptr::null_mut());
    }

    // SAFETY: Called from PostgreSQL backend context (caller contract); the
    // function takes no arguments and returns a plain integer.
    let nest_level = unsafe { pg_sys::GetCurrentTransactionNestLevel() };

    // Subtransactions already open = nest level minus one. Convert with a
    // checked conversion so a negative level can never turn into a huge usize.
    let open_subxacts = usize::try_from(nest_level)
        .unwrap_or(0)
        .saturating_sub(1);

    SAVEPOINT_DEPTH.with(|d| {
        *d.borrow_mut() = open_subxacts;
    });

    // Keep the snapshot stack exactly as deep as SAVEPOINT_DEPTH. The depth is
    // assigned (not added to), so the stack is resized rather than appended to:
    // on a fresh stack this pushes `open_subxacts` placeholders, and a repeated
    // call cannot leave the two out of step.
    QUEUE_SNAPSHOTS.with(|s| {
        s.borrow_mut().resize_with(open_subxacts, HashSet::new);
    });
}

/// Transaction callback handler (invoked by `PostgreSQL`)
///
/// Called at transaction events (COMMIT, ABORT, etc.). The handler only
/// resets in-memory, per-transaction state; it runs no queries and returns
/// no errors.
///
/// | Event                | State cleared                                              |
/// |----------------------|------------------------------------------------------------|
/// | PRE_COMMIT / COMMIT  | audit buffer, crash-recovery cache, metrics                |
/// | PREPARE              | audit buffer, metrics                                      |
/// | ABORT                | refresh queue, crash-recovery cache, cascade cache, audit buffer, metrics |
///
/// All other events (PARALLEL_*, PRE_PREPARE, ...) are ignored.
///
/// # Safety
/// This is an extern "C-unwind" callback invoked by `PostgreSQL` internals.
///
/// # Notes
/// SPI is NOT available during transaction callbacks (PRE_COMMIT, COMMIT,
/// ABORT); executing SPI queries here crashes the server. The queue flush
/// (which uses SPI) is handled by the `ProcessUtility` hook intercepting
/// COMMIT instead. No `catch_unwind` wrapper is used in this callback.
#[unsafe(no_mangle)]
unsafe extern "C-unwind" fn tview_xact_callback(event: u32, _arg: *mut c_void) {
    // Translate the raw PostgreSQL event code into our own enum, bailing out
    // early for events we do not care about. pg_sys constants keep this
    // version-safe.
    #[allow(non_upper_case_globals)] // Reason: pg_sys XactEvent constants use UPPER_CASE naming
    let phase = match event {
        pg_sys::XactEvent::XACT_EVENT_ABORT => XactEvent::Abort,
        pg_sys::XactEvent::XACT_EVENT_PREPARE => XactEvent::Prepare,
        pg_sys::XactEvent::XACT_EVENT_PRE_COMMIT => XactEvent::PreCommit,
        pg_sys::XactEvent::XACT_EVENT_COMMIT => XactEvent::Commit,
        _ => return, // Ignore PARALLEL_*, PRE_PREPARE, etc.
    };

    match phase {
        XactEvent::Abort => {
            // Rolled back: drop everything that was accumulated for this
            // transaction, including the pending refresh queue itself.
            clear_queue();
            super::ops::clear_crash_recovery_cache();
            super::cache::cascade_cache::clear_cache();
            crate::audit::clear_audit_buffer();
            crate::metrics::metrics_api::reset_metrics();
        }
        XactEvent::Prepare => {
            // PREPARE TRANSACTION also goes through ProcessUtility hook.
            crate::audit::clear_audit_buffer();
            crate::metrics::metrics_api::reset_metrics();
        }
        XactEvent::PreCommit | XactEvent::Commit => {
            // Queue flush + audit flush happen in ProcessUtility hook before COMMIT.
            // Clear audit buffer as safety net (should already be empty after flush).
            crate::audit::clear_audit_buffer();
            super::ops::clear_crash_recovery_cache();
            crate::metrics::metrics_api::reset_metrics();
        }
    }
}

/// Subtransaction callback handler (invoked by `PostgreSQL` for savepoints)
///
/// This is called when savepoints are created, released, or rolled back to.
/// We need to maintain queue snapshots to properly handle ROLLBACK TO SAVEPOINT.
///
/// Per event:
/// - `SUBXACT_EVENT_START_SUB`: bump the depth and push a copy of the current
///   refresh queue onto the snapshot stack; the live queue keeps its contents.
/// - `SUBXACT_EVENT_ABORT_SUB`: lower the depth, pop the newest snapshot and
///   make it the live queue again.
/// - `SUBXACT_EVENT_COMMIT_SUB`: lower the depth and drop the newest snapshot.
/// - anything else: ignored.
///
/// # Safety
/// This is an extern "C-unwind" callback invoked by `PostgreSQL` internals.
/// Must not panic or unwind.
#[unsafe(no_mangle)]
unsafe extern "C-unwind" fn tview_subxact_callback(
    event: u32,
    _subxid: pg_sys::SubTransactionId,
    _parent_subid: pg_sys::SubTransactionId,
    _arg: *mut c_void,
) {
    // Rust panics are caught so they cannot unwind into PostgreSQL.
    let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
        match event {
            pg_sys::SubXactEvent::SUBXACT_EVENT_START_SUB => {
                // SAVEPOINT created: increment depth and snapshot current queue
                SAVEPOINT_DEPTH.with(|d| {
                    *d.borrow_mut() += 1;
                });

                let snapshot = take_queue_snapshot();

                // `flush_refresh_queue` relies on `take_queue_snapshot()`
                // handing over (draining) the live queue. Opening a savepoint
                // must not empty the queue, otherwise a later RELEASE would
                // drop the only copy of everything queued so far. Re-install
                // a copy; if the helper merely copies, this is a no-op.
                super::state::replace_queue(snapshot.clone());

                QUEUE_SNAPSHOTS.with(|s| {
                    s.borrow_mut().push(snapshot);
                });
            }
            pg_sys::SubXactEvent::SUBXACT_EVENT_ABORT_SUB => {
                // ROLLBACK TO SAVEPOINT: restore queue to snapshot
                decrement_savepoint_depth();

                // An empty stack means there is nothing to restore; the queue
                // is left as it is.
                if let Some(snapshot) = QUEUE_SNAPSHOTS.with(|s| s.borrow_mut().pop()) {
                    super::state::replace_queue(snapshot);
                }
            }
            pg_sys::SubXactEvent::SUBXACT_EVENT_COMMIT_SUB => {
                // RELEASE SAVEPOINT: just decrement depth and discard snapshot
                decrement_savepoint_depth();

                // Discard the snapshot (savepoint committed); the live queue
                // already holds everything queued before and during it.
                QUEUE_SNAPSHOTS.with(|s| {
                    s.borrow_mut().pop();
                });
            }
            _ => {
                // Ignore other subtransaction events
            }
        }
    }));

    if result.is_err() {
        // Non-fatal: savepoint tracking is defensive. Use warning instead of error
        // to avoid SIGABRT from panic_any in raw extern "C-unwind" context.
        warning!("PANIC in subtransaction callback - this is a bug!");
    }
}

/// Lower `SAVEPOINT_DEPTH` by one, never going below zero.
///
/// If the depth is already 0 a warning is emitted and the depth stays at 0;
/// that situation indicates unexpected event ordering (e.g., extension
/// loaded mid-transaction).
fn decrement_savepoint_depth() {
    SAVEPOINT_DEPTH.with(|cell| {
        let mut current = cell.borrow_mut();
        match current.checked_sub(1) {
            Some(lowered) => *current = lowered,
            None => {
                // Already at the floor: report it and leave the depth untouched.
                warning!("pg_tviews: subxact depth underflow — event ordering unexpected");
            }
        }
    });
}

/// Flush the refresh queue: process all pending TVIEW refreshes.
///
/// Called by the `ProcessUtility` hook when intercepting COMMIT, **before**
/// the actual commit begins. SPI must be available when this is called.
///
/// **Must NOT be called from transaction callbacks** (`PRE_COMMIT`, `COMMIT`, `ABORT`)
/// because `PostgreSQL` does not allow SPI queries during those callbacks.
///
/// This implementation correctly handles propagation by using a local queue
/// for discovered parent refreshes. The workflow:
///
/// 1. Take the initial snapshot of keys enqueued by triggers
/// 2. Process in dependency order (children before parents)
/// 3. Discover parent refreshes during processing
/// 4. Add parents to local pending queue
/// 5. Repeat until no more refreshes discovered (fixpoint)
/// 6. Pick up keys enqueued by triggers that fired during the refresh and
///    start over, until the queue stays empty
///
/// # Correctness
///
/// - Each key processed exactly once (tracked in `processed` set)
/// - Dependency order respected: keys are sorted per iteration and entity
///   groups are processed in the order the sort produced them
/// - Every key is refreshed: PK keys of a multi-key group go through the bulk
///   path, dedup keys of that group are refreshed individually
/// - Propagation coalesced (parents discovered during refresh added to queue)
/// - Transaction-safe (fail-fast aborts transaction on first error)
///
/// # Errors
///
/// Returns the first error raised by graph loading, crash-recovery detection,
/// a refresh, or parent discovery, and `PropagationDepthExceeded` when the
/// number of iterations exceeds `crate::config::max_propagation_depth()`.
pub fn flush_refresh_queue() -> TViewResult<()> {
    // Take initial snapshot from triggers
    let mut pending = take_queue_snapshot();

    if pending.is_empty() {
        return Ok(());
    }

    // Start timing the entire refresh operation
    let refresh_timer = crate::metrics::metrics_api::record_refresh_start();

    // Load dependency graph once (cached)
    let graph = super::cache::graph_cache::load_cached()?;

    // Track processed keys to avoid duplicates
    // Pre-allocate with capacity based on initial pending size
    let mut processed: std::collections::HashSet<super::key::RefreshKey> =
        std::collections::HashSet::with_capacity(pending.len().max(16));

    // Outer drain loop: after the inner loop empties `pending`, check for
    // late-enqueued items from triggers that fired during refresh (e.g.,
    // pg_treekey cascading child rows in tb_location).  The `processed` set
    // carries across drain passes so already-refreshed keys are not repeated.
    let mut iteration = 1;
    loop {
        // Inner loop: process pending until empty (propagation via parents)
        while !pending.is_empty() {
            // Sort this batch by dependency order
            let sorted_keys = graph.sort_keys(pending.drain().collect());

            // Group keys by entity for bulk refresh while KEEPING the sorted
            // order: groups live in a Vec in first-seen order, and the map only
            // remembers each entity's slot. Iterating a HashMap of groups
            // directly would visit entities in arbitrary order and undo the
            // dependency sort.
            // Pre-allocate with estimated entity count (typically 3-10 entities)
            let mut group_slot: std::collections::HashMap<String, usize> =
                std::collections::HashMap::with_capacity(8);
            let mut keys_by_entity: Vec<(String, Vec<super::key::RefreshKey>)> =
                Vec::with_capacity(8);

            for key in sorted_keys {
                // Skip if already processed (deduplication)
                if !processed.insert(key.clone()) {
                    continue;
                }
                let slot = *group_slot.entry(key.entity.clone()).or_insert_with(|| {
                    keys_by_entity.push((key.entity.clone(), Vec::new()));
                    keys_by_entity.len() - 1
                });
                keys_by_entity[slot].1.push(key);
            }

            // Process each entity group, in dependency order
            for (entity, entity_keys) in keys_by_entity {
                // Check for post-crash truncation and auto-refresh if needed.
                // The entity is marked as checked before the detection runs, so
                // the check happens at most once per entity until the cache is
                // cleared.
                if !is_crash_recovery_checked(&entity) {
                    mark_crash_recovery_checked(&entity);
                    if crate::lifecycle::detect_post_crash_truncation(&entity)? {
                        // TVIEW is empty but backing view has data - perform full refresh first
                        Spi::run_with_args(
                            "SELECT pg_tviews_refresh($1)",
                            // SAFETY: the datum is built from a Rust string and
                            // tagged with TEXTOID, so value and type OID agree.
                            &[unsafe {
                                DatumWithOid::new(
                                    &entity,
                                    PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value(),
                                )
                            }],
                        )?;
                    }
                }

                if entity_keys.len() == 1 {
                    // Single key: use existing individual refresh
                    let key = &entity_keys[0];
                    let parents = refresh_and_get_parents(key, &graph)?;

                    // Add discovered parents to pending queue
                    for parent_key in parents {
                        if !processed.contains(&parent_key) {
                            pending.insert(parent_key);
                        }
                    }
                } else {
                    // Multiple keys for same entity: PK keys use bulk refresh
                    // (PK-only path); dedup keys cannot go through it.
                    let pks: Vec<_> = entity_keys
                        .iter()
                        .filter(|k| !k.is_dedup())
                        .map(|k| k.pk)
                        .collect();

                    // Bulk refresh this entity (nothing to do if every key in
                    // the group is a dedup key).
                    // FAIL-FAST: Propagate error immediately to abort transaction
                    if !pks.is_empty() {
                        crate::refresh::refresh_bulk(&entity, &pks)?;
                    }

                    // Dedup keys are excluded from the bulk path above, so they
                    // are refreshed one by one exactly like the single-key
                    // branch does; otherwise they would be marked as processed
                    // without ever being refreshed.
                    for key in entity_keys.iter().filter(|k| k.is_dedup()) {
                        for parent_key in refresh_and_get_parents(key, &graph)? {
                            if !processed.contains(&parent_key) {
                                pending.insert(parent_key);
                            }
                        }
                    }

                    // Discover parents for all keys in this entity group (P-07: batched)
                    // Instead of N × M separate queries, issue M batched queries (one per parent entity)
                    let parent_map = crate::propagate::find_parents_batch(&entity_keys, &graph)?;

                    // Add discovered parents to pending queue
                    for parent_keys in parent_map.values() {
                        for parent_key in parent_keys {
                            if !processed.contains(parent_key) {
                                pending.insert(parent_key.clone());
                            }
                        }
                    }
                }
            }

            iteration += 1;

            // Safety check: prevent infinite loops
            let max_depth = crate::config::max_propagation_depth();
            if iteration > max_depth {
                return Err(crate::TViewError::PropagationDepthExceeded {
                    max_depth,
                    processed: processed.len(),
                });
            }
        }

        // Drain any items enqueued by triggers that fired during refresh
        let late = take_queue_snapshot();
        if late.is_empty() {
            break;
        }
        pending = late;
    }

    // Buffer batched audit entries: one per entity with aggregated row count.
    // Actual INSERT happens in flush_audit_buffer() called from the COMMIT hook.
    {
        let mut entity_counts: std::collections::HashMap<&str, i64> =
            std::collections::HashMap::new();
        for key in &processed {
            *entity_counts.entry(&key.entity).or_insert(0) += 1;
        }
        for (entity, count) in entity_counts {
            crate::audit::log_refresh(entity, count);
        }
    }

    // Record metrics (`iteration` starts at 1, so the number of completed
    // passes is one less).
    crate::metrics::metrics_api::record_refresh_complete(
        processed.len(),
        iteration - 1,
        &refresh_timer,
    );

    Ok(())
}

/// Refresh a single entity+pk and return discovered parent keys (without refreshing them)
///
/// This function:
/// 1. Refreshes the given (entity, pk) using existing refresh logic
/// 2. Discovers parent entities that depend on this one
/// 3. Returns parent keys WITHOUT refreshing them (defer to queue)
///
/// # Design Note
///
/// This function returns parent keys for queue processing instead of
/// calling `refresh_pk()` recursively.
fn refresh_and_get_parents(
    key: &super::key::RefreshKey,
    graph: &super::EntityDepGraph,
) -> TViewResult<Vec<super::key::RefreshKey>> {
    // Load metadata
    use crate::catalog::TviewMeta;
    let meta = TviewMeta::load_by_entity(&key.entity)?.ok_or_else(|| {
        crate::TViewError::MetadataNotFound {
            entity: key.entity.clone(),
        }
    })?;

    // Refresh this entity — dispatch on key type
    if let Some(dedup) = &key.dedup_key {
        crate::refresh::refresh_by_dedup_key(meta.view_oid, dedup)?;
    } else {
        crate::refresh::refresh_pk(meta.view_oid, key.pk)?;
    }

    // Find parent entities (NEW: returns keys instead of refreshing)
    let parent_keys = crate::propagate::find_parents_for(key, graph)?;

    Ok(parent_keys)
}

// NOTE: Full 2PC support (PREPARE TRANSACTION with queue persistence) is not
// implemented in 0.1.0. The ProcessUtility hook rejects PREPARE TRANSACTION
// when TVIEW refreshes are pending. See hooks.rs for the guard.