pg_tviews 0.1.0-beta.12

Transactional materialized views with incremental refresh for PostgreSQL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# 1. Scope of This TDD

This document focuses on **how TVIEW handles multi-update transactions**:

* Many rows across many `tb_*` tables can change.
* The **same TVIEW row** might be logically affected multiple times.
* We want **exactly one refresh per `(entity, pk)` per transaction**, at **commit time**, in **dependency order**.

We assume the rest of TVIEW (CREATE TABLE tv_ AS SELECT, view/table generation, dependency graph) is already designed.

---

# 2. Requirements & Invariants

### R1 — Refresh coalescing

Within a single transaction:

* Each logical target `(entity_name, pk_value)` is refreshed **at most once**, regardless of how many triggers fired for it.

### R2 — End-of-transaction semantics

* TVIEW refreshes MUST run **after all writes are applied**, but **before commit finishes**.
* If refresh fails: the entire transaction fails and rolls back.

### R3 — Dependency-correct order

If we have:

```text
tv_company → tv_user → tv_post → tv_feed
```

then for a transaction that touches all of these:

1. `tv_company` must be refreshed before
2. `tv_user` before
3. `tv_post` before
4. `tv_feed`

### R4 — Propagation is part of the same coalesced algorithm

When refreshing `tv_post(pk_post=123)`:

* We may discover parent rows, e.g. `tv_user(pk_user=7)`.
* That parent refresh must also go through the **same transaction-level queue**, so it is also deduped.

### R5 — No extra read-round trips from FraiseQL

FraiseQL just:

* writes to `tb_*`
* then reads from `tv_*`

TVIEW lives entirely in DB.

---

# 3. High-Level Design

### 3.1 Transaction-Local Queue

We maintain a **per-transaction in-memory queue** (HashSet) of refresh requests:

```text
(entity_name: String, pk: i64)
```

This is:

* Populated by row-level triggers on `tb_*` (and potentially `tv_*` if needed)
* Flushed exactly once at **transaction commit**

### 3.2 Commit Callback

We register a **transaction callback** (xact callback) via pgrx / PG hooks:

* On `XactEvent::Commit`, we:

  * Snapshot & clear the queue
  * Resolve entities into dependency order
  * For each (entity, pk), perform the **refresh phase**

### 3.3 Refresh Phase

For each `(entity, pk)`:

1. Recompute the row from `v_entity` (the view).
2. Patch the corresponding `tv_entity` row via `jsonb_delta_patch`.
3. Determine parents (e.g. from FK columns) and enqueue them into the same transaction queue (if not already processed).

Processing continues until the queue is empty (closure of the graph).

---

# 4. Data Structures

### 4.1 `RefreshKey`

```rust
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RefreshKey {
    pub entity: String, // "post", "user", "company", etc.
    pub pk: i64,        // pk_<entity> value
}
```

### 4.2 Queue State

Transaction-local state:

```rust
use std::cell::RefCell;
use std::collections::{HashSet, VecDeque};
use pgrx::prelude::*;

thread_local! {
    static TX_REFRESH_QUEUE: RefCell<HashSet<RefreshKey>> = RefCell::new(HashSet::new());
    static TX_REFRESH_SCHEDULED: RefCell<bool> = RefCell::new(false);
}
```

We:

* Use `HashSet<RefreshKey>` for dedup.
* Track whether we’ve already registered a commit callback (`TX_REFRESH_SCHEDULED`).

### 4.3 Dependency Graph

Precomputed dependency DAG between entities:

```rust
/// tv_company -> tv_user -> tv_post -> tv_feed
#[derive(Debug, Clone)]
pub struct EntityDepGraph {
    /// entity_name -> vec of parent entity_names (parents depend on child)
    /// Example: "post" -> ["feed"], "user" -> ["post"]
    pub parents: HashMap<String, Vec<String>>,
    /// entity_name -> vec of child entity_names
    pub children: HashMap<String, Vec<String>>,
    /// topological order of entities (lowest dependency first)
    pub topo_order: Vec<String>,
}
```

