1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
use crate::cascade_path::CascadePath;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::Oid;
use pgrx::prelude::*;
/// How a dependency is embedded in a TVIEW's JSONB document.
///
/// Consulted by the `jsonb_delta` optimization to decide which kind of patch applies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DependencyType {
    /// Plain column taken directly from the base table (no nested JSONB).
    Scalar,
    /// Object embedded under a nested key via `jsonb_build_object`.
    NestedObject,
    /// Array produced by `jsonb_agg`.
    Array,
}

impl DependencyType {
    /// Decode the textual form stored in the database.
    ///
    /// Only the exact strings `"nested_object"` and `"array"` select their
    /// variants; every other input (including `"scalar"`) yields [`Self::Scalar`].
    pub fn from_str(s: &str) -> Self {
        if s == "nested_object" {
            Self::NestedObject
        } else if s == "array" {
            Self::Array
        } else {
            // Deliberate catch-all: "scalar" and anything unrecognized.
            Self::Scalar
        }
    }

    /// Encode the variant as the string stored in the database.
    ///
    /// Inverse of [`Self::from_str`] for all three variants.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Array => "array",
            Self::NestedObject => "nested_object",
            Self::Scalar => "scalar",
        }
    }
}
/// In-memory form of one row of the `pg_tview_meta` catalog table.
///
/// The per-dependency vectors (`dependency_types`, `dependency_paths`,
/// `array_match_keys`) are parallel: index `i` in each describes the same
/// dependency.
#[derive(Debug, Clone)]
pub struct TviewMeta {
/// OID of the TVIEW table (the loaders select it as `table_oid AS tview_oid`).
pub tview_oid: Oid,
/// OID read from the `view_oid` column (the TVIEW's backing view).
pub view_oid: Oid,
/// Entity name, read from the `entity` column.
pub entity_name: String,
/// Foreign-key column names, read from the `fk_columns` column.
pub fk_columns: Vec<String>,
/// Contents of the `uuid_fk_columns` column; loaded but not yet consumed.
#[allow(dead_code)] // Reason: loaded from pg_tview_meta for future UUID FK handling
pub uuid_fk_columns: Vec<String>,
/// Type of each dependency: Scalar (direct column), `NestedObject` (embedded JSONB),
/// or Array (`jsonb_agg` aggregation).
///
/// Expected to have the same length as `fk_columns`; `parse_dependencies`
/// tolerates a mismatch by treating missing entries as `Scalar`.
/// Used by `jsonb_delta` to choose patch function (scalar/nested/array).
pub dependency_types: Vec<DependencyType>,
/// JSONB path for each dependency, if nested.
/// - Scalar: None
/// - `NestedObject`: Some(vec!["author"]) for { "author": {...} }
/// - Array: Some(vec!["comments"]) for { "comments": [...] }
///
/// Stored in the catalog as dot-joined strings (e.g. `"book.author"`), which are
/// split into one `String` per key when the row is loaded.
///
/// Length matches `dependency_types`.
pub dependency_paths: Vec<Option<Vec<String>>>,
/// For Array dependencies, the key used to match elements (e.g., "id").
/// Used by `jsonb_smart_patch_array(target, 'comments', '{...}', 'id')`.
///
/// - Scalar/`NestedObject`: None
/// - Array: Some("id") or `Some("pk_comment")`
///
/// Length matches `dependency_types`.
pub array_match_keys: Vec<Option<String>>,
/// DISTINCT ON key column names for DISTINCT ON TVIEWs (e.g. `["id"]`).
///
/// When non-empty, this TVIEW uses deduplication-based refresh: the trigger
/// enqueues the DISTINCT ON key value instead of the base-table PK, and the
/// refresh re-evaluates the full DISTINCT ON group to find the winning row.
///
/// Empty for standard (PK-based) TVIEWs.
pub distinct_on_keys: Vec<String>,
/// `true` when this TVIEW's backing view is a `UNION ALL` or `UNION` query.
///
/// Used to apply the duplicate-row policy when multiple rows are returned
/// for the same PK during refresh (which can occur with non-mutually-exclusive
/// UNION ALL branches).
pub is_union: bool,
/// Cascade paths defining how changes propagate to this TVIEW.
///
/// Each path represents a sequence of hops from a source table to this TVIEW,
/// enabling indirect dependency tracking for multi-level cascades.
/// Stored in the catalog as a `TEXT[]` of JSON-serialized path objects.
pub cascade_paths: Vec<CascadePath>,
}
impl TviewMeta {
    /// Column list shared by every `pg_tview_meta` query issued from this impl.
    ///
    /// Keeping it in one place guarantees that each loader selects exactly the
    /// columns [`Self::from_spi_row`] reads.
    const SELECT_COLUMNS: &'static str = "table_oid AS tview_oid, view_oid, entity, \
         fk_columns, uuid_fk_columns, \
         dependency_types, dependency_paths, array_match_keys, \
         distinct_on_keys, is_union, cascade_paths";

