use crate::TViewResult;
use crate::catalog::TviewMeta;
use crate::utils::{lookup_view_for_source, quote_identifier};
use pgrx::JsonB;
use pgrx::datum::DatumWithOid;
use pgrx::prelude::*;
use pgrx::spi;
pub fn refresh_bulk(entity: &str, pks: &[i64]) -> TViewResult<()> {
if pks.is_empty() {
return Ok(());
}
let meta =
TviewMeta::load_by_entity(entity)?.ok_or_else(|| crate::TViewError::MetadataNotFound {
entity: entity.to_string(),
})?;
let view_name = lookup_view_for_source(meta.view_oid)?;
let pk_col = format!("pk_{entity}");
let data_col = "data";
let query = format!(
"SELECT {}, {} FROM {} WHERE {} = ANY($1)",
quote_identifier(&pk_col),
quote_identifier(data_col),
quote_identifier(&view_name),
quote_identifier(&pk_col)
);
Spi::connect(|client| {
let args = vec![unsafe {
DatumWithOid::new(
pks.to_vec(),
PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID).value(),
)
}];
let rows = client.select(&query, None, &args)?;
let tv_name = crate::utils::relname_from_oid(meta.tview_oid)?;
let mut update_pks: Vec<i64> = Vec::new();
let mut update_data: Vec<JsonB> = Vec::new();
for row in rows {
let pk: i64 = row[&pk_col as &str].value()?.ok_or_else(|| {
spi::Error::from(crate::TViewError::SpiError {
query: String::new(),
error: format!("{pk_col} column is NULL"),
})
})?;
let data: JsonB = row["data"].value()?.ok_or_else(|| {
spi::Error::from(crate::TViewError::SpiError {
query: String::new(),
error: "data column is NULL".to_string(),
})
})?;
update_pks.push(pk);
update_data.push(data);
}
if update_pks.is_empty() {
return Ok(()); }
let update_query = format!(
"UPDATE {}
SET data = v.data, updated_at = now()
FROM (
SELECT unnest($1::bigint[]) as pk,
unnest($2::jsonb[]) as data
) AS v
WHERE {}.{} = v.pk",
quote_identifier(&tv_name),
quote_identifier(&tv_name),
quote_identifier(&pk_col)
);
Spi::run_with_args(
&update_query,
&[
unsafe {
DatumWithOid::new(
update_pks,
PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID).value(),
)
},
unsafe {
DatumWithOid::new(
update_data,
PgOid::BuiltIn(PgBuiltInOids::JSONBARRAYOID).value(),
)
},
],
)?;
Ok(())
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_refresh_bulk_empty() {
assert!(refresh_bulk("test", &[]).is_ok());
}
}