pg_tviews 0.1.0-beta.12

Transactional materialized views with incremental refresh for PostgreSQL
//! Bulk Refresh API
//!
//! Provides efficient refresh of multiple rows in a single operation.
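//! Instead of issuing one SELECT and one UPDATE per row (2N queries for N rows),
//! all N rows are recomputed with a single SELECT and written with a single UPDATE.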

use crate::TViewResult;
use crate::catalog::TviewMeta;
use crate::utils::{lookup_view_for_source, quote_identifier};
use pgrx::JsonB;
use pgrx::datum::DatumWithOid;
use pgrx::prelude::*;
use pgrx::spi;

/// Refresh multiple rows of the same entity in a single operation
///
/// This is the bulk refresh API that replaces individual `refresh_pk()` calls
/// for statement-level triggers and other bulk operations.
///
/// The backing view is queried for its `pk_<entity>` and `data` columns, and the
/// TVIEW table is updated through its `pk_<entity>`, `data` and `updated_at`
/// columns. Primary keys that the view does not return are skipped: their TVIEW
/// rows are left untouched (this function never deletes rows).
///
/// # Arguments
///
/// * `entity` - Entity name (e.g., "post", "user")
/// * `pks` - Primary key values to refresh; an empty slice is a no-op
///
/// # Errors
///
/// * [`crate::TViewError::MetadataNotFound`] if no TVIEW metadata exists for `entity`.
/// * An error if the view / table name lookups fail.
/// * An error if the SELECT or UPDATE fails, or if the view returns a NULL
///   primary key or a NULL `data` value.
///
/// # Performance
///
/// - **Individual refresh**: 2N queries (1 SELECT + 1 UPDATE per row)
/// - **Bulk refresh**: 2 queries (1 SELECT + 1 UPDATE for all rows)
/// - Both counts exclude the metadata and name lookups; the wall-clock speedup
///   is workload-dependent.
///
/// # Example
///
/// ```ignore
/// // Instead of:
/// for pk in &[1, 2, 3, 4, 5] {
///     refresh_pk(source_oid, *pk)?;
/// }
///
/// // Use:
/// refresh_bulk("post", &[1, 2, 3, 4, 5])?;
/// ```
pub fn refresh_bulk(entity: &str, pks: &[i64]) -> TViewResult<()> {
    if pks.is_empty() {
        return Ok(());
    }

    // Load metadata once
    let meta =
        TviewMeta::load_by_entity(entity)?.ok_or_else(|| crate::TViewError::MetadataNotFound {
            entity: entity.to_string(),
        })?;

    // Recompute ALL rows in a single query using parameterized ANY($1)
    let view_name = lookup_view_for_source(meta.view_oid)?;
    let pk_col = format!("pk_{entity}");

    let data_col = "data";
    let query = format!(
        "SELECT {}, {} FROM {} WHERE {} = ANY($1)",
        quote_identifier(&pk_col),
        quote_identifier(data_col),
        quote_identifier(&view_name),
        quote_identifier(&pk_col)
    );

    Spi::connect(|client| {
        // Create PostgreSQL BIGINT[] array from Vec<i64>
        // SAFETY: INT8ARRAYOID is the OID of `bigint[]`, the SQL type that a
        // `Vec<i64>` datum represents, so the value/OID pair is consistent.
        let args = vec![unsafe {
            DatumWithOid::new(
                pks.to_vec(),
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID).value(),
            )
        }];
        let rows = client.select(&query, None, &args)?;

        // Batch update using UPDATE ... FROM unnest()
        let tv_name = crate::utils::relname_from_oid(meta.tview_oid)?;

        // Collect data for update. At most one row per requested PK is expected,
        // so `pks.len()` is a good capacity hint.
        let mut update_pks: Vec<i64> = Vec::with_capacity(pks.len());
        let mut update_data: Vec<JsonB> = Vec::with_capacity(pks.len());

        for row in rows {
            let pk: i64 = row[&pk_col as &str].value()?.ok_or_else(|| {
                spi::Error::from(crate::TViewError::SpiError {
                    query: query.clone(),
                    error: format!("{pk_col} column is NULL"),
                })
            })?;
            let data: JsonB = row[data_col].value()?.ok_or_else(|| {
                spi::Error::from(crate::TViewError::SpiError {
                    query: query.clone(),
                    error: format!("{data_col} column is NULL"),
                })
            })?;
            update_pks.push(pk);
            update_data.push(data);
        }

        if update_pks.is_empty() {
            return Ok(()); // No rows to update
        }

        // SAFE: Single UPDATE with unnest() (parameterized). Identifiers are
        // quoted; the PK and JSONB values travel as bound array parameters.
        // The two unnest() calls are zipped row-by-row because both arrays
        // have the same length.
        let update_query = format!(
            "UPDATE {}
             SET data = v.data, updated_at = now()
             FROM (
                 SELECT unnest($1::bigint[]) as pk,
                        unnest($2::jsonb[]) as data
             ) AS v
             WHERE {}.{} = v.pk",
            quote_identifier(&tv_name),
            quote_identifier(&tv_name),
            quote_identifier(&pk_col)
        );

        // Execute batch update with parameters
        Spi::run_with_args(
            &update_query,
            &[
                // SAFETY: INT8ARRAYOID is the OID of `bigint[]`, matching `Vec<i64>`.
                unsafe {
                    DatumWithOid::new(
                        update_pks,
                        PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID).value(),
                    )
                },
                // SAFETY: JSONBARRAYOID is the OID of `jsonb[]`, matching `Vec<JsonB>`.
                unsafe {
                    DatumWithOid::new(
                        update_data,
                        PgOid::BuiltIn(PgBuiltInOids::JSONBARRAYOID).value(),
                    )
                },
            ],
        )?;

        Ok(())
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// An empty primary-key slice is a no-op: `refresh_bulk` returns `Ok`
    /// before it loads any metadata or opens an SPI connection.
    #[test]
    fn test_refresh_bulk_empty() {
        let no_pks: &[i64] = &[];
        let outcome = refresh_bulk("test", no_pks);
        assert!(matches!(outcome, Ok(())));
    }
}