    /// Build `SELECT <columns> FROM pg_tview_meta <tail>`, where `tail` is the
    /// caller's `WHERE` / `ORDER BY` clause.
    fn select_sql(tail: &str) -> String {
        format!("SELECT {} FROM pg_tview_meta {tail}", Self::SELECT_COLUMNS)
    }

    /// Error reported when a mandatory column comes back as SQL NULL.
    fn null_column(column: &str) -> spi::Error {
        spi::Error::from(crate::TViewError::SpiError {
            query: String::new(),
            error: format!("{column} column is NULL"),
        })
    }

    /// Convert the `dependency_types` `TEXT[]` into [`DependencyType`] values.
    ///
    /// A NULL array yields an empty vector; unrecognized strings become
    /// `Scalar` (see [`DependencyType::from_str`]).
    fn parse_dependency_types(row_value: Option<Vec<String>>) -> Vec<DependencyType> {
        row_value
            .unwrap_or_default()
            .iter()
            .map(|s| DependencyType::from_str(s))
            .collect()
    }

    /// Convert flat `TEXT[]` of dot-separated path strings into structured paths.
    ///
    /// Each element is a dot-joined key sequence (e.g. `"book.author"`).
    /// A NULL element or an empty string represents a `None` path (Scalar
    /// dependency). A NULL array yields an empty vector.
    fn parse_dep_paths(raw: Option<Vec<Option<String>>>) -> Vec<Option<Vec<String>>> {
        raw.unwrap_or_default()
            .into_iter()
            .map(|opt| {
                opt.filter(|s| !s.is_empty())
                    .map(|s| s.split('.').map(str::to_string).collect())
            })
            .collect()
    }

    /// Look up metadata by OID, matching either the `view_oid` or the
    /// `table_oid` column of `pg_tview_meta`.
    ///
    /// Returns the first matching row, or `Ok(None)` when nothing matches.
    ///
    /// # Errors
    ///
    /// Returns an error if the SPI query fails or the row cannot be decoded.
    pub fn load_for_source(source_oid: Oid) -> spi::Result<Option<Self>> {
        let sql = Self::select_sql("WHERE view_oid = $1 OR table_oid = $1");
        Spi::connect(|client| {
            // SAFETY: `OIDOID` is the PostgreSQL type of the `Oid` value being
            // wrapped, so the datum is tagged with its actual type.
            let args = [unsafe {
                DatumWithOid::new(source_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value())
            }];
            let mut rows = client.select(sql.as_str(), None, &args)?;
            rows.next().map(|row| Self::from_spi_row(&row)).transpose()
        })
    }

