use super::ops::{
clear_queue, is_crash_recovery_checked, mark_crash_recovery_checked, take_queue_snapshot,
};
use crate::TViewResult;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys;
use pgrx::prelude::*;
use std::collections::HashSet;
use std::os::raw::c_void;
use std::panic::AssertUnwindSafe;
thread_local! {
    /// Subtransaction nesting depth as tracked by the subtransaction callback:
    /// incremented when a subtransaction starts, decremented when one commits
    /// or aborts.
    static SAVEPOINT_DEPTH: std::cell::RefCell<usize> =
        const { std::cell::RefCell::new(0) };

    /// Stack of refresh-queue snapshots. One entry is pushed when a
    /// subtransaction starts and popped when it commits or aborts.
    static QUEUE_SNAPSHOTS: std::cell::RefCell<Vec<HashSet<super::key::RefreshKey>>> =
        const { std::cell::RefCell::new(Vec::new()) };
}
/// Top-level transaction events this module reacts to.
///
/// The raw PostgreSQL event code is translated into this enum inside the
/// transaction callback; any other event code is ignored there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum XactEvent {
    /// The transaction has committed.
    Commit,
    /// The transaction has aborted.
    Abort,
    /// The transaction is about to commit.
    PreCommit,
    /// The transaction is being prepared (two-phase commit).
    Prepare,
}
pub unsafe fn register_xact_callback() {
unsafe {
pg_sys::RegisterXactCallback(Some(tview_xact_callback), std::ptr::null_mut());
}
}
pub unsafe fn register_subxact_callback() {
unsafe {
pg_sys::RegisterSubXactCallback(Some(tview_subxact_callback), std::ptr::null_mut());
}
let nest_level = unsafe { pg_sys::GetCurrentTransactionNestLevel() };
SAVEPOINT_DEPTH.with(|d| {
*d.borrow_mut() = (nest_level as usize).saturating_sub(1);
});
QUEUE_SNAPSHOTS.with(|s| {
let mut snapshots = s.borrow_mut();
for _ in 0..(nest_level as usize).saturating_sub(1) {
snapshots.push(HashSet::new());
}
});
}
/// Transaction callback invoked by PostgreSQL for top-level transaction events.
///
/// Only commit, pre-commit, abort and prepare are handled; every other event
/// code returns immediately. Each handled event clears the audit buffer and
/// resets metrics. Commit, pre-commit and abort additionally clear the
/// crash-recovery cache; abort also clears the refresh queue and the cascade
/// cache.
#[unsafe(no_mangle)]
unsafe extern "C-unwind" fn tview_xact_callback(event: u32, _arg: *mut c_void) {
    // The `pg_sys` event constants are upper-case names used as match patterns.
    #[allow(non_upper_case_globals)]
    let phase = match event {
        pg_sys::XactEvent::XACT_EVENT_COMMIT => XactEvent::Commit,
        pg_sys::XactEvent::XACT_EVENT_PRE_COMMIT => XactEvent::PreCommit,
        pg_sys::XactEvent::XACT_EVENT_ABORT => XactEvent::Abort,
        pg_sys::XactEvent::XACT_EVENT_PREPARE => XactEvent::Prepare,
        _ => return,
    };

    match phase {
        XactEvent::Abort => {
            // Nothing queued in an aborted transaction may survive it.
            clear_queue();
            super::ops::clear_crash_recovery_cache();
            super::cache::cascade_cache::clear_cache();
            crate::audit::clear_audit_buffer();
            crate::metrics::metrics_api::reset_metrics();
        }
        XactEvent::Prepare => {
            crate::audit::clear_audit_buffer();
            crate::metrics::metrics_api::reset_metrics();
        }
        XactEvent::Commit | XactEvent::PreCommit => {
            crate::audit::clear_audit_buffer();
            super::ops::clear_crash_recovery_cache();
            crate::metrics::metrics_api::reset_metrics();
        }
    }
}
/// Subtransaction callback invoked by PostgreSQL for savepoint-level events.
///
/// * **Start**: increments the savepoint depth and pushes a snapshot of the
///   refresh queue onto the snapshot stack.
/// * **Abort**: decrements the depth, pops the most recent snapshot and
///   restores the queue from it, discarding keys queued inside the aborted
///   subtransaction.
/// * **Commit**: decrements the depth and drops the most recent snapshot; the
///   live queue is kept as is.
///
/// Any other event code is ignored. The whole body runs under
/// `catch_unwind`; a caught panic is reported as a warning rather than being
/// propagated to the caller.
#[unsafe(no_mangle)]
unsafe extern "C-unwind" fn tview_subxact_callback(
    event: u32,
    _subxid: pg_sys::SubTransactionId,
    _parent_subid: pg_sys::SubTransactionId,
    _arg: *mut c_void,
) {
    let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
        match event {
            pg_sys::SubXactEvent::SUBXACT_EVENT_START_SUB => {
                SAVEPOINT_DEPTH.with(|d| {
                    *d.borrow_mut() += 1;
                });
                let snapshot = take_queue_snapshot();
                // `take_queue_snapshot` appears to hand over the queue contents
                // (the flush loop relies on it returning an empty set once
                // drained). Put a copy back so keys queued before the savepoint
                // stay in the live queue; otherwise committing the
                // subtransaction, which only drops the snapshot, would lose
                // them. If the helper merely copies, this is a no-op.
                super::state::replace_queue(snapshot.clone());
                QUEUE_SNAPSHOTS.with(|s| {
                    s.borrow_mut().push(snapshot);
                });
            }
            pg_sys::SubXactEvent::SUBXACT_EVENT_ABORT_SUB => {
                decrement_savepoint_depth();
                // Roll the queue back to its state at subtransaction start.
                if let Some(snapshot) = QUEUE_SNAPSHOTS.with(|s| s.borrow_mut().pop()) {
                    super::state::replace_queue(snapshot);
                }
            }
            pg_sys::SubXactEvent::SUBXACT_EVENT_COMMIT_SUB => {
                decrement_savepoint_depth();
                // The live queue already holds everything; drop the snapshot.
                QUEUE_SNAPSHOTS.with(|s| {
                    s.borrow_mut().pop();
                });
            }
            _ => {}
        }
    }));
    if result.is_err() {
        warning!("PANIC in subtransaction callback - this is a bug!");
    }
}
/// Lowers the tracked savepoint depth by one, never going below zero.
///
/// If the depth is already zero a warning is emitted and the depth stays at
/// zero.
fn decrement_savepoint_depth() {
    SAVEPOINT_DEPTH.with(|cell| {
        let mut level = cell.borrow_mut();
        match level.checked_sub(1) {
            Some(lowered) => *level = lowered,
            None => {
                warning!("pg_tviews: subxact depth underflow — event ordering unexpected");
                *level = 0;
            }
        }
    });
}
pub fn flush_refresh_queue() -> TViewResult<()> {
let mut pending = take_queue_snapshot();
if pending.is_empty() {
return Ok(());
}
let refresh_timer = crate::metrics::metrics_api::record_refresh_start();
let graph = super::cache::graph_cache::load_cached()?;
let mut processed: std::collections::HashSet<super::key::RefreshKey> =
std::collections::HashSet::with_capacity(pending.len().max(16));
let mut iteration = 1;
loop {
while !pending.is_empty() {
let sorted_keys = graph.sort_keys(pending.drain().collect());
let mut keys_by_entity: std::collections::HashMap<String, Vec<super::key::RefreshKey>> =
std::collections::HashMap::with_capacity(8);
for key in sorted_keys {
if !processed.insert(key.clone()) {
continue;
}
keys_by_entity
.entry(key.entity.clone())
.or_default()
.push(key);
}
for (entity, entity_keys) in keys_by_entity {
if !is_crash_recovery_checked(&entity) {
mark_crash_recovery_checked(&entity);
if crate::lifecycle::detect_post_crash_truncation(&entity)? {
Spi::run_with_args(
"SELECT pg_tviews_refresh($1)",
&[unsafe {
DatumWithOid::new(
&entity,
PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value(),
)
}],
)?;
}
}
if entity_keys.len() == 1 {
let key = &entity_keys[0];
let parents = refresh_and_get_parents(key, &graph)?;
for parent_key in parents {
if !processed.contains(&parent_key) {
pending.insert(parent_key);
}
}
} else {
let mut pks =
Vec::with_capacity(entity_keys.iter().filter(|k| !k.is_dedup()).count());
for key in &entity_keys {
if !key.is_dedup() {
pks.push(key.pk);
}
}
crate::refresh::refresh_bulk(&entity, &pks)?;
let parent_map = crate::propagate::find_parents_batch(&entity_keys, &graph)?;
for parent_keys in parent_map.values() {
for parent_key in parent_keys {
if !processed.contains(parent_key) {
pending.insert(parent_key.clone());
}
}
}
}
}
iteration += 1;
let max_depth = crate::config::max_propagation_depth();
if iteration > max_depth {
return Err(crate::TViewError::PropagationDepthExceeded {
max_depth,
processed: processed.len(),
});
}
}
let late = take_queue_snapshot();
if late.is_empty() {
break;
}
pending = late;
}
{
let mut entity_counts: std::collections::HashMap<&str, i64> =
std::collections::HashMap::new();
for key in &processed {
*entity_counts.entry(&key.entity).or_insert(0) += 1;
}
for (entity, count) in entity_counts {
crate::audit::log_refresh(entity, count);
}
}
crate::metrics::metrics_api::record_refresh_complete(
processed.len(),
iteration - 1,
&refresh_timer,
);
Ok(())
}
/// Refreshes the row identified by `key` and returns the parent keys that
/// depend on it.
///
/// The entity's metadata is loaded first. A key carrying a dedup key is
/// refreshed through the dedup path, any other key through its primary key.
///
/// # Errors
///
/// Returns `TViewError::MetadataNotFound` when no metadata exists for the
/// key's entity. Errors from the metadata load, the refresh and the parent
/// lookup are propagated.
fn refresh_and_get_parents(
    key: &super::key::RefreshKey,
    graph: &super::EntityDepGraph,
) -> TViewResult<Vec<super::key::RefreshKey>> {
    use crate::catalog::TviewMeta;

    let Some(meta) = TviewMeta::load_by_entity(&key.entity)? else {
        return Err(crate::TViewError::MetadataNotFound {
            entity: key.entity.clone(),
        });
    };

    match &key.dedup_key {
        Some(dedup) => crate::refresh::refresh_by_dedup_key(meta.view_oid, dedup)?,
        None => crate::refresh::refresh_pk(meta.view_oid, key.pk)?,
    }

    let dependents = crate::propagate::find_parents_for(key, graph)?;
    Ok(dependents)
}