This can be loaded once at extension startup or lazily cached (details up to you).

---

# 5. Trigger Behavior (Enqueue Only)

### 5.1 Trigger semantics

* Called for every `INSERT`, `UPDATE`, `DELETE` on any underlying `tb_*`.
* Must **NOT** recompute TVIEWs directly.
* Only identifies which `(entity, pk)` should be refreshed **eventually** and enqueues them.

### 5.2 Trigger pseudo-code

```rust
use pgrx::prelude::*;
use crate::tx_queue::enqueue_refresh_from_trigger;

#[pg_trigger]
pub fn tview_trigger(trigger: &PgTrigger) -> Result<
    Option<pgrx::heap_tuple::PgHeapTuple<'_, pgrx::pgbox::AllocatedByRust>>,
    spi::Error,
> {
    let rel = trigger.relation()?;
    let source_oid = rel.oid();

    // Determine which entity/entities this table affects.
    // Often one table maps to one entity, but read-models can be synthetic.
    // We'll assume we have metadata: "tb_post" -> "post".
    let entity = crate::catalog::entity_for_table(source_oid)?;

    // Extract PK of the changed row.
    let pk = crate::util::extract_pk(trigger, &entity)?;

    // Enqueue the refresh request.
    enqueue_refresh_from_trigger(&entity, pk)?;

    // Standard AFTER trigger: return NEW where present, else OLD.
    Ok(trigger.new_or_old())
}
```

---

# 6. Transaction Queue Logic

## 6.1 Enqueue Function

```rust
// tx_queue.rs
use super::RefreshKey;
use std::collections::HashSet;
use pgrx::prelude::*;

pub fn enqueue_refresh_from_trigger(entity: &str, pk: i64) -> spi::Result<()> {
    let key = RefreshKey { entity: entity.to_string(), pk };

    TX_REFRESH_QUEUE.with(|cell| {
        let mut set = cell.borrow_mut();
        set.insert(key);
    });

    // Ensure commit callback is registered once per transaction
    TX_REFRESH_SCHEDULED.with(|flag_cell| {
        let mut flag = flag_cell.borrow_mut();
        if !*flag {
            register_commit_callback()?;
            *flag = true;
        }
        Ok(())
    })
}

/// Register a transaction callback that will be invoked at COMMIT.
/// Exact API depends on pgrx / PG version; this is pseudocode.
fn register_commit_callback() -> spi::Result<()> {
    // Pseudo-API — you will use pgrx internals or a raw PG hook:
    // pgrx::register_xact_callback(XactEvent::Commit, tx_commit_handler);

    // For the stub, we assume something like:
    crate::xact::register_commit_callback(tx_commit_handler);
    Ok(())
}
```

## 6.2 Commit Handler

```rust
fn tx_commit_handler() -> spi::Result<()> {
    // Take the queue snapshot and clear it
    let mut queue_snapshot: HashSet<RefreshKey> = HashSet::new();

    TX_REFRESH_QUEUE.with(|cell| {
        // swap out the set
        let mut set = cell.borrow_mut();
        queue_snapshot = std::mem::take(&mut *set);
    });

    TX_REFRESH_SCHEDULED.with(|flag_cell| {
        *flag_cell.borrow_mut() = false;
    });

    if queue_snapshot.is_empty() {
        return Ok(());
    }

    // Convert to VecDeque for iterative processing
    let mut worklist: VecDeque<RefreshKey> = queue_snapshot.into_iter().collect();
    let mut processed: HashSet<RefreshKey> = HashSet::new();

    // Optionally, you can sort or bucket by entity to respect topo order.
    // For now, we’ll process iteratively and use DAG logic during propagation.

    // Main refresh loop
    while let Some(key) = worklist.pop_front() {
        if processed.contains(&key) {
            continue;
        }

        // Refresh this (entity, pk)
        crate::refresh::refresh_entity_pk(&key)?;

        processed.insert(key.clone());

        // Propagation: may return parents to enqueue
        let parents = crate::propagate::parents_for(&key)?;

        for parent_key in parents {
            if !processed.contains(&parent_key) {
                worklist.push_back(parent_key);
            }
        }
    }

    Ok(())
}
```

