use crate::cascade_path;
use crate::error::{TViewError, TViewResult};
use crate::schema::{TViewSchema, analyzer::analyze_dependencies, inference::infer_schema};
use crate::utils::quote_identifier;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::Oid;
use pgrx::prelude::*;
/// Returns the name of the first schema on the session's `search_path`
/// (PostgreSQL's `current_schema()`).
///
/// # Errors
/// `TViewError::CatalogError` when the query fails, or when `current_schema()` yields NULL.
fn current_schema() -> TViewResult<String> {
    const OPERATION: &str = "Get current schema";

    let maybe_schema = crate::utils::spi_get_string("SELECT current_schema()::text").map_err(
        |err| TViewError::CatalogError {
            operation: OPERATION.to_string(),
            pg_error: err.to_string(),
        },
    )?;

    match maybe_schema {
        Some(name) => Ok(name),
        None => Err(TViewError::CatalogError {
            operation: OPERATION.to_string(),
            pg_error: "current_schema() returned NULL (no schema in search_path?)".to_string(),
        }),
    }
}
fn expand_select_star_if_needed(select_sql: &str) -> TViewResult<String> {
let trimmed = select_sql.trim();
let lower = trimmed.to_lowercase();
let after_kw = lower.strip_prefix("select").unwrap_or("").trim_start();
let after_star = match after_kw.strip_prefix('*') {
Some(rest) => rest.trim_start(),
None => return Ok(select_sql.to_string()),
};
let after_from = match after_star.strip_prefix("from") {
Some(rest)
if rest
.chars()
.next()
.map(|c| c.is_ascii_whitespace())
.unwrap_or(true) =>
{
rest.trim_start()
}
_ => return Ok(select_sql.to_string()),
};
let source_qualified = after_from
.trim_end_matches(';')
.trim()
.split_ascii_whitespace()
.next()
.unwrap_or("");
if source_qualified.is_empty() {
return Ok(select_sql.to_string());
}
let (schema_name, table_name) = match source_qualified.split_once('.') {
Some((s, t)) => (
Some(s.trim_matches('"').to_string()),
t.trim_matches('"').to_string(),
),
None => (None, source_qualified.trim_matches('"').to_string()),
};
let columns: Vec<String> = if let Some(ref schema) = schema_name {
let args = vec![
unsafe {
pgrx::datum::DatumWithOid::new(
table_name.as_str(),
pgrx::prelude::PgOid::BuiltIn(pgrx::prelude::PgBuiltInOids::TEXTOID).value(),
)
},
unsafe {
pgrx::datum::DatumWithOid::new(
schema.as_str(),
pgrx::prelude::PgOid::BuiltIn(pgrx::prelude::PgBuiltInOids::TEXTOID).value(),
)
},
];
pgrx::prelude::Spi::connect(|client| {
let rows = client.select(
"SELECT column_name::text \
FROM information_schema.columns \
WHERE table_name = $1 AND table_schema = $2 \
ORDER BY ordinal_position",
None,
&args,
)?;
let mut result = Vec::new();
for row in rows {
if let Some(col) =
row[1]
.value::<String>()
.map_err(|e| TViewError::CatalogError {
operation: "expand_select_star: read column_name".to_string(),
pg_error: format!("{e:?}"),
})?
{
result.push(col);
}
}
Ok(result)
})
.map_err(|e: pgrx::spi::Error| TViewError::SpiError {
query: "expand_select_star: information_schema query".to_string(),
error: e.to_string(),
})?
} else {
let args = vec![unsafe {
pgrx::datum::DatumWithOid::new(
table_name.as_str(),
pgrx::prelude::PgOid::BuiltIn(pgrx::prelude::PgBuiltInOids::TEXTOID).value(),
)
}];
pgrx::prelude::Spi::connect(|client| {
let rows = client.select(
"SELECT column_name::text \
FROM information_schema.columns \
WHERE table_name = $1 \
ORDER BY ordinal_position",
None,
&args,
)?;
let mut result = Vec::new();
for row in rows {
if let Some(col) =
row[1]
.value::<String>()
.map_err(|e| TViewError::CatalogError {
operation: "expand_select_star: read column_name".to_string(),
pg_error: format!("{e:?}"),
})?
{
result.push(col);
}
}
Ok(result)
})
.map_err(|e: pgrx::spi::Error| TViewError::SpiError {
query: "expand_select_star: information_schema query".to_string(),
error: e.to_string(),
})?
};
if columns.is_empty() {
return Ok(select_sql.to_string());
}
let col_list = columns.join(", ");
Ok(format!("SELECT {col_list} FROM {source_qualified}"))
}
/// Creates a TVIEW: a backing view `v_<entity>`, a materialized table `tv_<entity>`,
/// its `pg_tview_meta` row, and the triggers on the base tables it depends on.
///
/// # Arguments
/// * `tview_name` - requested name. One leading `tv_` is stripped to get the entity name
///   used when `select_sql` has no recognizable entity (see `transform_raw_select_to_tview`).
/// * `select_sql` - the defining query. A plain `SELECT * FROM <rel>` is expanded first.
/// * `schema_override` - target schema. Defaults to `current_schema()`.
/// * `defer_populate` - when true, initial population is queued via
///   `crate::hooks::enqueue_pending_populate` instead of running now.
///
/// # Errors
/// * `TViewAlreadyExists` if `tview_exists` reports an existing entry.
/// * `RequiredColumnMissing` if no `pk_<entity>` column can be inferred.
/// * Any catalog or SPI error from the creation steps.
pub fn create_tview(
    tview_name: &str,
    select_sql: &str,
    schema_override: Option<&str>,
    defer_populate: bool,
) -> TViewResult<()> {
    if tview_exists(tview_name)? {
        return Err(TViewError::TViewAlreadyExists {
            name: tview_name.to_string(),
        });
    }

    // Entity implied by the requested name (`tv_post` -> `post`). It is used to build a
    // Trinity-Pattern query when `select_sql` has none, and to word the error below.
    let requested_entity = tview_name.strip_prefix("tv_").unwrap_or(tview_name);

    let expanded_sql = expand_select_star_if_needed(select_sql)?;
    let select_sql = expanded_sql.as_str();
    let schema = infer_schema(select_sql)?;
    let (final_select_sql, final_schema) = if schema.entity_name.is_none() {
        transform_raw_select_to_tview(requested_entity, select_sql)?
    } else {
        (select_sql.to_string(), schema)
    };

    let entity_name = final_schema.entity_name.as_ref().ok_or_else(|| {
        TViewError::RequiredColumnMissing {
            column_name: format!("pk_{requested_entity}"),
            context: "pg_tviews requires a Trinity Pattern primary key column named \
                      \"pk_<entity>\" (e.g., pk_user, pk_post)"
                .to_string(),
        }
    })?;
    crate::validation::validate_sql_identifier(entity_name, "entity_name")?;

    let tv_table_name = format!("tv_{entity_name}");
    let schema_name = match schema_override {
        Some(s) => s.to_string(),
        None => current_schema()?,
    };

    let view_name = format!("v_{entity_name}");
    create_backing_view(&view_name, &final_select_sql, &schema_name)?;

    let distinct_on_keys =
        crate::schema::parser::extract_distinct_on_keys(&final_select_sql).unwrap_or_default();
    create_materialized_table(
        &tv_table_name,
        &final_schema,
        &schema_name,
        &distinct_on_keys,
    )?;

    if defer_populate {
        crate::hooks::enqueue_pending_populate(&tv_table_name, &view_name, &schema_name);
    } else {
        populate_initial_data(&tv_table_name, &view_name, &final_schema, &schema_name)?;
    }

    let dep_graph = crate::dependency::find_base_tables(&view_name, Some(&schema_name))?;
    let cascade_paths = extract_and_resolve_cascade_paths(
        &final_select_sql,
        entity_name,
        &final_schema,
        &dep_graph.base_tables,
    )?;
    register_metadata(
        entity_name,
        &view_name,
        &tv_table_name,
        &final_select_sql,
        &final_schema,
        &cascade_paths,
        &schema_name,
        &distinct_on_keys,
    )?;

    if dep_graph.base_tables.is_empty() {
        warning!("No base table dependencies found for {}", tv_table_name);
    } else {
        crate::dependency::install_triggers(&dep_graph.base_tables, entity_name)?;
    }

    crate::queue::cache::invalidate_all_caches();
    crate::audit::log_create(entity_name, select_sql);
    // Audit flushing is best-effort: a failure is reported but does not undo the CREATE.
    if let Err(e) = crate::audit::flush_audit_buffer() {
        warning!("Failed to flush audit after CREATE: {}", e);
    }
    Ok(())
}
/// Builds the cascade paths that let changes in joined tables refresh `tv_<entity>`.
///
/// JOIN paths are parsed from `select_sql` relative to the root table `tb_<entity>`,
/// then resolved against the OIDs in `base_table_oids`. Neither a parse failure nor an
/// unresolvable path fails creation: each is reported with a NOTICE and skipped.
fn extract_and_resolve_cascade_paths(
    select_sql: &str,
    entity_name: &str,
    _schema: &TViewSchema,
    base_table_oids: &[pg_sys::Oid],
) -> TViewResult<Vec<cascade_path::CascadePath>> {
    let root_table = format!("tb_{entity_name}");
    let join_paths = match crate::sql_parser::extract_join_paths(select_sql, &root_table) {
        Err(e) => {
            notice!("Could not parse JOIN tree for cascade paths: {e}");
            return Ok(Vec::new());
        }
        Ok(paths) if paths.is_empty() => return Ok(Vec::new()),
        Ok(paths) => paths,
    };

    let oid_map = build_oid_name_map(base_table_oids)?;
    let resolved = join_paths
        .iter()
        .filter_map(|jp| match resolve_join_path(jp, entity_name, &oid_map) {
            Ok(path) => Some(path),
            Err(e) => {
                notice!(
                    "Cascade path from '{}' unresolvable: {e} — changes to this table will not trigger incremental refresh of '{entity_name}'",
                    jp.source_table
                );
                None
            }
        })
        .collect();
    Ok(resolved)
}
/// Maps `pg_class.relname` to OID for each relation OID in `oids`.
///
/// NOTE(review): keys are unqualified relation names. Two base tables with the same name
/// in different schemas collapse into a single entry, and the last row read wins.
fn build_oid_name_map(
    oids: &[pg_sys::Oid],
) -> TViewResult<std::collections::HashMap<String, pg_sys::Oid>> {
    let mut by_name = std::collections::HashMap::new();
    if oids.is_empty() {
        return Ok(by_name);
    }

    // OIDs are plain unsigned integers, so inlining them into the IN list is safe.
    let mut in_list = String::new();
    for (position, oid) in oids.iter().enumerate() {
        if position > 0 {
            in_list.push(',');
        }
        in_list.push_str(&oid.to_u32().to_string());
    }
    let query = format!("SELECT oid, relname::text FROM pg_class WHERE oid IN ({in_list})");

    Spi::connect(|client| {
        for row in client.select(&query, None, &[])? {
            let oid = row["oid"]
                .value::<pg_sys::Oid>()?
                .unwrap_or(pg_sys::Oid::INVALID);
            let relname = row["relname"].value::<String>()?.unwrap_or_default();
            by_name.insert(relname, oid);
        }
        Ok::<_, spi::Error>(())
    })?;
    Ok(by_name)
}
/// Resolves a parsed JOIN path into a `CascadePath` by attaching table OIDs.
///
/// Every table on the path (the source table and each intermediate step) must appear in
/// `oid_map`. When the root table `tb_<entity>` is joined on a column other than
/// `pk_<entity>`, a final hop is appended that maps that join column back to the pk.
///
/// # Errors
/// `TViewError::CatalogError` when any required table is missing from `oid_map`.
fn resolve_join_path(
    jp: &crate::sql_parser::JoinPath,
    entity_name: &str,
    oid_map: &std::collections::HashMap<String, pg_sys::Oid>,
) -> TViewResult<cascade_path::CascadePath> {
    /// Looks `table` up in `oid_map`. When it is absent, builds a `CatalogError` from
    /// `operation` and the lazily formatted `describe()`.
    fn lookup_oid(
        oid_map: &std::collections::HashMap<String, pg_sys::Oid>,
        table: &str,
        operation: &str,
        describe: impl FnOnce() -> String,
    ) -> TViewResult<pg_sys::Oid> {
        match oid_map.get(table) {
            Some(oid) => Ok(*oid),
            None => Err(TViewError::CatalogError {
                operation: operation.to_string(),
                pg_error: describe(),
            }),
        }
    }

    let source_oid = lookup_oid(oid_map, &jp.source_table, "resolve cascade path", || {
        format!(
            "Table '{}' not found in base table dependencies",
            jp.source_table
        )
    })?;

    let mut hops = jp
        .steps
        .iter()
        .map(|step| {
            let table_oid = lookup_oid(oid_map, &step.table_name, "resolve cascade path hop", || {
                format!(
                    "Intermediate table '{}' not found in base table dependencies",
                    step.table_name
                )
            })?;
            Ok(cascade_path::CascadeHop {
                table_oid,
                table_name: step.table_name.clone(),
                lookup_col: step.lookup_col.clone(),
                carry_col: step.carry_col.clone(),
            })
        })
        .collect::<TViewResult<Vec<_>>>()?;

    let pk_col = format!("pk_{entity_name}");
    if jp.root_join_col != pk_col {
        // The root is joined on a non-pk column: add a reverse lookup from that column
        // to the entity's pk.
        let root_table = format!("tb_{entity_name}");
        let root_oid = lookup_oid(
            oid_map,
            &root_table,
            "resolve cascade path reverse-lookup hop",
            || format!("Root table '{root_table}' not found in base table dependencies"),
        )?;
        hops.push(cascade_path::CascadeHop {
            table_oid: root_oid,
            table_name: root_table,
            lookup_col: jp.root_join_col.clone(),
            carry_col: pk_col,
        });
    }

    Ok(cascade_path::CascadePath {
        source_oid,
        source_table: jp.source_table.clone(),
        entity_name: entity_name.to_string(),
        initial_col: jp.initial_col.clone(),
        hops,
        unresolvable: false,
    })
}
/// Reports whether `pg_tview_meta` already has a row for this TVIEW's entity.
///
/// The entity is `tview_name` with at most one leading `tv_` removed, matching how
/// `create_tview` derives its entity (`tv_tv_x` -> `tv_x`, not `x`).
///
/// # Errors
/// `TViewError::CatalogError` if the lookup query fails.
fn tview_exists(tview_name: &str) -> TViewResult<bool> {
    let entity_name = tview_name.strip_prefix("tv_").unwrap_or(tview_name);
    // SAFETY: the value is a `&str` bound with TEXTOID, so datum and declared type agree.
    let args = vec![unsafe {
        DatumWithOid::new(entity_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
    }];
    Spi::get_one_with_args::<bool>(
        "SELECT COUNT(*) > 0 FROM pg_tview_meta WHERE entity = $1",
        &args,
    )
    .map_err(|e| TViewError::CatalogError {
        operation: format!("Check TVIEW exists: {tview_name}"),
        pg_error: format!("{e:?}"),
    })
    .map(|found| found.unwrap_or(false))
}
/// Creates the backing view `<schema_name>.<view_name>` as `select_sql`, then checks
/// that the view is visible in `pg_class`.
///
/// # Errors
/// * `TViewError::SpiError` if the DDL or the catalog check fails.
/// * `TViewError::CatalogError` if the view cannot be found afterwards.
fn create_backing_view(view_name: &str, select_sql: &str, schema_name: &str) -> TViewResult<()> {
    let qi_schema = quote_identifier(schema_name);
    let qi_view = quote_identifier(view_name);
    let create_view_sql = format!("CREATE VIEW {qi_schema}.{qi_view} AS {select_sql}");
    crate::utils::spi_run_ddl(&create_view_sql).map_err(|e| TViewError::SpiError {
        query: create_view_sql.clone(),
        error: e,
    })?;

    // SAFETY: both values are `&str` bound with TEXTOID, so datum and declared type agree.
    let check_args = vec![
        unsafe { DatumWithOid::new(view_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
        unsafe { DatumWithOid::new(schema_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
    ];
    // `EXISTS` always returns exactly one row. A missing view therefore reads as `false`
    // and reaches the CatalogError below, instead of surfacing as a "no rows" SPI error.
    let exists = Spi::get_one_with_args::<bool>(
        "SELECT EXISTS (SELECT 1 FROM pg_class c \
         JOIN pg_namespace n ON c.relnamespace = n.oid \
         WHERE c.relname = $1 AND n.nspname = $2 AND c.relkind = 'v')",
        &check_args,
    )
    .map_err(|e| TViewError::SpiError {
        query: format!("Check view {schema_name}.{view_name} exists"),
        error: e.to_string(),
    })?
    .unwrap_or(false);

    if !exists {
        return Err(TViewError::CatalogError {
            operation: format!("Create view {schema_name}.{view_name}"),
            pg_error: "View was not created".to_string(),
        });
    }
    Ok(())
}
/// Maps a PostgreSQL type name to the column type used in a TVIEW table.
///
/// Accepts both the `information_schema.columns.data_type` spelling
/// (e.g. `timestamp with time zone`) and the `udt_name` spelling (e.g. `timestamptz`,
/// `bool`). Unknown types fall back to `TEXT`.
///
/// Fixed-length `bit(n)` columns map to unbounded `BIT VARYING`. This is because
/// `information_schema` reports only `bit` without the length, and a bare `BIT` column
/// means `bit(1)`, which rejects any longer value.
fn scalar_pg_type_to_sql(pg_type: &str) -> &'static str {
    match pg_type {
        "boolean" | "bool" => "BOOLEAN",
        "uuid" => "UUID",
        "jsonb" => "JSONB",
        "json" => "JSON",
        "bigint" | "int8" => "BIGINT",
        "integer" | "int4" | "int" => "INTEGER",
        "smallint" | "int2" => "SMALLINT",
        "numeric" | "decimal" => "NUMERIC",
        "real" | "float4" => "REAL",
        "double precision" | "float8" => "DOUBLE PRECISION",
        "timestamp with time zone" | "timestamptz" => "TIMESTAMPTZ",
        "timestamp without time zone" | "timestamp" => "TIMESTAMP",
        "date" => "DATE",
        "time with time zone" | "timetz" => "TIMETZ",
        "time without time zone" | "time" => "TIME",
        "interval" => "INTERVAL",
        "inet" => "INET",
        "cidr" => "CIDR",
        "macaddr" => "MACADDR",
        "macaddr8" => "MACADDR8",
        "bytea" => "BYTEA",
        "tsvector" => "TSVECTOR",
        "tsquery" => "TSQUERY",
        "int4range" => "INT4RANGE",
        "int8range" => "INT8RANGE",
        "numrange" => "NUMRANGE",
        "tsrange" => "TSRANGE",
        "tstzrange" => "TSTZRANGE",
        "daterange" => "DATERANGE",
        "point" => "POINT",
        "line" => "LINE",
        "lseg" => "LSEG",
        "box" => "BOX",
        "path" => "PATH",
        "polygon" => "POLYGON",
        "circle" => "CIRCLE",
        "ltree" => "LTREE",
        "lquery" => "LQUERY",
        "ltxtquery" => "LTXTQUERY",
        "geometry" => "GEOMETRY",
        "geography" => "GEOGRAPHY",
        "hstore" => "HSTORE",
        "citext" => "CITEXT",
        "money" => "MONEY",
        "bit" | "bit varying" | "varbit" => "BIT VARYING",
        "xml" => "XML",
        _ => "TEXT",
    }
}
/// Turns an `information_schema.columns` (`data_type`, `udt_name`) pair into a column type.
///
/// * `ARRAY`: the element type comes from `udt_name` with its leading `_` removed
///   (`_uuid` -> `UUID[]`). Falls back to `TEXT[]` when that is unavailable.
/// * `USER-DEFINED`: the type is looked up by `udt_name` (e.g. `ltree`), or is `TEXT`
///   when `udt_name` is absent.
/// * Anything else is mapped directly through `scalar_pg_type_to_sql`.
fn resolve_pg_column_type(data_type: &str, udt_name: Option<&str>) -> String {
    if data_type == "ARRAY" {
        let element = match udt_name.and_then(|name| name.strip_prefix('_')) {
            Some(element_udt) => scalar_pg_type_to_sql(element_udt),
            None => "TEXT",
        };
        return format!("{element}[]");
    }
    if data_type == "USER-DEFINED" {
        return match udt_name {
            Some(name) => scalar_pg_type_to_sql(name).to_string(),
            None => String::from("TEXT"),
        };
    }
    scalar_pg_type_to_sql(data_type).to_string()
}
fn create_materialized_table(
tview_name: &str,
schema: &TViewSchema,
schema_name: &str,
distinct_on_keys: &[String],
) -> TViewResult<()> {
let qi_schema = quote_identifier(schema_name);
let qi_tview = quote_identifier(tview_name);
let first_dedup = distinct_on_keys.first().map(String::as_str);
let is_distinct_on = first_dedup.is_some();
let mut columns = Vec::new();
if let Some(pk) = &schema.pk_column {
if is_distinct_on && first_dedup != Some(pk.as_str()) {
columns.push(format!("{} BIGINT", quote_identifier(pk)));
} else if is_distinct_on {
columns.push(format!("{} BIGINT PRIMARY KEY", quote_identifier(pk)));
} else {
columns.push(format!("{} BIGINT PRIMARY KEY", quote_identifier(pk)));
}
}
if let Some(id) = &schema.id_column {
if first_dedup == Some(id.as_str()) {
columns.push(format!("{} UUID PRIMARY KEY", quote_identifier(id)));
} else {
columns.push(format!("{} UUID NOT NULL", quote_identifier(id)));
}
}
if let Some(identifier) = &schema.identifier_column {
columns.push(format!("{} TEXT", quote_identifier(identifier)));
}
if let Some(data) = &schema.data_column {
columns.push(format!("{} JSONB", quote_identifier(data)));
}
for fk in &schema.fk_columns {
columns.push(format!("{} BIGINT", quote_identifier(fk)));
}
let entity = tview_name.strip_prefix("tv_").unwrap_or(tview_name);
let view_name_for_types = format!("v_{entity}");
let actual_col_types: std::collections::HashMap<String, String> = {
let args = vec![
unsafe {
DatumWithOid::new(
view_name_for_types.as_str(),
PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value(),
)
},
unsafe {
DatumWithOid::new(schema_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
},
];
Spi::connect(|client| {
let rows = client.select(
"SELECT column_name::text, data_type::text, udt_name::text \
FROM information_schema.columns \
WHERE table_name = $1 AND table_schema = $2",
None,
&args,
)?;
let mut map = std::collections::HashMap::new();
for row in rows {
let name = row[1].value::<String>()?;
let data_type = row[2].value::<String>()?;
let udt_name = row[3].value::<String>()?;
if let Some(n) = name {
let sql_type = resolve_pg_column_type(
data_type.as_deref().unwrap_or("text"),
udt_name.as_deref(),
);
map.insert(n, sql_type);
}
}
Ok::<std::collections::HashMap<String, String>, pgrx::spi::Error>(map)
})
.unwrap_or_default()
};
for uuid_fk in &schema.uuid_fk_columns {
let sql_type = actual_col_types
.get(uuid_fk.as_str())
.map(String::as_str)
.unwrap_or("UUID"); columns.push(format!("{} {sql_type}", quote_identifier(uuid_fk)));
}
for (col_name, col_type) in &schema.additional_columns_with_types {
let effective_type = actual_col_types
.get(col_name.as_str())
.map(String::as_str)
.unwrap_or(col_type.as_str());
let qi_col = quote_identifier(col_name);
if first_dedup == Some(col_name.as_str()) {
columns.push(format!("{qi_col} {effective_type} PRIMARY KEY"));
} else {
columns.push(format!("{qi_col} {effective_type}"));
}
}
columns.push("created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()".to_string());
columns.push("updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()".to_string());
let columns_sql = columns.join(",\n ");
let unlogged_keyword = if crate::config::unlogged_by_default() {
"UNLOGGED "
} else {
""
};
let create_table_sql =
format!("CREATE {unlogged_keyword}TABLE {qi_schema}.{qi_tview} (\n {columns_sql}\n)");
crate::utils::spi_run_ddl(&create_table_sql).map_err(|e| TViewError::SpiError {
query: create_table_sql,
error: e,
})?;
create_tview_indexes(tview_name, schema, schema_name)?;
Ok(())
}
/// Creates the secondary indexes of a freshly created TVIEW table, in this order:
/// 1. a B-tree on the `id` column;
/// 2. one B-tree per UUID FK column;
/// 3. a GIN index on the JSONB data column.
///
/// Index names are `idx_<tview>_<column>`, with a `_gin` suffix for the GIN index.
///
/// # Errors
/// `TViewError::SpiError` for the first `CREATE INDEX` that fails. Indexes after it are
/// not attempted.
fn create_tview_indexes(
    tview_name: &str,
    schema: &TViewSchema,
    schema_name: &str,
) -> TViewResult<()> {
    let target = format!(
        "{}.{}",
        quote_identifier(schema_name),
        quote_identifier(tview_name)
    );

    // Each spec is (index name, optional USING clause, indexed column).
    let mut specs: Vec<(String, &str, &str)> = Vec::new();
    if let Some(id) = &schema.id_column {
        specs.push((format!("idx_{tview_name}_{id}"), "", id.as_str()));
    }
    specs.extend(
        schema
            .uuid_fk_columns
            .iter()
            .map(|fk| (format!("idx_{tview_name}_{fk}"), "", fk.as_str())),
    );
    if let Some(data) = &schema.data_column {
        specs.push((format!("idx_{tview_name}_{data}_gin"), " USING GIN", data.as_str()));
    }

    for (index_name, using_clause, column) in specs {
        let ddl = format!(
            "CREATE INDEX {} ON {target}{using_clause} ({})",
            quote_identifier(&index_name),
            quote_identifier(column),
        );
        crate::utils::spi_run_ddl(&ddl).map_err(|e| TViewError::SpiError {
            query: ddl.clone(),
            error: e,
        })?;
    }
    Ok(())
}
fn populate_initial_data(
tview_name: &str,
view_name: &str,
_schema: &TViewSchema,
schema_name: &str,
) -> TViewResult<()> {
let view_oid = Spi::get_one::<Oid>(&format!(
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname::text = '{}' AND n.nspname::text = '{}' AND c.relkind = 'v'",
view_name, schema_name
))?
.ok_or_else(|| TViewError::CatalogError {
operation: format!("Find view {view_name} in schema {schema_name}"),
pg_error: "View not found".to_string(),
})?;
let view_columns = crate::utils::get_view_columns_by_oid(view_oid)?;
if view_columns.is_empty() {
return Err(TViewError::CatalogError {
operation: format!("Get columns for view {view_name}"),
pg_error: "View has no selectable columns".to_string(),
});
}
let insert_columns = view_columns;
let qi_schema = quote_identifier(schema_name);
let qi_tview = quote_identifier(tview_name);
let qi_view = quote_identifier(view_name);
let col_list = insert_columns
.iter()
.map(|c| quote_identifier(c))
.collect::<Vec<_>>()
.join(", ");
let insert_sql = format!(
"INSERT INTO {qi_schema}.{qi_tview} ({col_list}) \
SELECT {col_list} FROM {qi_schema}.{qi_view}"
);
Spi::run(&insert_sql).map_err(|e| TViewError::SpiError {
query: insert_sql,
error: e.to_string(),
})?;
Ok(())
}
/// Formats one element of a PostgreSQL array literal (the text between `{` and `}`).
///
/// An element is wrapped in double quotes, with `\` and `"` backslash-escaped, whenever
/// the array parser would otherwise misread it:
/// * the empty string;
/// * text containing `,` `"` `\` `{` `}` or any whitespace (tabs and newlines included);
/// * the word `NULL` in any letter case, which unquoted would become an SQL NULL element.
///
/// All other text is returned unchanged.
fn pg_array_elem(s: &str) -> String {
    let needs_quotes = s.is_empty()
        || s.eq_ignore_ascii_case("null")
        || s.chars()
            .any(|c| matches!(c, ',' | '"' | '\\' | '{' | '}') || c.is_whitespace());
    if needs_quotes {
        format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
    } else {
        s.to_string()
    }
}
#[allow(clippy::too_many_arguments)] fn register_metadata(
entity_name: &str,
view_name: &str,
tview_name: &str,
definition_sql: &str,
schema: &TViewSchema,
cascade_paths: &[cascade_path::CascadePath],
schema_name: &str,
distinct_on_keys: &[String],
) -> TViewResult<()> {
let is_union = {
let sql_lower = definition_sql.to_lowercase();
crate::schema::parser::find_outer_union(&sql_lower, 0).is_some()
};
let dep_infos = analyze_dependencies(definition_sql, &schema.fk_columns);
let fk_columns = schema
.fk_columns
.iter()
.map(|s| pg_array_elem(s))
.collect::<Vec<_>>()
.join(",");
let uuid_fk_columns = schema
.uuid_fk_columns
.iter()
.map(|s| pg_array_elem(s))
.collect::<Vec<_>>()
.join(",");
let dep_types = dep_infos
.iter()
.map(|d| pg_array_elem(d.dep_type.as_str()))
.collect::<Vec<_>>()
.join(",");
let dep_paths = dep_infos
.iter()
.map(|d| {
pg_array_elem(
&d.jsonb_path
.as_ref()
.map_or_else(String::new, |path| path.join(".")),
)
})
.collect::<Vec<_>>()
.join(",");
let array_keys = dep_infos
.iter()
.map(|d| pg_array_elem(&d.array_match_key.clone().unwrap_or_default()))
.collect::<Vec<_>>()
.join(",");
let cascade_paths_str = cascade_paths
.iter()
.map(|path| {
let json = serde_json::to_string(path).expect("Failed to serialize cascade path");
pg_array_elem(&json)
})
.collect::<Vec<_>>()
.join(",");
let cascade_paths_literal = format!("'{{{cascade_paths_str}}}'");
let view_oid_args = vec![
unsafe { DatumWithOid::new(view_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
unsafe { DatumWithOid::new(schema_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
];
let view_oid_result = Spi::get_one_with_args::<pg_sys::Oid>(
"SELECT c.oid FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = $1 AND n.nspname = $2 AND c.relkind = 'v'",
&view_oid_args,
)
.map_err(|e| TViewError::CatalogError {
operation: format!("Get OID for view {schema_name}.{view_name}"),
pg_error: e.to_string(),
})?;
let table_oid_args = vec![
unsafe { DatumWithOid::new(tview_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
unsafe { DatumWithOid::new(schema_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
];
let table_oid_result = Spi::get_one_with_args::<pg_sys::Oid>(
"SELECT c.oid FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = $1 AND n.nspname = $2 AND c.relkind = 'r'",
&table_oid_args,
)
.map_err(|e| TViewError::CatalogError {
operation: format!("Get OID for table {schema_name}.{tview_name}"),
pg_error: e.to_string(),
})?;
let view_oid = view_oid_result.ok_or_else(|| TViewError::CatalogError {
operation: format!("Find view {schema_name}.{view_name}"),
pg_error: "View OID not found".to_string(),
})?;
let table_oid = table_oid_result.ok_or_else(|| TViewError::CatalogError {
operation: format!("Find table {schema_name}.{tview_name}"),
pg_error: "Table OID not found".to_string(),
})?;
let distinct_on_str = distinct_on_keys
.iter()
.map(|s| pg_array_elem(s))
.collect::<Vec<_>>()
.join(",");
let insert_meta_sql = format!(
"INSERT INTO pg_tview_meta (
entity,
view_oid,
table_oid,
definition,
cascade_paths,
fk_columns,
uuid_fk_columns,
dependency_types,
dependency_paths,
array_match_keys,
distinct_on_keys,
is_union
) VALUES ($1, {}, {}, $2, {}, '{{{}}}', '{{{}}}', '{{{}}}', '{{{}}}', '{{{}}}', '{{{}}}', {})
ON CONFLICT (entity) DO NOTHING",
view_oid.to_u32(),
table_oid.to_u32(),
cascade_paths_literal,
fk_columns,
uuid_fk_columns,
dep_types,
dep_paths,
array_keys,
distinct_on_str,
is_union
);
let args = [
unsafe { DatumWithOid::new(entity_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value()) },
unsafe {
DatumWithOid::new(
definition_sql,
PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value(),
)
},
];
Spi::run_with_args(&insert_meta_sql, &args).map_err(|e| TViewError::SpiError {
query: insert_meta_sql,
error: e.to_string(),
})?;
Ok(())
}
/// Wraps a raw SELECT (one with no `pk_<entity>` column) into Trinity-Pattern shape.
///
/// The query's columns are discovered through a temporary view `_temp_raw_<entity>`,
/// which is dropped again on a best-effort basis. The returned SELECT exposes:
/// * `pk_<entity>`: the source column named `id`, or else the first column of type
///   `smallint`, `integer` or `bigint`;
/// * `id`: a fresh `gen_random_uuid()` per row;
/// * `data`: `jsonb_build_object` over every source column, keyed by column name.
///
/// # Errors
/// * SPI or catalog errors from creating or inspecting the temp view.
/// * `InvalidSelectStatement` when no suitable primary-key column exists.
fn transform_raw_select_to_tview(
    entity_name: &str,
    select_sql: &str,
) -> TViewResult<(String, TViewSchema)> {
    /// Renders `source.<column>` for generated SQL. Plain lower-case identifiers stay
    /// bare; anything else is double-quoted so case and special characters survive.
    fn source_column_ref(name: &str) -> String {
        let mut chars = name.chars();
        let is_plain = matches!(chars.next(), Some(c) if c.is_ascii_lowercase() || c == '_')
            && chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');
        if is_plain {
            format!("source.{name}")
        } else {
            format!("source.\"{}\"", name.replace('"', "\"\""))
        }
    }

    /// Integer column types accepted as a fallback pk source. Exact names are matched,
    /// so that `interval` or `point` (which merely contain "int") are not picked.
    fn is_integer_type(data_type: &str) -> bool {
        matches!(data_type, "smallint" | "integer" | "bigint")
    }

    let temp_view_name = format!("_temp_raw_{entity_name}");
    let qi_temp_view = quote_identifier(&temp_view_name);
    let create_temp = format!("CREATE TEMP VIEW {qi_temp_view} AS {select_sql}");
    crate::utils::spi_run_ddl(&create_temp).map_err(|e| TViewError::SpiError {
        query: create_temp.clone(),
        error: e,
    })?;

    // Restrict the lookup to this session's temp schema, so a same-named relation in
    // another schema cannot contribute columns.
    let get_columns_sql = "SELECT column_name::text, data_type::text \
         FROM information_schema.columns \
         WHERE table_name = $1 \
         AND table_schema = (SELECT nspname::text FROM pg_catalog.pg_namespace \
         WHERE oid = pg_catalog.pg_my_temp_schema()) \
         ORDER BY ordinal_position";
    // SAFETY: the value is a `&str` bound with TEXTOID, so datum and declared type agree.
    let temp_view_args = vec![unsafe {
        DatumWithOid::new(
            temp_view_name.as_str(),
            PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value(),
        )
    }];
    let columns: Vec<(String, String)> = Spi::connect(|client| {
        let rows = client.select(get_columns_sql, None, &temp_view_args)?;
        let mut result = Vec::new();
        for row in rows {
            let col_name: String = row[1].value()?.ok_or_else(|| {
                spi::Error::from(crate::TViewError::SpiError {
                    query: get_columns_sql.to_string(),
                    error: "column name is NULL".to_string(),
                })
            })?;
            let data_type: String = row[2].value()?.ok_or_else(|| {
                spi::Error::from(crate::TViewError::SpiError {
                    query: get_columns_sql.to_string(),
                    error: "data type is NULL".to_string(),
                })
            })?;
            result.push((col_name, data_type));
        }
        Ok(result)
    })
    .map_err(|e: spi::Error| TViewError::CatalogError {
        operation: "Get columns from temp view".to_string(),
        pg_error: format!("{e:?}"),
    })?;

    // Best-effort cleanup: a leftover temporary view disappears at session end anyway.
    crate::utils::spi_run_ddl(&format!("DROP VIEW {qi_temp_view}")).ok();

    let pk_source_col = columns
        .iter()
        .find(|(name, _)| name == "id")
        .or_else(|| columns.iter().find(|(_, typ)| is_integer_type(typ)))
        .map(|(name, _)| name.clone())
        .ok_or_else(|| TViewError::InvalidSelectStatement {
            sql: select_sql.to_string(),
            reason: "No suitable primary key column found (need 'id' or an integer column)"
                .to_string(),
        })?;

    // Keys are SQL string literals, so single quotes are doubled.
    let data_columns: Vec<String> = columns
        .iter()
        .map(|(name, _)| format!("'{}', {}", name.replace('\'', "''"), source_column_ref(name)))
        .collect();

    let transformed_select = format!(
        "SELECT\n\
         {} AS pk_{},\n\
         gen_random_uuid() AS id,\n\
         jsonb_build_object({}) AS data\n\
         FROM ({}) AS source",
        source_column_ref(&pk_source_col),
        entity_name,
        data_columns.join(", "),
        select_sql
    );
    let schema = infer_schema(&transformed_select)?;
    Ok((transformed_select, schema))
}
#[cfg(any(test, feature = "pg_test"))]
#[pgrx::pg_schema]
mod tests {
use pgrx::prelude::*;
#[test]
fn test_scalar_pg_type_boolean() {
assert_eq!(super::scalar_pg_type_to_sql("boolean"), "BOOLEAN");
}
#[test]
fn test_scalar_pg_type_uuid() {
assert_eq!(super::scalar_pg_type_to_sql("uuid"), "UUID");
}
#[test]
fn test_scalar_pg_type_numeric() {
assert_eq!(super::scalar_pg_type_to_sql("bigint"), "BIGINT");
assert_eq!(super::scalar_pg_type_to_sql("int8"), "BIGINT");
assert_eq!(super::scalar_pg_type_to_sql("integer"), "INTEGER");
assert_eq!(super::scalar_pg_type_to_sql("int4"), "INTEGER");
assert_eq!(super::scalar_pg_type_to_sql("smallint"), "SMALLINT");
assert_eq!(super::scalar_pg_type_to_sql("numeric"), "NUMERIC");
assert_eq!(super::scalar_pg_type_to_sql("decimal"), "NUMERIC");
}
#[test]
fn test_scalar_pg_type_floating_point() {
assert_eq!(super::scalar_pg_type_to_sql("real"), "REAL");
assert_eq!(super::scalar_pg_type_to_sql("float4"), "REAL");
assert_eq!(
super::scalar_pg_type_to_sql("double precision"),
"DOUBLE PRECISION"
);
assert_eq!(super::scalar_pg_type_to_sql("float8"), "DOUBLE PRECISION");
}
#[test]
fn test_scalar_pg_type_temporal() {
assert_eq!(
super::scalar_pg_type_to_sql("timestamp with time zone"),
"TIMESTAMPTZ"
);
assert_eq!(
super::scalar_pg_type_to_sql("timestamp without time zone"),
"TIMESTAMP"
);
assert_eq!(super::scalar_pg_type_to_sql("date"), "DATE");
assert_eq!(super::scalar_pg_type_to_sql("time"), "TIME");
}
#[test]
fn test_scalar_pg_type_json() {
assert_eq!(super::scalar_pg_type_to_sql("jsonb"), "JSONB");
assert_eq!(super::scalar_pg_type_to_sql("json"), "JSON");
}
#[test]
fn test_scalar_pg_type_extensions() {
assert_eq!(super::scalar_pg_type_to_sql("ltree"), "LTREE");
assert_eq!(super::scalar_pg_type_to_sql("lquery"), "LQUERY");
assert_eq!(super::scalar_pg_type_to_sql("geometry"), "GEOMETRY");
assert_eq!(super::scalar_pg_type_to_sql("geography"), "GEOGRAPHY");
assert_eq!(super::scalar_pg_type_to_sql("hstore"), "HSTORE");
assert_eq!(super::scalar_pg_type_to_sql("citext"), "CITEXT");
}
#[test]
fn test_scalar_pg_type_unknown_fallback() {
assert_eq!(super::scalar_pg_type_to_sql("unknown_type"), "TEXT");
assert_eq!(super::scalar_pg_type_to_sql(""), "TEXT");
}
#[test]
fn test_resolve_pg_column_type_builtin_scalar() {
assert_eq!(super::resolve_pg_column_type("boolean", None), "BOOLEAN");
assert_eq!(super::resolve_pg_column_type("uuid", None), "UUID");
assert_eq!(super::resolve_pg_column_type("bigint", None), "BIGINT");
assert_eq!(super::resolve_pg_column_type("text", None), "TEXT");
}
#[test]
fn test_resolve_pg_column_type_user_defined() {
assert_eq!(
super::resolve_pg_column_type("USER-DEFINED", Some("ltree")),
"LTREE"
);
assert_eq!(
super::resolve_pg_column_type("USER-DEFINED", Some("geometry")),
"GEOMETRY"
);
assert_eq!(
super::resolve_pg_column_type("USER-DEFINED", Some("hstore")),
"HSTORE"
);
}
#[test]
fn test_resolve_pg_column_type_user_defined_unknown() {
assert_eq!(
super::resolve_pg_column_type("USER-DEFINED", Some("custom_type")),
"TEXT"
);
}
#[test]
fn test_resolve_pg_column_type_user_defined_missing_udt_name() {
assert_eq!(super::resolve_pg_column_type("USER-DEFINED", None), "TEXT");
}
#[test]
fn test_resolve_pg_column_type_array_uuid() {
assert_eq!(
super::resolve_pg_column_type("ARRAY", Some("_uuid")),
"UUID[]"
);
}
#[test]
fn test_resolve_pg_column_type_array_text() {
assert_eq!(
super::resolve_pg_column_type("ARRAY", Some("_text")),
"TEXT[]"
);
}
#[test]
fn test_resolve_pg_column_type_array_integer() {
assert_eq!(
super::resolve_pg_column_type("ARRAY", Some("_int4")),
"INTEGER[]"
);
}
#[test]
fn test_resolve_pg_column_type_array_ltree() {
assert_eq!(
super::resolve_pg_column_type("ARRAY", Some("_ltree")),
"LTREE[]"
);
}
#[test]
fn test_resolve_pg_column_type_array_missing_udt_name() {
assert_eq!(super::resolve_pg_column_type("ARRAY", None), "TEXT[]");
}
#[test]
fn test_resolve_pg_column_type_array_unknown_element() {
assert_eq!(
super::resolve_pg_column_type("ARRAY", Some("_unknown")),
"TEXT[]"
);
}
#[test]
fn test_tview_exists_non_existent() {
}
#[pg_test]
fn test_create_tview_respects_search_path() {
Spi::run("CREATE SCHEMA tview_test_ns").unwrap();
Spi::run("SET search_path TO tview_test_ns, public").unwrap();
Spi::run("CREATE TABLE tb_item (pk_item BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
Spi::run("INSERT INTO tb_item VALUES (1, 'Widget')").unwrap();
Spi::run(
"SELECT pg_tviews_create('item', $$
SELECT pk_item, jsonb_build_object('name', name) AS data
FROM tb_item
$$)",
)
.unwrap();
let in_target = Spi::get_one::<bool>(
"SELECT COUNT(*) > 0 FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = 'tv_item' AND n.nspname = 'tview_test_ns'",
)
.unwrap()
.unwrap_or(false);
assert!(in_target, "tv_item should be in tview_test_ns, not public");
let in_public = Spi::get_one::<bool>(
"SELECT COUNT(*) > 0 FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = 'tv_item' AND n.nspname = 'public'",
)
.unwrap()
.unwrap_or(false);
assert!(!in_public, "tv_item must not be created in public schema");
let view_in_target = Spi::get_one::<bool>(
"SELECT COUNT(*) > 0 FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = 'v_item' AND n.nspname = 'tview_test_ns'",
)
.unwrap()
.unwrap_or(false);
assert!(view_in_target, "v_item should be in tview_test_ns");
}
#[pg_test]
fn test_create_tview_defaults_to_public() {
Spi::run("SET search_path TO public").unwrap();
Spi::run("CREATE TABLE tb_gadget (pk_gadget BIGSERIAL PRIMARY KEY, label TEXT)").unwrap();
Spi::run("INSERT INTO tb_gadget VALUES (1, 'Gizmo')").unwrap();
Spi::run(
"SELECT pg_tviews_create('gadget', $$
SELECT pk_gadget, jsonb_build_object('label', label) AS data
FROM tb_gadget
$$)",
)
.unwrap();
let in_public = Spi::get_one::<bool>(
"SELECT COUNT(*) > 0 FROM pg_class c \
JOIN pg_namespace n ON c.relnamespace = n.oid \
WHERE c.relname = 'tv_gadget' AND n.nspname = 'public'",
)
.unwrap()
.unwrap_or(false);
assert!(
in_public,
"tv_gadget should be in public with default search_path"
);
}
/// A `CREATE TABLE tv_ctas_test AS SELECT ...` over an already-populated
/// source table must leave `tv_ctas_test` in `public`, holding every source
/// row, including Alice.
#[pg_test]
fn test_ctas_with_preexisting_data() {
    // Run a boolean scalar query, treating SQL NULL as `false`.
    let scalar_bool = |sql: &str| -> bool { Spi::get_one::<bool>(sql).unwrap().unwrap_or(false) };

    Spi::run("SET search_path TO public").unwrap();
    Spi::run("CREATE TABLE tb_ctas_test (pk_test BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
    Spi::run("INSERT INTO tb_ctas_test VALUES (1, 'Alice'), (2, 'Bob')").unwrap();

    // CTAS into a `tv_`-prefixed name. The checks below verify that the table
    // exists in public and carries the pre-existing rows.
    Spi::run(
        "CREATE TABLE tv_ctas_test AS\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_ctas_test",
    )
    .unwrap();

    let tview_exists = scalar_bool(
        "SELECT COUNT(*) > 0 FROM pg_class c \
         JOIN pg_namespace n ON c.relnamespace = n.oid \
         WHERE c.relname = 'tv_ctas_test' AND n.nspname = 'public'",
    );
    assert!(tview_exists, "tv_ctas_test should exist");

    // NULL count is treated as zero rows.
    let populated_rows: i64 = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_ctas_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(
        populated_rows, 2,
        "tv_ctas_test should have 2 rows from initial population"
    );

    let has_alice =
        scalar_bool("SELECT COUNT(*) > 0 FROM tv_ctas_test WHERE data->>'name' = 'Alice'");
    assert!(has_alice, "Alice should be in the TVIEW");
}
/// `pg_tviews.unlogged_by_default` controls the persistence of tables built by
/// `pg_tviews_create`:
/// - `true` must yield an UNLOGGED table (`relpersistence = 'u'`);
/// - `false` must yield a LOGGED table (`relpersistence = 'p'`).
#[pg_test]
fn test_tview_unlogged_guc_control() {
    // Evaluate a boolean catalog predicate; a NULL result counts as `false`.
    let holds = |sql: &str| -> bool { Spi::get_one::<bool>(sql).unwrap().unwrap_or(false) };

    Spi::run("SET search_path TO public").unwrap();

    // GUC on: the new TVIEW table should be UNLOGGED.
    Spi::run("SET pg_tviews.unlogged_by_default TO true").unwrap();
    Spi::run("CREATE TABLE tb_guc_test1 (pk_test BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('guc_test1', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_guc_test1\n\
         $$)",
    )
    .unwrap();
    let unlogged = holds(
        "SELECT c.relpersistence = 'u' FROM pg_class c \
         JOIN pg_namespace n ON c.relnamespace = n.oid \
         WHERE c.relname = 'tv_guc_test1' AND n.nspname = 'public'",
    );
    assert!(unlogged, "tv_guc_test1 should be UNLOGGED when GUC is true");

    // GUC off: the new TVIEW table should be LOGGED.
    Spi::run("SET pg_tviews.unlogged_by_default TO false").unwrap();
    Spi::run("CREATE TABLE tb_guc_test2 (pk_test BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('guc_test2', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_guc_test2\n\
         $$)",
    )
    .unwrap();
    let logged = holds(
        "SELECT c.relpersistence = 'p' FROM pg_class c \
         JOIN pg_namespace n ON c.relnamespace = n.oid \
         WHERE c.relname = 'tv_guc_test2' AND n.nspname = 'public'",
    );
    assert!(logged, "tv_guc_test2 should be LOGGED when GUC is false");

    // Restore the GUC to its default for the rest of the session.
    Spi::run("RESET pg_tviews.unlogged_by_default").unwrap();
}
/// A TVIEW table created LOGGED can be switched to UNLOGGED and back with
/// `ALTER TABLE ... SET UNLOGGED / SET LOGGED`. The catalog's `relpersistence`
/// must reflect each step.
#[pg_test]
fn test_alter_tview_unlogged_logged() {
    // Evaluate a boolean catalog predicate; a NULL result counts as `false`.
    let holds = |sql: &str| -> bool { Spi::get_one::<bool>(sql).unwrap().unwrap_or(false) };

    Spi::run("SET search_path TO public").unwrap();
    Spi::run("CREATE TABLE tb_alter_test (pk_test BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
    Spi::run("INSERT INTO tb_alter_test VALUES (1, 'Alice'), (2, 'Bob')").unwrap();

    // Start from a LOGGED table regardless of the extension default.
    Spi::run("SET pg_tviews.unlogged_by_default TO false").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('alter_test', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_alter_test\n\
         $$)",
    )
    .unwrap();

    // Each step is a tuple of:
    // - an optional ALTER to run first;
    // - the persistence predicate that must then hold;
    // - the assertion message.
    let steps: [(Option<&str>, &str, &str); 3] = [
        (
            None,
            "SELECT c.relpersistence = 'p' FROM pg_class c \
             JOIN pg_namespace n ON c.relnamespace = n.oid \
             WHERE c.relname = 'tv_alter_test' AND n.nspname = 'public'",
            "tv_alter_test should initially be LOGGED",
        ),
        (
            Some("ALTER TABLE tv_alter_test SET UNLOGGED"),
            "SELECT c.relpersistence = 'u' FROM pg_class c \
             JOIN pg_namespace n ON c.relnamespace = n.oid \
             WHERE c.relname = 'tv_alter_test' AND n.nspname = 'public'",
            "tv_alter_test should be UNLOGGED after ALTER TABLE",
        ),
        (
            Some("ALTER TABLE tv_alter_test SET LOGGED"),
            "SELECT c.relpersistence = 'p' FROM pg_class c \
             JOIN pg_namespace n ON c.relnamespace = n.oid \
             WHERE c.relname = 'tv_alter_test' AND n.nspname = 'public'",
            "tv_alter_test should be LOGGED again after ALTER TABLE",
        ),
    ];

    for &(alter_stmt, predicate, message) in steps.iter() {
        if let Some(stmt) = alter_stmt {
            Spi::run(stmt).unwrap();
        }
        assert!(holds(predicate), "{}", message);
    }

    // Restore the GUC to its default for the rest of the session.
    Spi::run("RESET pg_tviews.unlogged_by_default").unwrap();
}
/// Toggling a TVIEW table between LOGGED and UNLOGGED with `ALTER TABLE`
/// must not lose rows. The count must stay at 3 before and after each switch.
#[pg_test]
fn test_alter_tview_data_integrity() {
    // Current row count of the TVIEW table; a NULL result counts as zero.
    let row_count = || -> i64 {
        Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_integrity_test")
            .unwrap()
            .unwrap_or(0)
    };

    Spi::run("SET search_path TO public").unwrap();
    Spi::run("CREATE TABLE tb_integrity_test (pk_test BIGSERIAL PRIMARY KEY, name TEXT)")
        .unwrap();
    Spi::run("INSERT INTO tb_integrity_test VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')")
        .unwrap();

    // Start from a LOGGED table regardless of the extension default.
    Spi::run("SET pg_tviews.unlogged_by_default TO false").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('integrity_test', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_integrity_test\n\
         $$)",
    )
    .unwrap();
    assert_eq!(row_count(), 3, "TVIEW should have 3 rows initially");

    Spi::run("ALTER TABLE tv_integrity_test SET UNLOGGED").unwrap();
    assert_eq!(
        row_count(),
        3,
        "Data should be preserved when converting LOGGED to UNLOGGED"
    );

    Spi::run("ALTER TABLE tv_integrity_test SET LOGGED").unwrap();
    assert_eq!(
        row_count(),
        3,
        "Data should be preserved when converting UNLOGGED to LOGGED"
    );

    // Restore the GUC to its default for the rest of the session.
    Spi::run("RESET pg_tviews.unlogged_by_default").unwrap();
}
/// `detect_post_crash_truncation` behaviour on the `crash_test` TVIEW:
/// - it must report `false` while the table is populated;
/// - it must report `true` once the table has been emptied while the backing
///   `v_crash_test` view still yields rows.
///
/// The TRUNCATE simulates what PostgreSQL does to UNLOGGED tables after a
/// crash.
#[pg_test]
fn test_detect_post_crash_empty_tview() {
    Spi::run("SET search_path TO public").unwrap();
    // The scenario under test is an UNLOGGED TVIEW table losing its contents.
    // Pin the GUC so the table is created UNLOGGED regardless of the
    // extension-wide default, as the sibling persistence tests do.
    Spi::run("SET pg_tviews.unlogged_by_default TO true").unwrap();
    Spi::run("CREATE TABLE tb_crash_test (pk_test BIGSERIAL PRIMARY KEY, name TEXT)").unwrap();
    Spi::run("INSERT INTO tb_crash_test VALUES (1, 'Alice'), (2, 'Bob')").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('crash_test', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_crash_test\n\
         $$)",
    )
    .unwrap();

    let initial_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_crash_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(initial_count, 2, "TVIEW should have 2 rows initially");

    // Populated table: no crash should be reported.
    let crash_before = crate::lifecycle::detect_post_crash_truncation("crash_test").unwrap();
    assert!(!crash_before, "Should not detect crash when table has data");

    // Simulate post-crash state: the table is emptied, the source data is not.
    Spi::run("TRUNCATE TABLE tv_crash_test").unwrap();
    let after_truncate_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_crash_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(
        after_truncate_count, 0,
        "TVIEW should be empty after truncate"
    );

    let crash_detected = crate::lifecycle::detect_post_crash_truncation("crash_test").unwrap();
    assert!(
        crash_detected,
        "Should detect crash when UNLOGGED table is empty but view has data"
    );

    // The backing view is computed from the source table, so it is unaffected.
    let view_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM v_crash_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(
        view_count, 2,
        "Backing view should still have data after table truncate"
    );

    // Restore the GUC to its default for the rest of the session.
    Spi::run("RESET pg_tviews.unlogged_by_default").unwrap();
}
/// `pg_tviews_recover_after_crash` behaviour on the `recover_test` TVIEW:
/// - after its table is emptied, it must return `true` and restore the table
///   to the 2 source rows;
/// - a second call, made while the table holds data again, must return
///   `false`.
#[pg_test]
fn test_auto_recover_after_crash() {
    Spi::run("SET search_path TO public").unwrap();
    // The crash scenario concerns UNLOGGED tables, which PostgreSQL empties
    // after a crash. Pin the GUC so the table is created UNLOGGED regardless
    // of the extension-wide default, as the sibling persistence tests do.
    Spi::run("SET pg_tviews.unlogged_by_default TO true").unwrap();
    Spi::run("CREATE TABLE tb_recover_test (pk_test BIGSERIAL PRIMARY KEY, name TEXT)")
        .unwrap();
    Spi::run("INSERT INTO tb_recover_test VALUES (1, 'Alice'), (2, 'Bob')").unwrap();
    Spi::run(
        "SELECT pg_tviews_create('recover_test', $$\n\
         SELECT pk_test, jsonb_build_object('name', name) AS data\n\
         FROM tb_recover_test\n\
         $$)",
    )
    .unwrap();

    let initial_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_recover_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(initial_count, 2, "TVIEW should have 2 rows initially");

    // Simulate post-crash state by emptying the TVIEW table.
    Spi::run("TRUNCATE TABLE tv_recover_test").unwrap();
    let after_truncate_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_recover_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(
        after_truncate_count, 0,
        "TVIEW should be empty after truncate"
    );

    let recovery_performed =
        Spi::get_one::<bool>("SELECT pg_tviews_recover_after_crash('recover_test')")
            .unwrap()
            .unwrap_or(false);
    assert!(
        recovery_performed,
        "Recovery should be performed when crash is detected"
    );

    let after_recovery_count = Spi::get_one::<i64>("SELECT COUNT(*) FROM tv_recover_test")
        .unwrap()
        .unwrap_or(0);
    assert_eq!(
        after_recovery_count, 2,
        "TVIEW should have 2 rows after recovery"
    );

    // A NULL result defaults to `true` here so that it fails the assertion
    // instead of being mistaken for "no recovery needed".
    let second_recovery =
        Spi::get_one::<bool>("SELECT pg_tviews_recover_after_crash('recover_test')")
            .unwrap()
            .unwrap_or(true);
    assert!(
        !second_recovery,
        "Second recovery call should return false when no crash detected"
    );

    // Restore the GUC to its default for the rest of the session.
    Spi::run("RESET pg_tviews.unlogged_by_default").unwrap();
}
}