use super::key::RefreshKey;
use super::state::{TX_CRASH_RECOVERY_CHECKED, TX_REFRESH_QUEUE};
use std::collections::HashSet;
fn check_queue_backpressure(limit: usize) -> Result<(), String> {
let current_size = TX_REFRESH_QUEUE.with(|q| q.borrow().len());
if current_size >= limit {
return Err(format!(
"refresh queue backpressure: queue size ({}) would exceed max_queue_size ({})",
current_size, limit
));
}
Ok(())
}
pub fn enqueue_refresh_with_limit(entity: &str, pk: i64, limit: usize) -> Result<(), String> {
check_queue_backpressure(limit)?;
TX_REFRESH_QUEUE.with(|q| {
q.borrow_mut().insert(RefreshKey::pk(entity, pk));
});
Ok(())
}
pub fn enqueue_refresh(entity: &str, pk: i64) {
if let Err(msg) = enqueue_refresh_with_limit(entity, pk, crate::config::max_queue_size()) {
pgrx::error!("{}", msg);
}
}
pub fn enqueue_refresh_dedup(entity: &str, dedup_key: &str) {
TX_REFRESH_QUEUE.with(|q| {
let limit = crate::config::max_queue_size();
check_queue_backpressure(limit).unwrap_or_else(|msg| {
pgrx::error!("{}", msg);
});
q.borrow_mut().insert(RefreshKey::dedup(entity, dedup_key));
});
}
pub fn enqueue_refresh_bulk(entity: &str, pks: Vec<i64>) {
TX_REFRESH_QUEUE.with(|q| {
let limit = crate::config::max_queue_size();
check_queue_backpressure(limit).unwrap_or_else(|msg| {
pgrx::error!("{}", msg);
});
let mut queue = q.borrow_mut();
for pk in pks {
queue.insert(RefreshKey::pk(entity, pk));
}
});
}
pub fn take_queue_snapshot() -> HashSet<RefreshKey> {
TX_REFRESH_QUEUE.with(|q| {
let mut queue = q.borrow_mut();
std::mem::take(&mut *queue)
})
}
pub fn is_queue_empty() -> bool {
TX_REFRESH_QUEUE.with(|q| q.borrow().is_empty())
}
pub fn clear_queue() {
TX_REFRESH_QUEUE.with(|q| {
q.borrow_mut().clear();
});
}
pub fn is_crash_recovery_checked(entity: &str) -> bool {
TX_CRASH_RECOVERY_CHECKED.with(|checked| checked.borrow().contains(entity))
}
pub fn mark_crash_recovery_checked(entity: &str) {
TX_CRASH_RECOVERY_CHECKED.with(|checked| {
checked.borrow_mut().insert(entity.to_string());
});
}
pub fn clear_crash_recovery_cache() {
TX_CRASH_RECOVERY_CHECKED.with(|checked| {
checked.borrow_mut().clear();
});
}
pub fn spi_batch_lookup(
table_oid: pgrx::pg_sys::Oid,
lookup_col: &str,
carry_col: &str,
ids: &[i64],
) -> crate::TViewResult<Vec<i64>> {
use crate::utils::{qualified_relname_from_oid, quote_identifier};
use pgrx::prelude::*;
if ids.is_empty() {
return Ok(vec![]);
}
let qualified_name = qualified_relname_from_oid(table_oid).map_err(|e| {
crate::TViewError::SpiError {
query: "qualified_relname_from_oid".to_string(),
error: format!("failed to resolve table OID {table_oid:?}: {e}"),
}
})?;
let qi_lookup = quote_identifier(lookup_col);
let qi_carry = quote_identifier(carry_col);
let query = format!("SELECT {qi_carry} FROM {qualified_name} WHERE {qi_lookup} = ANY($1)");
let pks_array = ids.to_vec();
let args = vec![unsafe {
pgrx::datum::DatumWithOid::new(
pks_array,
PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID).value(),
)
}];
Spi::connect(|client| {
let rows = client.select(&query, None, &args)?;
let mut result = Vec::new();
for row in rows {
if let Some(value) = row[1].value::<i64>()? {
result.push(value);
}
}
Ok(result)
})
.map_err(|e: pgrx::spi::Error| crate::TViewError::SpiError {
query,
error: format!("SPI batch lookup failed: {e}"),
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_enqueue_and_snapshot() {
clear_queue();
enqueue_refresh("user", 1);
enqueue_refresh("post", 2);
enqueue_refresh("user", 1);
let snapshot = take_queue_snapshot();
assert_eq!(snapshot.len(), 2);
let empty_snapshot = take_queue_snapshot();
assert_eq!(empty_snapshot.len(), 0);
}
#[test]
fn test_clear_queue() {
clear_queue();
enqueue_refresh("user", 1);
enqueue_refresh("post", 2);
clear_queue();
let snapshot = take_queue_snapshot();
assert_eq!(snapshot.len(), 0);
}
#[test]
fn test_enqueue_respects_max_queue_size() {
clear_queue();
let limit = 2;
enqueue_refresh_with_limit("user", 1, limit).expect("first insert should succeed");
enqueue_refresh_with_limit("post", 2, limit).expect("second insert should succeed");
assert!(
enqueue_refresh_with_limit("user", 3, limit).is_err(),
"third insert should fail"
);
let snapshot = take_queue_snapshot();
assert_eq!(snapshot.len(), 2);
}
}