This loop:

* Ensures each `(entity, pk)` is refreshed at most once.
* Uses propagation step to populate further refreshes (parents).
* Terminates because dependency graph is acyclic and finite.

If you prefer topological ordering at the **entity level**, you can:

* Bucket refresh keys by `entity`
* Process entities according to `EntityDepGraph.topo_order`
* But the iterative approach plus a DAG ensures correctness as well.

---

# 7. Refresh Logic

## 7.1 High-Level Steps

For a given `(entity, pk)`:

1. Find `v_entity` and `tv_entity` OIDs from metadata.
2. Run `SELECT * FROM v_entity WHERE pk_<entity> = $1`.
3. If row exists:

   * Extract `data` and any updated FK / UUID columns.
   * Patch `tv_entity` row using `jsonb_delta_patch`.
4. If row does not exist (e.g. deletion):

   * Delete from `tv_entity`.

## 7.2 Pseudo-code

```rust
use pgrx::prelude::*;
use crate::catalog::TviewMeta;

pub fn refresh_entity_pk(key: &crate::RefreshKey) -> spi::Result<()> {
    let meta = TviewMeta::load_for_entity(&key.entity)?
        .ok_or_else(|| spi::Error::User(format!("No TVIEW meta for entity {}", key.entity)))?;

    let view_name = meta.view_name;      // "v_post"
    let table_name = meta.table_name;    // "tv_post"
    let pk_col = format!("pk_{}", key.entity); // "pk_post"

    // 1. Recompute from view
    let select_sql = format!(
        "SELECT * FROM {} WHERE {} = $1",
        view_name, pk_col
    );

    Spi::connect(|client| {
        let rows = client.select(
            &select_sql,
            None,
            Some(vec![(
                PgOid::BuiltIn(PgBuiltInOids::INT8OID),
                key.pk.into(),
            )]),
        )?;

        if rows.len() == 0 {
            // Row deleted in view → delete tv row
            let delete_sql = format!(
                "DELETE FROM {} WHERE {} = $1",
                table_name, pk_col
            );
            client.update(
                &delete_sql,
                None,
                Some(vec![(
                    PgOid::BuiltIn(PgBuiltInOids::INT8OID),
                    key.pk.into(),
                )]),
            )?;
            return Ok(());
        }

        let row = rows.get(0)?;

        // Extract JSONB data
        let new_data: JsonB = row["data"].value().unwrap();

        // Extract FK columns & UUID FK columns as needed
        let fk_updates = crate::infer::extract_fk_updates(&row, &meta)?;
        let uuid_fk_updates = crate::infer::extract_uuid_fk_updates(&row, &meta)?;

        // 2. Apply patch using jsonb_delta_patch
        let mut set_fragments: Vec<String> = Vec::new();
        set_fragments.push("data = jsonb_delta_patch(data, $1)".to_string());
        set_fragments.push("updated_at = now()".to_string());

        // Add fk/uuid columns to SET clause
        for (i, col) in fk_updates.columns.iter().enumerate() {
            set_fragments.push(format!("{} = ${}", col, i + 2)); // offset by 2 (data + pk)
        }
        let uuid_offset = 2 + fk_updates.columns.len();
        for (j, col) in uuid_fk_updates.columns.iter().enumerate() {
            set_fragments.push(format!("{} = ${}", col, uuid_offset + j));
        }

        let set_clause = set_fragments.join(", ");

        let update_sql = format!(
            "UPDATE {} SET {} WHERE {} = ${}",
            table_name,
            set_clause,
            pk_col,
            uuid_offset + uuid_fk_updates.columns.len() + 1
        );

        // Build bind params
        let mut params: Vec<(PgOid, pgrx::Datum)> = Vec::new();
        params.push((PgOid::BuiltIn(PgBuiltInOids::JSONBOID), new_data.into()));

        for v in &fk_updates.values {
            params.push((PgOid::BuiltIn(PgBuiltInOids::INT8OID), (*v).into()));
        }
        for v in &uuid_fk_updates.values {
            params.push((PgOid::BuiltIn(PgBuiltInOids::UUIDOID), (*v).into()));
        }
        params.push((PgOid::BuiltIn(PgBuiltInOids::INT8OID), key.pk.into()));

        client.update(&update_sql, None, Some(params))?;

        Ok(())
    })
}
```