    /// Look up metadata by entity name (the `entity` column).
    ///
    /// Returns the first matching row, or `Ok(None)` when nothing matches.
    ///
    /// # Errors
    ///
    /// Returns an error if the SPI query fails or the row cannot be decoded.
    pub fn load_by_entity(entity_name: &str) -> spi::Result<Option<Self>> {
        let sql = Self::select_sql("WHERE entity = $1");
        Spi::connect(|client| {
            // SAFETY: `TEXTOID` is the PostgreSQL type matching the `&str`
            // value being wrapped; the value is bound as a parameter, never
            // spliced into the SQL text.
            let args = [unsafe {
                DatumWithOid::new(entity_name, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
            }];
            let mut rows = client.select(sql.as_str(), None, &args)?;
            rows.next().map(|row| Self::from_spi_row(&row)).transpose()
        })
    }

    /// Load every row of `pg_tview_meta`, ordered by entity name.
    ///
    /// # Errors
    ///
    /// Returns an error if the SPI query fails or any row cannot be decoded;
    /// decoding stops at the first failing row.
    pub fn load_all() -> spi::Result<Vec<Self>> {
        let sql = Self::select_sql("ORDER BY entity");
        Spi::connect(|client| {
            client
                .select(sql.as_str(), None, &[])?
                .map(|row| Self::from_spi_row(&row))
                .collect()
        })
    }

    /// Load metadata for a specific TVIEW OID.
    ///
    /// Queries `pg_tview_meta` by `table_oid` to retrieve the dependency
    /// information needed for smart JSONB patching.
    ///
    /// # Arguments
    ///
    /// * `tview_oid` - OID of the TVIEW table (e.g., `tv_post`)
    ///
    /// # Returns
    ///
    /// - `Ok(Some(TviewMeta))` if a row with this `table_oid` exists
    /// - `Ok(None)` if no row matches
    /// - `Err` if the query fails or the row cannot be decoded
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let meta = TviewMeta::load_for_tview(tview_oid)?;
    /// if let Some(m) = meta {
    ///     let deps = m.parse_dependencies();
    ///     // Use deps for smart patching
    /// }
    /// ```
    #[allow(dead_code)] // Reason: May be useful in future optimization phases or external code
    pub fn load_for_tview(tview_oid: Oid) -> spi::Result<Option<Self>> {
        let sql = Self::select_sql("WHERE table_oid = $1");
        Spi::connect(|client| {
            // SAFETY: `OIDOID` is the PostgreSQL type of the `Oid` value being
            // wrapped, so the datum is tagged with its actual type.
            let args = [unsafe {
                DatumWithOid::new(tview_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value())
            }];
            let mut rows = client.select(sql.as_str(), None, &args)?;
            rows.next().map(|row| Self::from_spi_row(&row)).transpose()
        })
    }

    /// Parse SPI row into `TviewMeta` struct.
    ///
    /// Expects columns: `tview_oid`, `view_oid`, `entity`, `fk_columns`,
    /// `uuid_fk_columns`, `dependency_types`, `dependency_paths`,
    /// `array_match_keys`, `distinct_on_keys`, `is_union`, `cascade_paths`.
    ///
    /// NULL array columns decode to empty vectors and a NULL `is_union` to
    /// `false`.
    ///
    /// # Errors
    ///
    /// Returns an error if a column cannot be read, if `tview_oid`, `view_oid`
    /// or `entity` is NULL, or if an element of `cascade_paths` is not valid
    /// JSON for a `CascadePath`.
    pub fn from_spi_row(row: &spi::SpiHeapTupleData) -> spi::Result<Self> {
        let fk_columns: Option<Vec<String>> = row["fk_columns"].value()?;
        let uuid_fk_columns: Option<Vec<String>> = row["uuid_fk_columns"].value()?;

        // dependency_types (TEXT[])
        let dep_types_raw: Option<Vec<String>> = row["dependency_types"].value()?;
        let dependency_types = Self::parse_dependency_types(dep_types_raw);

        // dependency_paths (TEXT[]) of dot-joined paths; elements may be NULL
        let dep_paths_raw: Option<Vec<Option<String>>> = row["dependency_paths"].value()?;
        let dependency_paths = Self::parse_dep_paths(dep_paths_raw);

        // array_match_keys (TEXT[]) with NULL values
        let array_match_keys: Option<Vec<Option<String>>> = row["array_match_keys"].value()?;

        // distinct_on_keys (TEXT[]) — present in pg_tview_meta for DISTINCT ON TVIEWs
        let distinct_on_keys: Vec<String> = row["distinct_on_keys"]
            .value::<Vec<String>>()?
            .unwrap_or_default();

        // is_union (BOOLEAN) — true when backing view is a UNION ALL / UNION query
        let is_union: bool = row["is_union"].value::<bool>()?.unwrap_or(false);

        // cascade_paths (TEXT[]) — array of JSON-serialized cascade path objects.
        // A malformed entry is reported with the parser's message instead of
        // being collapsed into an unrelated SPI error variant.
        let cascade_paths_raw: Option<Vec<String>> = row["cascade_paths"].value()?;
        let cascade_paths = cascade_paths_raw
            .unwrap_or_default()
            .into_iter()
            .map(|json| serde_json::from_str(&json))
            .collect::<Result<Vec<CascadePath>, _>>()
            .map_err(|e| {
                spi::Error::from(crate::TViewError::SpiError {
                    query: String::new(),
                    error: format!("invalid cascade_paths entry: {e}"),
                })
            })?;

        // Mandatory scalar columns: NULL is an error.
        let tview_oid: Oid = row["tview_oid"]
            .value()?
            .ok_or_else(|| Self::null_column("tview_oid"))?;
        let view_oid: Oid = row["view_oid"]
            .value()?
            .ok_or_else(|| Self::null_column("view_oid"))?;
        let entity_name: String = row["entity"]
            .value()?
            .ok_or_else(|| Self::null_column("entity"))?;

        Ok(Self {
            tview_oid,
            view_oid,
            entity_name,
            fk_columns: fk_columns.unwrap_or_default(),
            uuid_fk_columns: uuid_fk_columns.unwrap_or_default(),
            dependency_types,
            dependency_paths,
            array_match_keys: array_match_keys.unwrap_or_default(),
            distinct_on_keys,
            is_union,
            cascade_paths,
        })
    }

    /// Returns `true` if this TVIEW uses DISTINCT ON deduplication-based refresh,
    /// i.e. when `distinct_on_keys` is non-empty.
    #[allow(dead_code)] // Reason: Kept for API completeness; trigger handler uses cached distinct_on_key
    pub fn is_distinct_on(&self) -> bool {
        !self.distinct_on_keys.is_empty()
    }

    /// Parse dependency metadata into structured form for smart patching.
    ///
    /// Zips the parallel metadata arrays (`dependency_types`,
    /// `dependency_paths`, `array_match_keys`) into `DependencyDetail` values.
    ///
    /// # Returns
    ///
    /// A vector of length `max(dependency_types.len(), fk_columns.len())`.
    /// Where an array is shorter than that, the missing type defaults to
    /// `Scalar` and the missing path / match key to `None`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let deps = meta.parse_dependencies();
    /// for dep in deps {
    ///     match dep.dep_type {
    ///         DependencyType::NestedObject => {
    ///             println!("Nested at path: {:?}", dep.path);
    ///         }
    ///         DependencyType::Array => {
    ///             println!("Array at path: {:?}, key: {:?}", dep.path, dep.match_key);
    ///         }
    ///         DependencyType::Scalar => {
    ///             println!("Scalar dependency");
    ///         }
    ///     }
    /// }
    /// ```
    pub fn parse_dependencies(&self) -> Vec<DependencyDetail> {
        let len = self.dependency_types.len().max(self.fk_columns.len());
        (0..len)
            .map(|i| DependencyDetail {
                dep_type: self
                    .dependency_types
                    .get(i)
                    .cloned()
                    .unwrap_or(DependencyType::Scalar),
                path: self.dependency_paths.get(i).cloned().flatten(),
                match_key: self.array_match_keys.get(i).cloned().flatten(),
            })
            .collect()
    }
}
/// Represents a single dependency with its type, path, and match key.
///
/// Built by `TviewMeta::parse_dependencies`, which combines the entries found at
/// the same index of `dependency_types`, `dependency_paths` and `array_match_keys`.
/// Used by the refresh engine to determine how to update related TVIEWs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DependencyDetail {
/// Type of dependency (`Scalar`, `NestedObject` or `Array`).
pub dep_type: DependencyType,
/// JSONB path to the dependent data (e.g., `["author"]` or `["comments"]`);
/// `None` when no path is recorded for this dependency.
pub path: Option<Vec<String>>,
/// Key to match for array elements (e.g., "id"); `None` when no key is recorded.
pub match_key: Option<String>,
}
impl Default for TviewMeta {
    /// Blank metadata: both OIDs are `INVALID`, the entity name is empty, every
    /// list is empty and `is_union` is `false`.
    fn default() -> Self {
        let invalid_oid = pg_sys::Oid::INVALID;
        Self {
            tview_oid: invalid_oid,
            view_oid: invalid_oid,
            entity_name: String::default(),
            fk_columns: Vec::new(),
            uuid_fk_columns: Vec::new(),
            dependency_types: Vec::new(),
            dependency_paths: Vec::new(),
            array_match_keys: Vec::new(),
            distinct_on_keys: Vec::new(),
            is_union: false,
            cascade_paths: Vec::new(),
        }
    }
}
/// Map a base table OID to its entity name.
///
/// Example: OID of `tb_user` → Some("user")
///
/// Returns:
/// - Ok(Some(entity)) if table is tracked in `pg_tview_meta`
/// - Ok(None) if table is not tracked
/// - Err(...) on database error
///
/// # Cached Version
///
/// This is a thin wrapper that delegates to
/// `crate::queue::cache::table_cache::entity_for_table_cached`, so repeated
/// lookups are expected to avoid re-querying `pg_class`.
/// NOTE(review): the original author quotes 0.1ms → 0.001ms per trigger; that
/// figure is not verifiable from this file.
///
/// # Errors
/// Returns whatever error the cached lookup reports (per the original note:
/// a failed table OID lookup or a failed cache access).
pub fn entity_for_table(table_oid: Oid) -> crate::TViewResult<Option<String>> {
crate::queue::cache::table_cache::entity_for_table_cached(table_oid)
}
/// Resolve a table OID to its entity name by querying the catalogs directly.
///
/// Slow path with no caching: every call runs up to two SPI queries. The
/// lookup proceeds in three steps and returns `Ok(None)` as soon as one fails:
///
/// 1. read `relname` from `pg_class` for `table_oid`;
/// 2. strip the `tb_` prefix from that name;
/// 3. confirm the remainder exists as `entity` in `pg_tview_meta`.
///
/// Any SPI failure is converted into `TViewError::SpiError`.
pub fn entity_for_table_uncached(table_oid: Oid) -> crate::TViewResult<Option<String>> {
    // `Spi::get_one_with_args` is deliberately avoided: in pgrx 0.17 it
    // returns Err(InvalidPosition) for a query that yields 0 rows, because
    // `.first().get_one()` runs through get_datum_by_ordinal's bounds check
    // (current >= size) rather than the get_heap_tuple path that maps an
    // empty result to Ok(None). `Spi::connect` + `client.select` lets an
    // empty result be handled explicitly.
    Spi::connect(|client| {
        // Step 1: OID → relation name.
        // SAFETY: `OIDOID` is the PostgreSQL type of the `Oid` value being wrapped.
        let oid_args = vec![unsafe {
            DatumWithOid::new(table_oid, PgOid::BuiltIn(PgBuiltInOids::OIDOID).value())
        }];
        let mut class_rows = client.select(
            "SELECT relname::text FROM pg_class WHERE oid = $1",
            Some(1),
            &oid_args,
        )?;
        let Some(class_row) = class_rows.next() else {
            return Ok(None);
        };
        let Some(table_name) = class_row[1].value::<String>()? else {
            return Ok(None);
        };

        // Step 2: only "tb_<entity>" tables can map to an entity.
        let Some(entity) = table_name.strip_prefix("tb_") else {
            return Ok(None);
        };

        // Step 3: the candidate must be registered in pg_tview_meta.
        // SAFETY: `TEXTOID` is the PostgreSQL type matching the `&str` value being wrapped.
        let entity_args = vec![unsafe {
            DatumWithOid::new(entity, PgOid::BuiltIn(PgBuiltInOids::TEXTOID).value())
        }];
        let mut meta_rows = client.select(
            "SELECT entity FROM pg_tview_meta WHERE entity = $1",
            Some(1),
            &entity_args,
        )?;
        let Some(meta_row) = meta_rows.next() else {
            return Ok(None);
        };
        Ok(meta_row[1].value::<String>()?)
    })
    .map_err(|e: spi::Error| crate::TViewError::SpiError {
        query: "entity_for_table_uncached".to_string(),
        error: format!("{e:?}"),
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every known database string maps to its variant; anything else is `Scalar`.
    #[test]
    fn test_dependency_type_from_str() {
        let cases = [
            ("scalar", DependencyType::Scalar),
            ("nested_object", DependencyType::NestedObject),
            ("array", DependencyType::Array),
            // Unrecognized input falls back to the default variant.
            ("unknown", DependencyType::Scalar),
        ];
        for (raw, expected) in cases {
            assert_eq!(DependencyType::from_str(raw), expected);
        }
    }

    /// Each variant renders as its database string.
    #[test]
    fn test_dependency_type_to_str() {
        let cases = [
            (DependencyType::Scalar, "scalar"),
            (DependencyType::NestedObject, "nested_object"),
            (DependencyType::Array, "array"),
        ];
        for (variant, expected) in cases {
            assert_eq!(variant.as_str(), expected);
        }
    }

    /// The per-dependency vectors can be populated and keep their lengths.
    #[test]
    fn test_tview_meta_has_new_fields() {
        let meta = TviewMeta {
            tview_oid: Oid::from(1234),
            view_oid: Oid::from(5678),
            entity_name: "test".to_string(),
            dependency_types: vec![DependencyType::Scalar],
            dependency_paths: vec![None],
            array_match_keys: vec![None],
            // Remaining fields: empty lists and `is_union == false`.
            ..TviewMeta::default()
        };
        assert_eq!(meta.dependency_types.len(), 1);
        assert_eq!(meta.dependency_paths.len(), 1);
        assert_eq!(meta.array_match_keys.len(), 1);
    }

    /// Checks the `tb_` prefix rule in isolation; no database access required.
    #[test]
    fn test_entity_for_table_name_parsing() {
        let test_cases = [
            ("tb_user", Some("user")),
            ("tb_post", Some("post")),
            ("tb_company", Some("company")),
            ("users", None),    // Not a tb_* table
            ("pg_class", None), // System table
        ];
        for (table_name, expected_entity) in test_cases {
            let entity = table_name.strip_prefix("tb_");
            assert_eq!(entity, expected_entity);
        }
    }
}