use crate::catalog::entity_for_table;
use crate::queue::{enqueue_refresh, enqueue_refresh_bulk, enqueue_refresh_dedup};
use crate::utils::{IntExtraction, quote_identifier, tuple_get_i64};
use pgrx::prelude::*;
use pgrx::spi;
/// Outcome of reading a DISTINCT ON key column from a trigger tuple.
///
/// Produced by `extract_distinct_on_key` and consumed by the row trigger
/// handler to decide whether a deduplicated refresh can be queued.
enum KeyExtraction {
    /// The column held a value; it is carried here in textual form.
    Value(String),
    /// The column could be read, but its value was SQL NULL.
    Null,
    /// The column could not be read as any of the attempted Rust types.
    TypeMismatch,
}
/// Reads the DISTINCT ON key column `key_col` from `tuple` as text.
///
/// The column is probed as `String`, then `i64`, then `i32`, in that order.
/// The first probe that succeeds decides the result: a present value becomes
/// [`KeyExtraction::Value`] (integers are rendered with `to_string`), and a
/// NULL becomes [`KeyExtraction::Null`]. Any error from a probe simply moves
/// on to the next type; when all three fail the result is
/// [`KeyExtraction::TypeMismatch`].
fn extract_distinct_on_key<'a>(
    tuple: &PgHeapTuple<'a, AllocatedByPostgres>,
    key_col: &str,
) -> KeyExtraction {
    // Text first: a successful read (value or NULL) settles the outcome.
    if let Ok(text) = tuple.get_by_name::<String>(key_col) {
        return text.map_or(KeyExtraction::Null, KeyExtraction::Value);
    }

    // Then 64-bit integers, stringified.
    if let Ok(wide) = tuple.get_by_name::<i64>(key_col) {
        return wide.map_or(KeyExtraction::Null, |n| KeyExtraction::Value(n.to_string()));
    }

    // Last attempt: 32-bit integers. A failure here means nothing matched.
    match tuple.get_by_name::<i32>(key_col) {
        Ok(Some(n)) => KeyExtraction::Value(n.to_string()),
        Ok(None) => KeyExtraction::Null,
        Err(_) => KeyExtraction::TypeMismatch,
    }
}
/// Trigger entry point: queues TVIEW refresh work for the row that fired it.
///
/// The trigger's table is resolved through the cached entity lookup, then:
///
/// * entity with a `distinct_on_key`: the key is read from the NEW tuple
///   (falling back to OLD) and queued with `enqueue_refresh_dedup`; a NULL or
///   unreadable key only produces a warning;
/// * entity without a `distinct_on_key`: the primary key returned by
///   `crate::utils::extract_pk` is queued with `enqueue_refresh`;
/// * no entity for the table: `enqueue_cascade_parents` is invoked instead.
///
/// Every failure is reported with `warning!` and swallowed, so the function
/// always returns `Ok(None)`.
// The `Result` is never `Err`, hence the clippy allow. Presumably the return
// shape is dictated by `#[pg_trigger]` — TODO confirm.
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)]
fn pg_tview_trigger_handler<'a>(
    trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    let table_oid = match trigger.relation() {
        Err(e) => {
            warning!("Failed to get trigger relation: {:?}", e);
            return Ok(None);
        }
        Ok(rel) => rel.oid(),
    };

    let entity_info = match crate::queue::cache::table_cache::entity_info_cached(table_oid) {
        Ok(Some(info)) => info,
        Ok(None) => {
            // The table backs no entity itself; it may still feed parent entities.
            enqueue_cascade_parents(trigger, table_oid);
            return Ok(None);
        }
        Err(e) => {
            warning!(
                "Failed to resolve entity for table OID {:?}: {:?}",
                table_oid,
                e
            );
            return Ok(None);
        }
    };
    let entity = &entity_info.name;

    // Without a DISTINCT ON key the refresh is addressed by primary key.
    let Some(key_col) = &entity_info.distinct_on_key else {
        match crate::utils::extract_pk(trigger) {
            Ok(pk_value) => {
                enqueue_refresh(entity, pk_value);
            }
            Err(e) => {
                warning!("Failed to extract primary key from trigger: {:?}", e);
            }
        }
        return Ok(None);
    };

    // DISTINCT ON entity: prefer the NEW tuple, fall back to OLD.
    let Some(tuple) = trigger.new().or_else(|| trigger.old()) else {
        warning!("No tuple in trigger context for DISTINCT ON TVIEW '{entity}'");
        return Ok(None);
    };

    match extract_distinct_on_key(&tuple, key_col) {
        KeyExtraction::Value(key_val) => {
            enqueue_refresh_dedup(entity, &key_val);
        }
        KeyExtraction::Null => {
            warning!(
                "DISTINCT ON key '{key_col}' is NULL for entity '{entity}' — skipping refresh"
            );
        }
        KeyExtraction::TypeMismatch => {
            warning!(
                "Cannot extract DISTINCT ON key '{key_col}' for '{entity}': \
                 unsupported column type — skipping refresh for this row"
            );
        }
    }

    Ok(None)
}
/// Follows every cascade path registered for `table_oid`, starting from the
/// trigger's tuple.
///
/// Paths come from the cascade-path cache. Nothing happens when the lookup
/// fails (a warning is logged) or yields no paths. Otherwise the NEW tuple,
/// or the OLD tuple when there is no NEW one, is handed to
/// `follow_cascade_path` for each path in turn. A failing path is logged and
/// does not stop the remaining paths from being processed.
fn enqueue_cascade_parents(trigger: &PgTrigger, table_oid: pg_sys::Oid) {
    let lookup = crate::queue::cache::cascade_cache::cascade_paths_for_table(table_oid);
    let paths: Vec<crate::cascade_path::CascadePath> = match lookup {
        Err(e) => {
            warning!(
                "Failed to load cascade paths for table {:?}: {:?}",
                table_oid,
                e
            );
            return;
        }
        // No dependants: skip touching the tuple at all.
        Ok(found) if found.is_empty() => return,
        Ok(found) => found,
    };

    let tuple = match trigger.new().or_else(|| trigger.old()) {
        Some(row) => row,
        None => {
            warning!("No tuple available in trigger context");
            return;
        }
    };

    for path in paths.iter() {
        let Err(e) = follow_cascade_path(path, &tuple) else {
            continue;
        };
        warning!(
            "Cascade refresh failed for path {} → {}: {:?}",
            path.source_table,
            path.entity_name,
            e
        );
    }
}
/// Walks one cascade path from the changed row to its target entity and
/// enqueues a refresh for every id reached.
///
/// Steps:
/// 1. A path flagged `unresolvable` is only logged; nothing is enqueued.
/// 2. The starting id is read from `path.initial_col` on `tuple`. A NULL value
///    ends the walk silently; a missing column ends it with a warning.
/// 3. For each hop, the current id set is replaced by the result of
///    `crate::queue::spi_batch_lookup` for that hop's table and columns. The
///    walk stops early as soon as the id set becomes empty.
/// 4. Each id left after the last hop is passed to `enqueue_refresh` for
///    `path.entity_name`.
///
/// # Errors
///
/// Propagates any error returned by `spi_batch_lookup`. All other early exits
/// return `Ok(())`.
fn follow_cascade_path(
    path: &crate::cascade_path::CascadePath,
    tuple: &PgHeapTuple<AllocatedByPostgres>,
) -> crate::TViewResult<()> {
    if path.unresolvable {
        warning!(
            "Unresolvable cascade path for entity '{}' — full refresh needed",
            path.entity_name
        );
        return Ok(());
    }

    let mut current_ids = match tuple_get_i64(tuple, &path.initial_col) {
        IntExtraction::Value(pk) => vec![pk],
        // A NULL link column means this row points at nothing to refresh.
        IntExtraction::Null => return Ok(()),
        IntExtraction::Missing => {
            warning!(
                "Initial column '{}' not found on tuple for cascade to '{}'",
                path.initial_col,
                path.entity_name
            );
            return Ok(());
        }
    };

    for hop in &path.hops {
        // Nothing left to look up: skip the remaining hops (and their queries).
        if current_ids.is_empty() {
            return Ok(());
        }
        current_ids = crate::queue::spi_batch_lookup(
            hop.table_oid,
            &hop.lookup_col,
            &hop.carry_col,
            &current_ids,
        )?;
    }

    for pk in current_ids {
        enqueue_refresh(&path.entity_name, pk);
    }
    Ok(())
}
/// Trigger entry point that flushes the pending refresh queue and then the
/// audit buffer.
///
/// Both flushes are always attempted, in that order. A failure of either one
/// is logged with `warning!` and otherwise ignored, so the function always
/// returns `Ok(None)`. The trigger argument itself is not inspected.
// The `Result` is never `Err`, hence the clippy allow. Presumably the return
// shape is dictated by `#[pg_trigger]` — TODO confirm.
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)]
fn pg_tview_flush_trigger<'a>(
    _trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    if let Some(e) = crate::queue::flush_refresh_queue().err() {
        warning!("TVIEW refresh failed in statement trigger: {:?}", e);
    }

    if let Some(e) = crate::audit::flush_audit_buffer().err() {
        warning!("Audit flush failed in statement trigger: {:?}", e);
    }

    Ok(None)
}
/// Trigger entry point that queues a bulk refresh for all primary keys found
/// in the trigger's transition table.
///
/// The trigger's table is mapped to an entity with `entity_for_table`; tables
/// without an entity are ignored. The changed keys come from
/// `extract_pks_from_transition_table` and, when there is at least one, are
/// handed to `enqueue_refresh_bulk`. Failures at any step are logged with
/// `warning!` and swallowed, so the function always returns `Ok(None)`.
// The `Result` is never `Err`, hence the clippy allow. Presumably the return
// shape is dictated by `#[pg_trigger]` — TODO confirm.
#[pg_trigger]
#[allow(clippy::unnecessary_wraps)]
fn pg_tview_stmt_trigger_handler<'a>(
    trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, AllocatedByPostgres>>, spi::Error> {
    let table_oid = match trigger.relation() {
        Err(e) => {
            warning!("Failed to get trigger relation: {:?}", e);
            return Ok(None);
        }
        Ok(rel) => rel.oid(),
    };

    let entity = match entity_for_table(table_oid) {
        Ok(Some(name)) => name,
        // Table is not managed as an entity: nothing to refresh.
        Ok(None) => return Ok(None),
        Err(e) => {
            warning!(
                "Failed to resolve entity for table OID {:?}: {:?}",
                table_oid,
                e
            );
            return Ok(None);
        }
    };

    match extract_pks_from_transition_table(trigger) {
        // No changed keys: leave the queue untouched.
        Ok(changed_pks) if changed_pks.is_empty() => {}
        Ok(changed_pks) => {
            enqueue_refresh_bulk(&entity, changed_pks);
        }
        Err(e) => {
            warning!("Failed to extract PKs from transition table: {:?}", e);
        }
    }

    Ok(None)
}
/// Collects the distinct, non-NULL primary keys found in the trigger's
/// transition table.
///
/// The table queried is `new_table` whenever the trigger exposes a NEW tuple,
/// and `old_table` when it exposes only an OLD tuple. If neither tuple is
/// present, an empty vector is returned without running any query. The key
/// column name is obtained from `get_pk_column_name` for the trigger's
/// relation and is quoted with `quote_identifier` before being placed in the
/// SQL text.
///
/// # Errors
///
/// Returns an error when the trigger relation cannot be obtained, when the
/// key column name cannot be resolved, or when the SPI query or value
/// conversion fails.
// NOTE(review): pgrx appears to document `PgTrigger::new`/`old` as returning
// `None` for statement-level triggers. If this function runs from one, the
// selection below would always take the "empty" branch — confirm how the
// trigger is installed.
fn extract_pks_from_transition_table(trigger: &PgTrigger) -> spi::Result<Vec<i64>> {
    let transition_table_name = match (trigger.new().is_some(), trigger.old().is_some()) {
        // NEW present (with or without OLD): read the new rows.
        (true, _) => "new_table",
        // Only OLD present: read the old rows.
        (false, true) => "old_table",
        (false, false) => return Ok(Vec::new()),
    };

    let relation_oid = trigger
        .relation()
        .map_err(|_| crate::TViewError::SpiError {
            query: "get relation".to_string(),
            error: "Failed to get trigger relation".to_string(),
        })?
        .oid();
    let pk_column = get_pk_column_name(relation_oid)?;

    let query = format!(
        "SELECT DISTINCT {} FROM {}",
        quote_identifier(&pk_column),
        transition_table_name
    );

    Spi::connect(|client| {
        let mut pks = Vec::new();
        for row in client.select(&query, None, &[])? {
            // Rows whose key is NULL are skipped.
            if let Some(pk) = row[pk_column.as_str()].value::<i64>()? {
                pks.push(pk);
            }
        }
        Ok(pks)
    })
}
/// Builds the primary-key column name for the entity backed by `table_oid`.
///
/// The name is `pk_` followed by the entity name returned by
/// `entity_for_table`.
///
/// # Errors
///
/// Returns a `TViewError::SpiError` (converted into the SPI error type) when
/// the table has no entity or when the entity lookup itself fails.
fn get_pk_column_name(table_oid: pg_sys::Oid) -> spi::Result<String> {
    // Both failure modes report the same logical "query"; only the detail differs.
    let lookup_failure = |detail: String| crate::TViewError::SpiError {
        query: "entity_for_table".to_string(),
        error: detail,
    };

    match entity_for_table(table_oid) {
        Ok(Some(entity)) => Ok(format!("pk_{entity}")),
        Ok(None) => Err(lookup_failure("Table not managed by pg_tviews".to_string()).into()),
        Err(e) => Err(lookup_failure(format!("Failed to get entity: {e:?}")).into()),
    }
}