use pgrx::JsonB;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::Oid;
use pgrx::prelude::*;
use crate::catalog::{DependencyDetail, DependencyType, TviewMeta};
use crate::lifecycle::check_jsonb_delta_available;
use crate::utils::{lookup_view_for_source, relname_from_oid};
/// Key used to match array elements when an array dependency declares no explicit `match_key`.
const DEFAULT_ARRAY_MATCH_KEY: &str = "id";
/// A single row recomputed from a TVIEW's backing view, ready to be written to the TVIEW table.
pub struct ViewRow {
/// Entity name taken from the TVIEW metadata; the primary-key column is derived as
/// `pk_<entity_name>`.
pub entity_name: String,
/// Primary-key value the row was looked up by.
pub pk: i64,
/// Copied from `TviewMeta::tview_oid`; resolved to the target relation name when writing.
pub tview_oid: Oid,
/// Contents of the backing view's `data` column for this row.
pub data: JsonB,
}
pub fn refresh_pk(source_oid: Oid, pk: i64) -> spi::Result<()> {
let meta = TviewMeta::load_for_source(source_oid)?;
let Some(meta) = meta else {
error!("No TVIEW metadata for source_oid: {:?}", source_oid);
};
let view_row = recompute_view_row(&meta, pk)?;
apply_patch(&view_row, &meta)?;
Ok(())
}
pub fn refresh_by_dedup_key(source_oid: Oid, dedup_key: &str) -> spi::Result<()> {
let meta = TviewMeta::load_for_source(source_oid)?;
let Some(meta) = meta else {
error!("No TVIEW metadata for source_oid: {:?}", source_oid);
};
if meta.distinct_on_keys.is_empty() {
error!(
"refresh_by_dedup_key called on non-DISTINCT-ON TVIEW '{}'",
meta.entity_name
);
}
let key_col = &meta.distinct_on_keys[0];
let view_name = lookup_view_for_source(meta.view_oid)?;
let tv_name = relname_from_oid(meta.tview_oid)?;
let count_sql = format!("SELECT COUNT(*) FROM {view_name} WHERE {key_col}::text = $1");
let row_count: i64 = Spi::connect(|client| {
let args = vec![unsafe {
DatumWithOid::new(dedup_key, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
}];
let mut rows = client.select(&count_sql, None, &args)?;
let count = rows
.next()
.and_then(|r| r["count"].value::<i64>().ok().flatten())
.unwrap_or(0);
Ok::<i64, spi::SpiError>(count)
})?;
if row_count == 0 {
let delete_sql = format!("DELETE FROM {tv_name} WHERE {key_col}::text = $1");
Spi::run_with_args(
&delete_sql,
&[unsafe {
DatumWithOid::new(dedup_key, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
}],
)?;
} else {
let cached_dml: Option<(String, String)> = {
let cache = crate::utils::DEDUP_DML_CACHE
.lock()
.unwrap_or_else(|e| e.into_inner());
cache.get(&view_name).cloned()
};
let (col_list, do_update) = match cached_dml {
Some(dml) => dml,
None => {
let col_names = crate::utils::get_view_columns(&view_name)?;
if col_names.is_empty() {
return Ok(());
}
let dml = build_dedup_dml_components(&col_names, key_col.as_str());
crate::utils::DEDUP_DML_CACHE
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(view_name.clone(), dml.clone());
dml
}
};
let upsert_sql = format!(
"INSERT INTO {tv_name} ({col_list}) \
SELECT {col_list} FROM {view_name} WHERE {key_col}::text = $1 LIMIT 1 \
ON CONFLICT ({key_col}) DO UPDATE SET {do_update}"
);
Spi::run_with_args(
&upsert_sql,
&[unsafe {
DatumWithOid::new(dedup_key, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
}],
)?;
}
Ok(())
}
/// Builds the SQL fragments for an `INSERT ... ON CONFLICT DO UPDATE` upsert.
///
/// Returns `(col_list, do_update)` where:
///
/// * `col_list` is `col_names` joined with `", "`, used for both the INSERT column list and
///   the SELECT list;
/// * `do_update` is the `SET` clause: `c = EXCLUDED.c` for every column except `key_col`,
///   followed by `updated_at = NOW()`.
///
/// A column literally named `updated_at` is left out of the `EXCLUDED` assignments.
/// Otherwise the clause would assign `updated_at` twice, which PostgreSQL rejects
/// ("multiple assignments to same column").
fn build_dedup_dml_components(col_names: &[String], key_col: &str) -> (String, String) {
    let mut update_parts: Vec<String> = col_names
        .iter()
        .filter(|c| c.as_str() != key_col && c.as_str() != "updated_at")
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    update_parts.push("updated_at = NOW()".to_string());
    (col_names.join(", "), update_parts.join(", "))
}
/// Reads the current row for `pk` from the TVIEW's backing view.
///
/// The view is queried with `SELECT * ... WHERE pk_<entity> = $1` and the `data` column of
/// the first returned row is packaged into a [`ViewRow`].
///
/// # Errors
/// Returns a `TViewError::SpiError` (converted to `spi::Error`) carrying the query text when:
///
/// * the view yields no row for `pk`;
/// * `meta.is_union` is set, a second row exists, and `union_duplicate_policy` is not
///   `"first"` (with `"first"` a warning is emitted and the first row is used);
/// * the `data` column is NULL.
///
/// SPI errors from name lookup, the query itself, or datum conversion are propagated.
fn recompute_view_row(meta: &TviewMeta, pk: i64) -> spi::Result<ViewRow> {
    let view_name = lookup_view_for_source(meta.view_oid)?;
    let pk_col = format!("pk_{}", meta.entity_name);
    let sql = format!("SELECT * FROM {view_name} WHERE {pk_col} = $1");

    // All domain failures share the same shape: the query text plus a description.
    let fail = |error: String| {
        spi::Error::from(crate::TViewError::SpiError {
            query: sql.clone(),
            error,
        })
    };

    Spi::connect(|client| {
        // SAFETY: the value is an `i64` and is tagged with INT8OID, so the datum and the
        // declared type OID agree.
        let pk_arg =
            unsafe { DatumWithOid::new(pk, PgOid::BuiltIn(PgBuiltInOids::INT8OID).value()) };
        let mut rows = client.select(&sql, None, &[pk_arg])?;

        let Some(first_row) = rows.next() else {
            return Err(fail(format!(
                "TVIEW '{}': No row found in backing view '{view_name}' for {pk_col} = {pk}. \
                 Possible causes: (1) row deleted from base table, \
                 (2) row violates UNION ALL branch conditions, \
                 (3) row filtered by view WHERE clause",
                meta.entity_name
            )));
        };

        // Only UNION ALL views are probed for a second row; other views take the first row.
        if meta.is_union && rows.next().is_some() {
            let policy = crate::config::union_duplicate_policy();
            if policy != "first" {
                return Err(fail(format!(
                    "TVIEW '{}': UNION ALL backing view returned multiple rows for pk={}. \
                     Ensure UNION ALL branches are mutually exclusive, or set \
                     pg_tviews.union_duplicate_policy='first' to suppress this error.",
                    meta.entity_name, pk
                )));
            }
            warning!(
                "TVIEW '{}': UNION ALL backing view returned multiple rows for pk={}; \
                 taking first row (union_duplicate_policy=first)",
                meta.entity_name,
                pk
            );
        }

        let Some(data) = first_row["data"].value::<JsonB>()? else {
            return Err(fail(format!(
                "TVIEW '{}': data column is NULL for {pk_col} = {pk} in view '{view_name}'. \
                 Ensure TVIEW definition includes a non-NULL data column.",
                meta.entity_name
            )));
        };

        Ok(ViewRow {
            entity_name: meta.entity_name.clone(),
            pk,
            tview_oid: meta.tview_oid,
            data,
        })
    })
}
/// Writes a recomputed row to the TVIEW table, using smart JSONB patching when possible.
///
/// Falls back to [`apply_full_replacement`] when the `jsonb_delta` extension is not
/// available (a warning is emitted) or when the metadata declares no dependencies.
/// Otherwise an `UPDATE` built by [`build_smart_patch_sql`] is run with the new `data` as
/// `$1` and the primary key as `$2`.
///
/// The relation name and primary-key column are resolved only on the smart-patch path;
/// the fallback resolves them itself, so computing them up front would repeat the lookup.
///
/// # Errors
/// Propagates SPI errors from name resolution and statement execution.
fn apply_patch(row: &ViewRow, meta: &TviewMeta) -> spi::Result<()> {
    if !check_jsonb_delta_available() {
        warning!(
            "jsonb_delta extension not installed. Smart patching disabled. \
             Install with: CREATE EXTENSION jsonb_delta; \
             Performance: Full replacement is ~2× slower for cascades."
        );
        return apply_full_replacement(row, meta);
    }
    let deps = meta.parse_dependencies();
    if deps.is_empty() {
        return apply_full_replacement(row, meta);
    }

    let tv_name = relname_from_oid(row.tview_oid)?;
    let pk_col = format!("pk_{}", row.entity_name);
    let sql = build_smart_patch_sql(&tv_name, &pk_col, &deps);
    // NOTE(review): this path issues an UPDATE only; a row that is not yet present in the
    // TVIEW table would not be inserted here - confirm inserts are handled by the caller.
    Spi::run_with_args(
        &sql,
        &[
            // SAFETY: the value is a `JsonB` and is tagged with JSONBOID.
            unsafe {
                DatumWithOid::new(
                    JsonB(row.data.0.clone()),
                    PgOid::BuiltIn(PgBuiltInOids::JSONBOID).value(),
                )
            },
            // SAFETY: the value is an `i64` and is tagged with INT8OID.
            unsafe { DatumWithOid::new(row.pk, PgOid::BuiltIn(PgBuiltInOids::INT8OID).value()) },
        ],
    )?;
    Ok(())
}
/// Builds the `UPDATE` statement that patches a TVIEW row's `data` column.
///
/// `$1` is the new JSONB document and `$2` the primary-key value. With no dependencies the
/// statement simply overwrites `data`. Otherwise the patch functions are nested, one call
/// per dependency, starting from the current `data` column:
///
/// * `NestedObject` -> `jsonb_smart_patch_nested(expr, $1::jsonb, ARRAY[...])`
/// * `Array` -> `jsonb_smart_patch_array(expr, $1::jsonb, ARRAY[...], 'match_key')`,
///   where the match key defaults to [`DEFAULT_ARRAY_MATCH_KEY`]
/// * `Scalar` -> `jsonb_smart_patch_scalar(expr, $1::jsonb)`
///
/// `NestedObject`/`Array` dependencies without a path are skipped with a warning.
///
/// Each path segment becomes its own array element (`ARRAY['a','b']`), and single quotes in
/// segments and in the match key are doubled so they cannot terminate the SQL literal.
fn build_smart_patch_sql(tv_name: &str, pk_col: &str, deps: &[DependencyDetail]) -> String {
    if deps.is_empty() {
        return format!(
            "UPDATE {tv_name} SET data = $1::jsonb, updated_at = now() WHERE {pk_col} = $2"
        );
    }
    let mut patch_expr = "data".to_string();
    for dep in deps {
        patch_expr = match dep.dep_type {
            DependencyType::NestedObject => {
                if let Some(path) = &dep.path {
                    let path_sql = sql_text_array(path);
                    format!("jsonb_smart_patch_nested({patch_expr}, $1::jsonb, {path_sql})")
                } else {
                    warning!("NestedObject dependency missing path, skipping");
                    patch_expr
                }
            }
            DependencyType::Array => {
                if let Some(path) = &dep.path {
                    let path_sql = sql_text_array(path);
                    let match_key = dep
                        .match_key
                        .as_deref()
                        .unwrap_or(DEFAULT_ARRAY_MATCH_KEY)
                        .replace('\'', "''");
                    format!(
                        "jsonb_smart_patch_array({patch_expr}, $1::jsonb, {path_sql}, '{match_key}')"
                    )
                } else {
                    warning!("Array dependency missing path, skipping");
                    patch_expr
                }
            }
            DependencyType::Scalar => {
                format!("jsonb_smart_patch_scalar({patch_expr}, $1::jsonb)")
            }
        };
    }
    format!("UPDATE {tv_name} SET data = {patch_expr}, updated_at = now() WHERE {pk_col} = $2")
}

/// Renders path segments as a SQL `ARRAY['a','b']` constructor, one quoted literal per
/// segment, with embedded single quotes doubled.
fn sql_text_array<S: AsRef<str>>(segments: &[S]) -> String {
    let quoted: Vec<String> = segments
        .iter()
        .map(|segment| format!("'{}'", segment.as_ref().replace('\'', "''")))
        .collect();
    format!("ARRAY[{}]", quoted.join(","))
}
fn apply_full_replacement(row: &ViewRow, meta: &TviewMeta) -> spi::Result<()> {
let tv_name = relname_from_oid(row.tview_oid)?;
let pk_col = format!("pk_{}", row.entity_name);
let view_name = lookup_view_for_source(meta.view_oid)?;
let col_names = crate::utils::get_view_columns(&view_name)?;
let do_update: String = {
let mut update_parts = Vec::with_capacity(col_names.len());
for c in &col_names {
if c.as_str() != pk_col.as_str() {
update_parts.push(format!("{c} = EXCLUDED.{c}"));
}
}
update_parts.push("updated_at = NOW()".to_string());
update_parts.join(", ")
};
let col_list = col_names.join(", ");
let sql = format!(
"INSERT INTO {tv_name} ({col_list}) \
SELECT {col_list} FROM {view_name} WHERE {pk_col} = $1 \
ON CONFLICT ({pk_col}) DO UPDATE SET {do_update}"
);
Spi::run_with_args(
&sql,
&[unsafe { DatumWithOid::new(row.pk, PgOid::BuiltIn(PgBuiltInOids::INT8OID).value()) }],
)?;
Ok(())
}
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
use pgrx::JsonB;
use pgrx::prelude::*;
// Checks refresh of a TVIEW that embeds another TVIEW's data as a nested object:
// after the user is renamed and both TVIEWs are refreshed, `tv_post.data.author.name`
// carries the new name while `title` keeps its value.
#[pg_test]
fn test_apply_patch_nested_object() {
// Base tables and seed rows: one user, one post owned by that user.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_user (pk_user, name) VALUES (1, 'Alice')").unwrap();
Spi::run("INSERT INTO tb_post (pk_post, fk_user, title) VALUES (1, 1, 'Hello')").unwrap();
// TVIEW for users.
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name) AS data
FROM tb_user
$$)
",
)
.unwrap();
// TVIEW for posts; embeds `v_user.data` under the `author` key.
Spi::run(
"
SELECT pg_tviews_create(
'post',
$$
SELECT pk_post, fk_user,
jsonb_build_object(
'title', title,
'author', v_user.data
) AS data
FROM tb_post
LEFT JOIN v_user ON v_user.pk_user = tb_post.fk_user
$$
)
",
)
.unwrap();
// The recorded dependency types for `post` must mention a nested object.
let meta = crate::utils::spi_get_string(
"
SELECT dependency_types::text FROM pg_tview_meta
WHERE entity = 'post'
",
)
.unwrap()
.unwrap();
assert!(
meta.contains("nested_object"),
"Expected nested_object dependency, got: {meta}"
);
// Materialised state before the change.
let initial_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
let initial_json = &initial_data.0;
assert_eq!(initial_json["title"], "Hello");
assert_eq!(initial_json["author"]["name"], "Alice");
// Change the user, then refresh the user TVIEW followed by the post TVIEW.
Spi::run("UPDATE tb_user SET name = 'Alice Updated' WHERE pk_user = 1").unwrap();
let user_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_user'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(user_oid, 1).unwrap();
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
// Materialised state after the refresh.
let updated_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
let updated_json = &updated_data.0;
assert_eq!(
updated_json["title"], "Hello",
"Title should NOT be touched by smart patch"
);
assert_eq!(
updated_json["author"]["name"], "Alice Updated",
"Author name should be updated via smart patch"
);
}
// Checks refresh of a TVIEW that aggregates another TVIEW's data into a JSON array:
// after one comment is edited and both TVIEWs are refreshed, only that array element
// changes.
//
// The comment data includes an `id` key because the assertions below locate elements by
// `c["id"]`; without it neither lookup could ever succeed.
#[pg_test]
fn test_apply_patch_array() {
// Base tables and seed rows: one user, one post, two comments on that post.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT
)",
)
.unwrap();
Spi::run(
"CREATE TABLE tb_comment (
pk_comment BIGSERIAL PRIMARY KEY,
fk_post BIGINT REFERENCES tb_post(pk_post),
fk_user BIGINT REFERENCES tb_user(pk_user),
text TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_user (pk_user, name) VALUES (1, 'Alice')").unwrap();
Spi::run("INSERT INTO tb_post (pk_post, fk_user, title) VALUES (1, 1, 'Hello')").unwrap();
Spi::run(
"INSERT INTO tb_comment (pk_comment, fk_post, fk_user, text)
VALUES (1, 1, 1, 'Great post!')",
)
.unwrap();
Spi::run(
"INSERT INTO tb_comment (pk_comment, fk_post, fk_user, text)
VALUES (2, 1, 1, 'Thanks!')",
)
.unwrap();
// TVIEW for users.
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name) AS data
FROM tb_user
$$)
",
)
.unwrap();
// TVIEW for comments; `id` is exposed so array elements can be identified.
Spi::run(
"
SELECT pg_tviews_create('comment', $$
SELECT pk_comment, fk_post, fk_user,
jsonb_build_object('id', pk_comment, 'text', text) AS data
FROM tb_comment
$$)
",
)
.unwrap();
// TVIEW for posts; aggregates comment data into the `comments` array.
Spi::run("
SELECT pg_tviews_create(
'post',
$$
SELECT pk_post, fk_user,
jsonb_build_object(
'title', title,
'author', v_user.data,
'comments', COALESCE(jsonb_agg(v_comment.data ORDER BY v_comment.pk_comment), '[]'::jsonb)
) AS data
FROM tb_post
LEFT JOIN v_user ON v_user.pk_user = tb_post.fk_user
LEFT JOIN v_comment ON v_comment.fk_post = tb_post.pk_post
GROUP BY pk_post, fk_user, title, v_user.data
$$
)
").unwrap();
// The recorded dependency types for `post` must mention an array.
let meta = crate::utils::spi_get_string(
"
SELECT dependency_types::text FROM pg_tview_meta
WHERE entity = 'post'
",
)
.unwrap()
.unwrap();
assert!(
meta.contains("array"),
"Expected array dependency, got: {meta}"
);
// Materialised state before the change.
let initial_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
let initial_comments = initial_data.0["comments"].as_array().unwrap();
assert_eq!(
initial_comments.len(),
2,
"Should have 2 comments initially"
);
// Edit one comment, then refresh the comment TVIEW followed by the post TVIEW.
Spi::run("UPDATE tb_comment SET text = 'Updated!' WHERE pk_comment = 1").unwrap();
let comment_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_comment'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(comment_oid, 1).unwrap();
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
// Materialised state after the refresh.
let updated_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
let comments = updated_data.0["comments"].as_array().unwrap();
assert_eq!(comments.len(), 2, "Should still have 2 comments");
let comment_1 = comments
.iter()
.find(|c| c["id"].as_i64() == Some(1))
.expect("Should find comment with id=1");
let comment_2 = comments
.iter()
.find(|c| c["id"].as_i64() == Some(2))
.expect("Should find comment with id=2");
assert_eq!(comment_1["text"], "Updated!", "Comment 1 should be updated");
assert_eq!(
comment_2["text"], "Thanks!",
"Comment 2 should be unchanged"
);
}
// Checks refresh of a TVIEW whose data does not embed the referenced table:
// renaming the category and refreshing the post leaves `tv_post.data` unchanged.
#[pg_test]
fn test_apply_patch_scalar() {
// Base tables and seed rows: one category, one post referencing it.
Spi::run("CREATE TABLE tb_category (pk_category BIGSERIAL PRIMARY KEY, name TEXT)")
.unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_category BIGINT REFERENCES tb_category(pk_category),
title TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_category (pk_category, name) VALUES (1, 'Tech')").unwrap();
Spi::run("INSERT INTO tb_post (pk_post, fk_category, title) VALUES (1, 1, 'Hello')")
.unwrap();
// TVIEW for posts; exposes `fk_category` as a column but keeps it out of `data`.
Spi::run(
"
SELECT pg_tviews_create(
'post',
$$
SELECT pk_post, fk_category,
jsonb_build_object('title', title) AS data
FROM tb_post
$$
)
",
)
.unwrap();
// The recorded dependency types for `post` must mention a scalar.
let meta = crate::utils::spi_get_string(
"
SELECT dependency_types::text FROM pg_tview_meta
WHERE entity ='post'
",
)
.unwrap()
.unwrap();
assert!(
meta.contains("scalar"),
"Expected scalar dependency, got: {meta}"
);
// Materialised state before the change.
let initial_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
assert_eq!(initial_data.0["title"], "Hello");
assert!(
initial_data.0.get("category").is_none(),
"Should not have category in data"
);
// Rename the category and refresh the post TVIEW.
Spi::run("UPDATE tb_category SET name = 'Technology' WHERE pk_category = 1").unwrap();
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
// Materialised state after the refresh must be the same.
let updated_data = Spi::get_one::<JsonB>(
"
SELECT data FROM tv_post WHERE pk_post = 1
",
)
.unwrap()
.unwrap();
assert_eq!(
updated_data.0["title"], "Hello",
"Title should be unchanged"
);
assert!(
updated_data.0.get("category").is_none(),
"Still no category in data"
);
}
// End-to-end check combining a nested object (author) and an array (comments) in one
// post TVIEW: first the author is changed and refreshed, then a comment is changed and
// refreshed, and after each step only the affected part of `tv_post.data` differs.
//
// The comment data includes an `id` key because the assertions below locate elements by
// `c["id"]`; without it neither lookup could ever succeed.
#[pg_test]
fn test_smart_patch_full_integration() {
// Best effort: the result is deliberately ignored so the test also runs where the
// extension cannot be created.
let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS jsonb_delta");
// Base tables and seed rows: two users, one post, two comments.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT, email TEXT)")
.unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT,
content TEXT
)",
)
.unwrap();
Spi::run(
"CREATE TABLE tb_comment (
pk_comment BIGSERIAL PRIMARY KEY,
fk_post BIGINT REFERENCES tb_post(pk_post),
fk_user BIGINT REFERENCES tb_user(pk_user),
text TEXT
)",
)
.unwrap();
Spi::run(
"INSERT INTO tb_user (pk_user, name, email) VALUES (1, 'Alice', 'alice@example.com')",
)
.unwrap();
Spi::run("INSERT INTO tb_user (pk_user, name, email) VALUES (2, 'Bob', 'bob@example.com')")
.unwrap();
Spi::run(
"INSERT INTO tb_post (pk_post, fk_user, title, content)
VALUES (1, 1, 'First Post', 'Hello World')",
)
.unwrap();
Spi::run(
"INSERT INTO tb_comment (pk_comment, fk_post, fk_user, text)
VALUES (1, 1, 1, 'Great post!')",
)
.unwrap();
Spi::run(
"INSERT INTO tb_comment (pk_comment, fk_post, fk_user, text)
VALUES (2, 1, 2, 'Thanks for sharing!')",
)
.unwrap();
// TVIEW for users.
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name, 'email', email) AS data
FROM tb_user
$$)
",
)
.unwrap();
// TVIEW for comments; `id` is exposed so array elements can be identified.
Spi::run(
"
SELECT pg_tviews_create('comment', $$
SELECT pk_comment, fk_post, fk_user,
jsonb_build_object('id', pk_comment, 'text', text) AS data
FROM tb_comment
$$)
",
)
.unwrap();
// TVIEW for posts; embeds the author and aggregates the comments.
Spi::run(
"
SELECT pg_tviews_create('post', $$
SELECT pk_post, fk_user,
jsonb_build_object(
'title', title,
'content', content,
'author', v_user.data,
'comments', COALESCE(
jsonb_agg(
v_comment.data
ORDER BY v_comment.pk_comment
),
'[]'::jsonb
)
) AS data
FROM tb_post
LEFT JOIN v_user ON v_user.pk_user = tb_post.fk_user
LEFT JOIN v_comment ON v_comment.fk_post = tb_post.pk_post
GROUP BY pk_post, fk_user, title, content, v_user.data
$$)
",
)
.unwrap();
// Materialised state before any change.
let initial = Spi::get_one::<JsonB>("SELECT data FROM tv_post WHERE pk_post = 1")
.unwrap()
.unwrap();
assert_eq!(initial.0["title"], "First Post");
assert_eq!(initial.0["author"]["name"], "Alice");
assert_eq!(initial.0["comments"].as_array().unwrap().len(), 2);
// Step 1: change the author, refresh the user TVIEW and then the post TVIEW.
Spi::run(
"UPDATE tb_user SET name = 'Alice Updated', email = 'alice.new@example.com'
WHERE pk_user = 1",
)
.unwrap();
let user_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_user'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(user_oid, 1).unwrap();
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
let after_author_update =
Spi::get_one::<JsonB>("SELECT data FROM tv_post WHERE pk_post = 1")
.unwrap()
.unwrap();
assert_eq!(after_author_update.0["author"]["name"], "Alice Updated");
assert_eq!(
after_author_update.0["author"]["email"],
"alice.new@example.com"
);
assert_eq!(after_author_update.0["title"], "First Post");
assert_eq!(after_author_update.0["content"], "Hello World");
assert_eq!(
after_author_update.0["comments"].as_array().unwrap().len(),
2
);
// Step 2: change one comment, refresh the comment TVIEW and then the post TVIEW.
Spi::run("UPDATE tb_comment SET text = 'Updated comment!' WHERE pk_comment = 1").unwrap();
let comment_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_comment'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(comment_oid, 1).unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
let after_comment_update =
Spi::get_one::<JsonB>("SELECT data FROM tv_post WHERE pk_post = 1")
.unwrap()
.unwrap();
let comments = after_comment_update.0["comments"].as_array().unwrap();
assert_eq!(comments.len(), 2, "Should still have 2 comments");
let comment_1 = comments
.iter()
.find(|c| c["id"].as_i64() == Some(1))
.expect("Should find comment 1");
assert_eq!(comment_1["text"], "Updated comment!");
let comment_2 = comments
.iter()
.find(|c| c["id"].as_i64() == Some(2))
.expect("Should find comment 2");
assert_eq!(comment_2["text"], "Thanks for sharing!");
}
// Checks that refresh still produces correct data after the `jsonb_delta` extension has
// been dropped.
#[pg_test]
fn test_fallback_without_jsonb_delta() {
// Best effort: the result of the DROP is deliberately ignored.
let _ = Spi::run("DROP EXTENSION IF EXISTS jsonb_delta CASCADE");
// Base tables and seed rows: one user, one post owned by that user.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_user VALUES (1, 'Alice')").unwrap();
Spi::run("INSERT INTO tb_post VALUES (1, 1, 'Hello')").unwrap();
// TVIEWs: users, and posts embedding the author.
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name) AS data
FROM tb_user
$$)
",
)
.unwrap();
Spi::run(
"
SELECT pg_tviews_create('post', $$
SELECT pk_post, fk_user,
jsonb_build_object('title', title, 'author', v_user.data) AS data
FROM tb_post
LEFT JOIN v_user ON v_user.pk_user = tb_post.fk_user
$$)
",
)
.unwrap();
// Only checks that the metadata query itself succeeds; the value is not inspected.
let meta = crate::utils::spi_get_string(
"
SELECT dependency_types::text FROM pg_tview_meta WHERE entity = 'post'
",
);
assert!(
meta.is_ok(),
"Metadata should be captured even without jsonb_delta"
);
// Rename the user, then refresh the user TVIEW followed by the post TVIEW.
Spi::run("UPDATE tb_user SET name = 'Alice Fallback' WHERE pk_user = 1").unwrap();
let user_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_user'::regclass::oid")
.unwrap()
.unwrap();
let result = crate::refresh::refresh_pk(user_oid, 1);
assert!(result.is_ok(), "Fallback should work without jsonb_delta");
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
crate::refresh::refresh_pk(post_oid, 1).unwrap();
// The post must show the new author name and keep its title.
let updated = Spi::get_one::<JsonB>("SELECT data FROM tv_post WHERE pk_post = 1")
.unwrap()
.unwrap();
assert_eq!(updated.0["author"]["name"], "Alice Fallback");
assert_eq!(updated.0["title"], "Hello");
}
// Checks that refresh works for a TVIEW whose dependency metadata columns are NULL.
#[pg_test]
fn test_legacy_tview_fallback() {
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run("INSERT INTO tb_user VALUES (1, 'Alice')").unwrap();
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name) AS data
FROM tb_user
$$)
",
)
.unwrap();
// Blank out the dependency metadata to imitate a TVIEW that has none recorded.
Spi::run(
"
UPDATE pg_tview_meta
SET dependency_types = NULL,
dependency_paths = NULL,
array_match_keys = NULL
WHERE entity ='user'
",
)
.unwrap();
// Rename the user and refresh; the refresh must succeed and pick up the new name.
Spi::run("UPDATE tb_user SET name = 'Alice Legacy' WHERE pk_user = 1").unwrap();
let user_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_user'::regclass::oid")
.unwrap()
.unwrap();
let result = crate::refresh::refresh_pk(user_oid, 1);
assert!(result.is_ok(), "Should handle legacy TVIEW gracefully");
let updated = Spi::get_one::<JsonB>("SELECT data FROM tv_user WHERE pk_user = 1")
.unwrap()
.unwrap();
assert_eq!(updated.0["name"], "Alice Legacy");
}
// Checks the initial state of a DISTINCT ON TVIEW: the dedup key is recorded in the
// metadata and exactly one row (the first post) is materialised per user.
// NOTE(review): `refresh_by_dedup_key` itself is never called in this test.
#[pg_test]
fn test_refresh_by_dedup_key_basic() {
// Base tables and seed rows: one user with three posts.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT,
created_at TIMESTAMP DEFAULT NOW()
)",
)
.unwrap();
Spi::run("INSERT INTO tb_user VALUES (1, 'Alice')").unwrap();
Spi::run(
"INSERT INTO tb_post (pk_post, fk_user, title) VALUES
(1, 1, 'First Post'),
(2, 1, 'Second Post'),
(3, 1, 'Third Post')",
)
.unwrap();
Spi::run(
"
SELECT pg_tviews_create('user', $$
SELECT pk_user, jsonb_build_object('name', name) AS data
FROM tb_user
$$)
",
)
.unwrap();
// DISTINCT ON TVIEW: one row per `fk_user`, lowest `pk_post` wins.
Spi::run(
"
SELECT pg_tviews_create('post_by_user', $$
SELECT DISTINCT ON (fk_user)
pk_post, fk_user,
jsonb_build_object('title', title) AS data
FROM tb_post
ORDER BY fk_user, pk_post
$$, 'fk_user')
",
)
.unwrap();
// The dedup key must be recorded in the metadata.
let distinct_keys = crate::utils::spi_get_string(
"
SELECT distinct_on_keys::text FROM pg_tview_meta
WHERE entity = 'post_by_user'
",
)
.unwrap()
.unwrap();
assert!(
distinct_keys.contains("fk_user"),
"Should capture distinct_on_keys"
);
// Exactly one materialised row for the user, holding the first post.
let initial_count: i64 = Spi::get_one(
"
SELECT COUNT(*) FROM tv_post_by_user WHERE fk_user = 1
",
)
.unwrap()
.unwrap();
assert_eq!(initial_count, 1, "Should have exactly 1 row for fk_user=1");
let initial_title: String = Spi::get_one(
"
SELECT data->>'title' FROM tv_post_by_user WHERE fk_user = 1
",
)
.unwrap()
.unwrap();
assert_eq!(
initial_title, "First Post",
"Should be first post initially"
);
}
// Checks `refresh_by_dedup_key` across successive deletions: each time the currently
// materialised item of category 1 is deleted and the key refreshed, the next item by
// `pk_item` takes its place.
#[pg_test]
fn test_refresh_by_dedup_key_multiple_keys() {
// Base tables and seed rows: two categories, three items in the first, two in the second.
Spi::run("CREATE TABLE tb_category (pk_category BIGSERIAL PRIMARY KEY, name TEXT)")
.unwrap();
Spi::run(
"CREATE TABLE tb_item (
pk_item BIGSERIAL PRIMARY KEY,
fk_category BIGINT REFERENCES tb_category(pk_category),
title TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_category VALUES (1, 'Tech'), (2, 'News')").unwrap();
Spi::run(
"INSERT INTO tb_item (pk_item, fk_category, title) VALUES
(1, 1, 'Item 1A'),
(2, 1, 'Item 1B'),
(3, 1, 'Item 1C'),
(4, 2, 'Item 2A'),
(5, 2, 'Item 2B')",
)
.unwrap();
Spi::run(
"
SELECT pg_tviews_create('category', $$
SELECT pk_category, jsonb_build_object('name', name) AS data
FROM tb_category
$$)
",
)
.unwrap();
// DISTINCT ON TVIEW: one row per `fk_category`, lowest `pk_item` wins.
Spi::run(
"
SELECT pg_tviews_create('item_by_cat', $$
SELECT DISTINCT ON (fk_category)
pk_item, fk_category,
jsonb_build_object('title', title) AS data
FROM tb_item
ORDER BY fk_category, pk_item
$$, 'fk_category')
",
)
.unwrap();
// Initial state: category 1 is represented by 'Item 1A'.
let cat1_count: i64 = Spi::get_one(
"
SELECT COUNT(*) FROM tv_item_by_cat WHERE fk_category = 1
",
)
.unwrap()
.unwrap();
assert_eq!(cat1_count, 1, "Should have 1 row for category 1");
let cat1_title: String = Spi::get_one(
"
SELECT data->>'title' FROM tv_item_by_cat WHERE fk_category = 1
",
)
.unwrap()
.unwrap();
assert_eq!(cat1_title, "Item 1A", "Category 1 should show Item 1A");
// First deletion: remove the represented item and refresh key "1".
Spi::run("DELETE FROM tb_item WHERE pk_item = 1").unwrap();
let view_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'v_item_by_cat'::regclass::oid")
.unwrap()
.unwrap();
let result = crate::refresh::refresh_by_dedup_key(view_oid, "1");
assert!(result.is_ok(), "First dedup key refresh should succeed");
let cat1_new_title: String = Spi::get_one(
"
SELECT data->>'title' FROM tv_item_by_cat WHERE fk_category = 1
",
)
.unwrap()
.unwrap();
assert_eq!(
cat1_new_title, "Item 1B",
"Category 1 should now show Item 1B"
);
// Second deletion: same key again, so the DML fragments cached by the first call
// are available for reuse.
Spi::run("DELETE FROM tb_item WHERE pk_item = 2").unwrap();
let result2 = crate::refresh::refresh_by_dedup_key(view_oid, "1");
assert!(
result2.is_ok(),
"Second dedup key refresh should succeed and reuse cache"
);
let cat1_final_title: String = Spi::get_one(
"
SELECT data->>'title' FROM tv_item_by_cat WHERE fk_category = 1
",
)
.unwrap()
.unwrap();
assert_eq!(
cat1_final_title, "Item 1C",
"Category 1 should now show Item 1C"
);
}
// Checks that audit entries are buffered until `flush_audit_buffer` is called, that a
// flush writes every buffered entry with the expected operation, and that a second flush
// writes nothing more.
#[pg_test]
fn test_audit_buffer_and_flush() {
Spi::run("SET pg_tviews.audit_enabled = true").unwrap();
// Buffer three entries: two refreshes and one create.
crate::audit::log_refresh("user", 5);
crate::audit::log_refresh("post", 3);
crate::audit::log_create("comment", "SELECT ...");
// Nothing may reach the table before the flush.
let count: i64 = Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log")
.unwrap()
.unwrap_or(0);
assert_eq!(count, 0, "Buffer should not write to DB before flush");
crate::audit::flush_audit_buffer().unwrap();
// After the flush all three entries are present, split by operation.
let count: i64 = Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log")
.unwrap()
.unwrap_or(0);
assert_eq!(count, 3, "Flush should write all buffered entries");
let refresh_count: i64 =
Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log WHERE operation = 'REFRESH'")
.unwrap()
.unwrap_or(0);
assert_eq!(refresh_count, 2, "Should have 2 REFRESH entries");
let create_count: i64 =
Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log WHERE operation = 'CREATE'")
.unwrap()
.unwrap_or(0);
assert_eq!(create_count, 1, "Should have 1 CREATE entry");
// Flushing an already-flushed buffer must not duplicate rows.
crate::audit::flush_audit_buffer().unwrap();
let count_after: i64 = Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log")
.unwrap()
.unwrap_or(0);
assert_eq!(count_after, 3, "Second flush should be no-op");
}
// Checks that entries discarded with `clear_audit_buffer` are not written by a later flush.
#[pg_test]
fn test_audit_buffer_clear() {
Spi::run("SET pg_tviews.audit_enabled = true").unwrap();
crate::audit::log_refresh("user", 10);
crate::audit::log_drop("post");
// Clear first, then flush: the table must stay empty.
crate::audit::clear_audit_buffer();
crate::audit::flush_audit_buffer().unwrap();
let count: i64 = Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log")
.unwrap()
.unwrap_or(0);
assert_eq!(count, 0, "Cleared buffer should not produce any rows");
}
// Checks that no audit rows are written while `pg_tviews.audit_enabled` is false, even
// when an entry is logged and the buffer is flushed.
#[pg_test]
fn test_audit_disabled_skips_flush() {
Spi::run("SET pg_tviews.audit_enabled = false").unwrap();
crate::audit::log_refresh("user", 5);
crate::audit::flush_audit_buffer().unwrap();
let count: i64 = Spi::get_one("SELECT COUNT(*) FROM pg_tview_audit_log")
.unwrap()
.unwrap_or(0);
assert_eq!(count, 0, "Disabled audit should not write any rows");
}
// Checks that `refresh_pk` returns an error with useful context when the backing view no
// longer contains the requested row.
#[pg_test]
fn test_missing_row_error_handling() {
// Base tables and seed rows: one user, one post.
Spi::run("CREATE TABLE tb_user (pk_user BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
fk_user BIGINT REFERENCES tb_user(pk_user),
title TEXT
)",
)
.unwrap();
Spi::run("INSERT INTO tb_user VALUES (1, 'Alice')").unwrap();
Spi::run("INSERT INTO tb_post VALUES (1, 1, 'Hello')").unwrap();
Spi::run(
"
SELECT pg_tviews_create('post', $$
SELECT pk_post, fk_user,
jsonb_build_object('title', title) AS data
FROM tb_post
$$)
",
)
.unwrap();
// Remove the base row so the backing view has nothing for pk 1.
Spi::run("DELETE FROM tb_post WHERE pk_post = 1").unwrap();
let post_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_post'::regclass::oid")
.unwrap()
.unwrap();
let result = crate::refresh::refresh_pk(post_oid, 1);
assert!(
result.is_err(),
"Refresh should fail when backing row is missing"
);
// The debug rendering of the error must name the entity, the view, or the key.
let error_msg = format!("{:?}", result.unwrap_err());
assert!(
error_msg.contains("post")
|| error_msg.contains("v_post")
|| error_msg.contains("pk=1"),
"Error message should provide context about missing row"
);
}
// Checks that `refresh_pk` returns an error when the backing view yields a NULL `data`
// column for the requested row.
#[pg_test]
fn test_null_data_column_error_handling() {
Spi::run("CREATE TABLE tb_item (pk_item BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run("INSERT INTO tb_item VALUES (1, 'Widget')").unwrap();
// The view produces data only while the name is exactly 'Widget'; otherwise NULL.
Spi::run(
"
SELECT pg_tviews_create('item', $$
SELECT pk_item,
CASE WHEN name = 'Widget' THEN jsonb_build_object('name', name)
ELSE NULL
END AS data
FROM tb_item
$$)
",
)
.unwrap();
let initial_data: Option<String> =
Spi::get_one("SELECT data::text FROM tv_item WHERE pk_item = 1").unwrap();
assert!(initial_data.is_some(), "Should have valid data initially");
// Renaming the item makes the view's `data` NULL for this row.
Spi::run("UPDATE tb_item SET name = 'Widget-Modified' WHERE pk_item = 1").unwrap();
let item_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'tv_item'::regclass::oid")
.unwrap()
.unwrap();
let result = crate::refresh::refresh_pk(item_oid, 1);
assert!(
result.is_err(),
"Refresh should fail when data column is NULL"
);
// The debug rendering of the error must mention NULL or the data column.
let error_msg = format!("{:?}", result.unwrap_err());
assert!(
error_msg.to_lowercase().contains("null") || error_msg.contains("data"),
"Error should mention NULL or data column issue"
);
}
// Checks that two consecutive `refresh_by_dedup_key` calls with different keys on the
// same view both succeed.
// NOTE(review): despite the name, nothing here changes the view definition or inspects
// the DML cache; only the `Ok` results are asserted.
#[pg_test]
fn test_refresh_by_dedup_key_cache_invalidation() {
Spi::run(
"CREATE TABLE tb_post (
pk_post BIGSERIAL PRIMARY KEY,
title TEXT
)",
)
.unwrap();
Spi::run(
"INSERT INTO tb_post (pk_post, title) VALUES
(1, 'Post 1'),
(2, 'Post 2')",
)
.unwrap();
// DISTINCT ON TVIEW keyed by a text column.
Spi::run(
"
SELECT pg_tviews_create('post_by_title', $$
SELECT DISTINCT ON (title)
pk_post,
jsonb_build_object('title', title) AS data
FROM tb_post
ORDER BY title, pk_post
$$, 'title')
",
)
.unwrap();
let view_oid: pgrx::pg_sys::Oid = Spi::get_one("SELECT 'v_post_by_title'::regclass::oid")
.unwrap()
.unwrap();
// First call populates the per-view DML cache; the second call hits the same view.
let result1 = crate::refresh::refresh_by_dedup_key(view_oid, "Post 1");
assert!(result1.is_ok(), "Initial refresh should succeed");
let result2 = crate::refresh::refresh_by_dedup_key(view_oid, "Post 2");
assert!(result2.is_ok(), "Second refresh should succeed");
}
}