use pgrx::PgBuiltInOids;
use pgrx::PgOid;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::Oid;
use pgrx::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::utils::quote_identifier;
static JSONB_IVM_AVAILABLE: AtomicBool = AtomicBool::new(false);
static JSONB_IVM_CHECKED: AtomicBool = AtomicBool::new(false);
#[pg_extern]
#[allow(clippy::missing_const_for_fn)] fn pg_tviews_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
#[pg_extern]
const fn pg_tviews_hook_status() -> &'static str {
"Extension loaded - hook installation attempted in _PG_init"
}
pub fn check_jsonb_delta_available() -> bool {
if JSONB_IVM_CHECKED.load(Ordering::Relaxed) {
return JSONB_IVM_AVAILABLE.load(Ordering::Relaxed);
}
let result: Result<bool, spi::Error> = Spi::connect(|client| {
let rows = client.select(
"SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'jsonb_delta')",
None,
&[],
)?;
for row in rows {
if let Some(exists) = row[1].value::<bool>()? {
return Ok(exists);
}
}
Ok(false)
});
let is_available = result.unwrap_or(false);
JSONB_IVM_AVAILABLE.store(is_available, Ordering::Relaxed);
JSONB_IVM_CHECKED.store(true, Ordering::Relaxed);
is_available
}
#[pg_extern]
pub fn pg_tviews_recover_after_crash(entity_name: &str) -> crate::TViewResult<bool> {
if detect_post_crash_truncation(entity_name)? {
Spi::run_with_args(
"SELECT pg_tviews_refresh($1)",
&[unsafe {
DatumWithOid::new(entity_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
}],
)?;
Ok(true)
} else {
Ok(false)
}
}
pub fn detect_post_crash_truncation(entity_name: &str) -> crate::TViewResult<bool> {
let (table_oid_opt, view_oid_opt): (Option<Oid>, Option<Oid>) = Spi::get_two_with_args(
"SELECT table_oid, view_oid FROM pg_tview_meta WHERE entity = $1",
&[unsafe {
DatumWithOid::new(entity_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
}],
)?;
let table_oid = match table_oid_opt {
Some(t) => t,
None => return Ok(false), };
let view_oid = match view_oid_opt {
Some(v) => v,
None => return Ok(false), };
let is_unlogged: Option<bool> = Spi::get_one_with_args(
"SELECT relpersistence = 'u' FROM pg_class WHERE oid = $1 AND relkind = 'r'",
&[unsafe { DatumWithOid::new(table_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value()) }],
)?;
if !is_unlogged.unwrap_or(false) {
return Ok(false);
}
let schema: Option<String> = Spi::get_one_with_args(
"SELECT n.nspname::text FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE c.oid = $1",
&[unsafe { DatumWithOid::new(table_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value()) }],
)?;
let schema = schema.unwrap_or_else(|| "public".to_string());
let tview_table: Option<String> = Spi::get_one_with_args(
"SELECT relname::text FROM pg_class WHERE oid = $1",
&[unsafe { DatumWithOid::new(table_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value()) }],
)?;
let backing_view: Option<String> = Spi::get_one_with_args(
"SELECT relname::text FROM pg_class WHERE oid = $1",
&[unsafe { DatumWithOid::new(view_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value()) }],
)?;
let tview_table = tview_table.unwrap_or_default();
let backing_view = backing_view.unwrap_or_default();
let table_has_rows: Option<bool> = Spi::get_one(&format!(
"SELECT EXISTS(SELECT 1 FROM {}.{} LIMIT 1)",
quote_identifier(&schema),
quote_identifier(&tview_table)
))?;
if table_has_rows.unwrap_or(false) {
return Ok(false);
}
let view_has_rows: Option<bool> = Spi::get_one(&format!(
"SELECT EXISTS(SELECT 1 FROM {}.{} LIMIT 1)",
quote_identifier(&schema),
quote_identifier(&backing_view)
))?;
Ok(view_has_rows.unwrap_or(false))
}
#[pg_extern]
fn pg_tviews_check_jsonb_delta() -> bool {
check_jsonb_delta_available()
}
pub fn invalidate_jsonb_delta_cache() {
JSONB_IVM_CHECKED.store(false, Ordering::Relaxed);
JSONB_IVM_AVAILABLE.store(false, Ordering::Relaxed);
}
#[pg_guard]
pub extern "C-unwind" fn _PG_init() {
crate::config::register_gucs();
unsafe {
crate::hooks::ensure_hook_installed();
}
unsafe {
crate::queue::xact::register_xact_callback();
crate::queue::xact::register_subxact_callback();
}
}