> **Note:** This is intentionally verbose pseudo-code so the team can see how to wire dynamic sets; you can simplify based on your conventions.

---

# 8. Propagation Logic

## 8.1 Goal

Given a refreshed `(entity, pk)`, find which parent entities depend on it and enqueue those.

Two ways:

* **FK-based**: inspect FK columns in the refreshed view row (e.g. `fk_user`) and map them to parents (e.g. `user`).
* **Graph-based**: use precomputed `EntityDepGraph` (which entity depends on which).

### 8.2 Pseudo-code

```rust
use pgrx::prelude::*;
use crate::{RefreshKey, catalog::TviewMeta};

pub fn parents_for(key: &RefreshKey) -> spi::Result<Vec<RefreshKey>> {
    let mut result = Vec::new();

    // 1. Load metadata for this entity
    let meta = TviewMeta::load_for_entity(&key.entity)?
        .ok_or_else(|| spi::Error::User(format!("No meta for {}", key.entity)))?;

    // 2. From entity dep graph, find parent entities
    let dep_graph = crate::catalog::entity_dep_graph()?;
    let parents = dep_graph.parents.get(&key.entity).cloned().unwrap_or_default();

    // 3. For each parent entity, determine which PK value to enqueue
    // Typically: parent.pk = some fk_* value on this row.
    // We need the refreshed fk_* columns. We could:
    //  - either fetch the row from v_entity again and read FKs
    //  - or reuse previous view row if we keep it around.

    for parent_entity in parents {
        let fk_value = crate::util::lookup_fk_for_parent(&key.entity, &parent_entity, key.pk)?;
        if let Some(parent_pk) = fk_value {
            result.push(RefreshKey {
                entity: parent_entity,
                pk: parent_pk,
            });
        }
    }

    Ok(result)
}
```

This is intentionally a **high-level stub**; the real implementation will depend on how you encode parent relationships in your metadata.

---

# 9. Error Handling & Failure Modes

* If `refresh_entity_pk` fails:

  * The commit callback returns `Err`.
  * The entire transaction aborts, including writes to `tb_*`.
  * TVIEW remains in a consistent previous state.
* If propagation discovers missing metadata:

  * Raise a clear `ERROR` about misconfigured TVIEW.
* If multiple triggers race:

  * They all operate on transaction-local state. No cross-transaction interference.

---

# 10. Summary for the Coding Team

**Conceptual behavior:**

* Triggers only **enqueue** refreshes.
* Coalescing happens in a **transaction-local HashSet**.
* Actual refresh work happens **once per (entity,pk) at COMMIT**.
* Refresh uses `v_entity` to recompute and `jsonb_delta_patch` to apply.
* Propagation is driven by FK and dependency graph, and uses the same queue (so it’s coalesced too).

**Key pieces to implement:**

1. `RefreshKey` + transaction-local queue (`TX_REFRESH_QUEUE`, `TX_REFRESH_SCHEDULED`)
2. Trigger → `enqueue_refresh_from_trigger`
3. Commit callback → `tx_commit_handler`
4. `refresh_entity_pk` (recompute + jsonb_delta_patch + delete-if-missing)
5. `parents_for` + entity dependency graph
6. Metadata layer (`TviewMeta`) to map entities → v_*/tv_* names, PK/FK columns

If you want, next we can:

* Zoom in on **how to compute the entity dependency graph from `pg_depend`**, with concrete SQL + Rust.
* Or design the `pg_tview_meta` schema in detail.