Skip to main content

sources_postgres/document/
mod.rs

1//! [`PgDocumentBuilder`] — the read half of the Postgres source.
2//!
3//! Resolves which documents a changed row affects and assembles them from the
4//! schema. The work is split across this module:
5//!
6//! - [`fields`] — pure traversal of the index's field tree.
7//! - [`resolve`] — reverse resolution: changed row → affected document keys.
8//! - [`query`] — SQL generation (the server-side document query, reverse
9//!   queries, parameter binding).
10//! - [`value`] — decoding Postgres results into the value tree.
11//!
12//! ## Assembly happens in Postgres
13//!
14//! [`build`](PgDocumentBuilder::build) issues **one** query per document: the
15//! whole nested document is assembled server-side with `json_build_object` /
16//! `json_agg` and correlated subqueries (see [`query`]). Nested relations don't
17//! trigger extra round-trips, so there is no N+1. Existence and soft-delete
18//! fold into the query's `WHERE`, so a missing or deleted row simply returns no
19//! row → a tombstone.
20//!
21//! ## Coverage
22//!
23//! - Resolution: root table; direct foreign-key relations (`has_one`/
24//!   `has_many`); parent-side-key relations (`belongs_to`, resolved against the
25//!   parent table — so a change to, or deletion of, the *target* row re-emits
26//!   every document pointing at it); many-to-many (`through`) relations on
27//!   either the far or junction table; and tables reachable through multiple
28//!   hops of nesting, chained back to the root.
29//! - Assembly: column fields (transforms, defaults); belongs_to / has_one /
30//!   has_many / many_to_many joins (filters, ordering, limit); joins nested
31//!   inside joins; aggregates, including over a junction; boolean and timestamp
32//!   soft-delete with optional `when` filters.
33//!
34//! Relation targets are matched on each table's real primary key, looked up
35//! from the Postgres catalog and cached (see [`PgDocumentBuilder::table_primary_key`]).
36//! The index's own root key comes from its declared `primary_key`.
37//!
38//! ## Remaining limits
39//!
40//! A child-row *delete* on a related table can't be reverse-resolved from a
41//! key-only change (the row is already gone); this follows from the thin-event
42//! CDC design. Multi-hop reverse resolution issues one query per hop.
43
44mod fields;
45mod query;
46mod resolve;
47pub(crate) mod value;
48
49use std::collections::{HashMap, HashSet};
50use std::sync::{Arc, Mutex, PoisonError};
51
52use async_trait::async_trait;
53use schema_core::{
54    ColumnName, DatabaseSchema, Filter, IndexMapping, IndexName, IndexSchema, SoftDelete, TableName,
55};
56use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
57use sources_core::{Catalog, ColumnInfo, Result, RowKey, SnapshotTable, SourceError, SourceSpec};
58use sqlx::{PgPool, Row};
59
60use fields::find_paths;
61
/// Per-column catalog metadata, keyed by `(schema, table, column)`.
type ColTypeCache = HashMap<(String, String, String), ColumnMeta>;

/// Upper bound on the keys a single batched `build_many` query carries. Capping
/// the `IN (…)` list keeps the SQL text short and limits prepared-statement
/// cache churn as the key count varies; bigger id sets are spread over several
/// round-trips.
const BUILD_CHUNK: usize = 512;

/// A column as the Postgres catalog describes it: the type name ready to be
/// used in a cast, plus whether NULL is allowed. Looked up once per column and
/// cached, because both the document query (casting operands) and mapping
/// validation (type and nullability) need it.
#[derive(Clone, Debug)]
struct ColumnMeta {
    /// Cast-ready type name (as rendered by `format_type`).
    sql_type: String,
    /// `true` unless the column carries a NOT NULL constraint.
    nullable: bool,
}
79
80/// Builds index documents from a Postgres database, driven by a [`SourceSpec`] —
81/// the enabled indexes and their schemas, translated from the top-level config
82/// by the composition root. Cheap to clone — the pool, spec, and primary-key
83/// cache are shared.
84#[derive(Debug, Clone)]
85pub struct PgDocumentBuilder {
86    pool: PgPool,
87    spec: Arc<SourceSpec>,
88    /// Cache of each `(schema, table)`'s single-column primary key.
89    pk_cache: Arc<Mutex<HashMap<(String, String), ColumnName>>>,
90    /// Cache of each `(schema, table, column)`'s SQL type, used to cast filter
91    /// operands to the column's real type rather than comparing as text.
92    col_type_cache: Arc<Mutex<ColTypeCache>>,
93}
94
95impl PgDocumentBuilder {
96    /// Create a builder over a connection pool and the source spec.
97    pub fn new(pool: PgPool, spec: Arc<SourceSpec>) -> Self {
98        Self {
99            pool,
100            spec,
101            pk_cache: Arc::new(Mutex::new(HashMap::new())),
102            col_type_cache: Arc::new(Mutex::new(HashMap::new())),
103        }
104    }
105
106    /// Connect a pool from a Postgres connection URL and build over it.
107    #[tracing::instrument(name = "pg.connect", skip_all, err)]
108    pub async fn connect(connection_url: &str, spec: Arc<SourceSpec>) -> Result<Self> {
109        let pool = sqlx::postgres::PgPoolOptions::new()
110            .connect(connection_url)
111            .await
112            .map_err(|e| SourceError::Connection(e.to_string()))?;
113        tracing::info!(indexes = spec.indexes().count(), "connected to Postgres");
114        Ok(Self::new(pool, spec))
115    }
116
117    /// The single-column primary key of a table, from the Postgres catalog
118    /// (cached). Relations match against this, so a composite or missing
119    /// primary key is an error.
120    pub(super) async fn table_primary_key(
121        &self,
122        schema: &DatabaseSchema,
123        table: &TableName,
124    ) -> Result<ColumnName> {
125        let cache_key = (schema.to_string(), table.to_string());
126        {
127            let cache = self.pk_cache.lock().unwrap_or_else(PoisonError::into_inner);
128            if let Some(column) = cache.get(&cache_key) {
129                return Ok(column.clone());
130            }
131        }
132        let column = match self.fetch_primary_key(schema, table).await?.as_slice() {
133            [single] => single.clone(),
134            [] => {
135                return Err(SourceError::Query(format!(
136                    "table `{schema}.{table}` has no primary key"
137                )));
138            }
139            _ => {
140                return Err(SourceError::Unsupported(format!(
141                    "table `{schema}.{table}` has a composite primary key; relations require a single-column key"
142                )));
143            }
144        };
145        self.pk_cache
146            .lock()
147            .unwrap_or_else(PoisonError::into_inner)
148            .insert(cache_key, column.clone());
149        Ok(column)
150    }
151
152    async fn fetch_primary_key(
153        &self,
154        schema: &DatabaseSchema,
155        table: &TableName,
156    ) -> Result<Vec<ColumnName>> {
157        let names = primary_key_column_names(&self.pool, format!("{schema}.{table}")).await?;
158        names
159            .into_iter()
160            .map(|name| {
161                ColumnName::try_new(name)
162                    .map_err(|e| SourceError::Query(format!("invalid primary key column: {e}")))
163            })
164            .collect()
165    }
166
167    /// Resolve every relation table's primary key up front (cached), so the
168    /// document query can correlate and join through them.
169    async fn relation_pks(
170        &self,
171        schema: &schema_core::IndexSchema,
172    ) -> Result<HashMap<String, ColumnName>> {
173        let mut tables = Vec::new();
174        fields::collect_relation_tables(&schema.fields, &mut tables);
175        let unique: HashSet<&TableName> = tables.iter().collect();
176        let mut pks = HashMap::new();
177        for table in unique {
178            pks.insert(
179                table.to_string(),
180                self.table_primary_key(&schema.db_schema, table).await?,
181            );
182        }
183        Ok(pks)
184    }
185
186    /// The SQL type of a column, as a cast-ready name from the Postgres catalog
187    /// (e.g. `numeric`, `integer`, `timestamp with time zone`). A thin view over
188    /// [`column_meta`](Self::column_meta) for callers that only need the type to
189    /// cast a query operand.
190    pub(super) async fn column_type(
191        &self,
192        schema: &DatabaseSchema,
193        table: &TableName,
194        column: &ColumnName,
195    ) -> Result<String> {
196        Ok(self.column_meta(schema, table, column).await?.sql_type)
197    }
198
199    /// The Postgres catalog's view of a column — its cast-ready SQL type and
200    /// whether it admits null — cached. An unknown column is an error: a field or
201    /// filter naming a column that does not exist is a misconfiguration.
202    async fn column_meta(
203        &self,
204        schema: &DatabaseSchema,
205        table: &TableName,
206        column: &ColumnName,
207    ) -> Result<ColumnMeta> {
208        let cache_key = (schema.to_string(), table.to_string(), column.to_string());
209        {
210            let cache = self
211                .col_type_cache
212                .lock()
213                .unwrap_or_else(PoisonError::into_inner);
214            if let Some(meta) = cache.get(&cache_key) {
215                return Ok(meta.clone());
216            }
217        }
218        // `format_type` yields a canonical, re-parseable type name, so it can be
219        // dropped straight into a `$n::<type>` cast. `attnotnull` is the column's
220        // NOT NULL constraint — the nullability mapping resolution needs, read
221        // from the same catalog row as the type.
222        let sql = "SELECT format_type(a.atttypid, a.atttypmod) AS sql_type, a.attnotnull AS not_null \
223                   FROM pg_attribute a \
224                   WHERE a.attrelid = $1::regclass AND a.attname = $2 \
225                     AND a.attnum > 0 AND NOT a.attisdropped";
226        let row = sqlx::query(sql)
227            .bind(format!("{schema}.{table}"))
228            .bind(column.as_ref().to_owned())
229            .fetch_optional(&self.pool)
230            .await
231            .map_err(query_err)?;
232        let meta = match row {
233            Some(row) => {
234                let sql_type: String = row.try_get("sql_type").map_err(query_err)?;
235                let not_null: bool = row.try_get("not_null").map_err(query_err)?;
236                ColumnMeta {
237                    sql_type,
238                    nullable: !not_null,
239                }
240            }
241            None => {
242                return Err(SourceError::Query(format!(
243                    "references unknown column `{schema}.{table}.{column}`"
244                )));
245            }
246        };
247        self.col_type_cache
248            .lock()
249            .unwrap_or_else(PoisonError::into_inner)
250            .insert(cache_key, meta.clone());
251        Ok(meta)
252    }
253
254    /// Resolve the SQL type of every column a value filter compares against,
255    /// keyed by `(table, column)`, so the document query can cast each operand
256    /// to its column's type. Covers relation filters at any depth and the
257    /// root-table columns named by a soft-delete `when`.
258    async fn filter_column_types(
259        &self,
260        schema: &IndexSchema,
261    ) -> Result<HashMap<(String, String), String>> {
262        let mut columns = Vec::new();
263        fields::collect_filter_columns(&schema.fields, &mut columns);
264
265        // Soft-delete `when` filters and root filters run against the root table.
266        let when = match &schema.soft_delete {
267            Some(SoftDelete::Column(c)) => c.when.as_deref(),
268            Some(SoftDelete::Field(f)) => f.when.as_deref(),
269            None => None,
270        };
271        let root_filters = schema.filters.as_deref().unwrap_or_default();
272        for filter in when.unwrap_or_default().iter().chain(root_filters) {
273            if let Filter::ValueOp(value_op) = filter {
274                columns.push((&schema.table, &value_op.column));
275            }
276        }
277
278        let mut types = HashMap::new();
279        for (table, column) in columns {
280            let key = (table.to_string(), column.to_string());
281            if types.contains_key(&key) {
282                continue;
283            }
284            let sql_type = self.column_type(&schema.db_schema, table, column).await?;
285            types.insert(key, sql_type);
286        }
287        Ok(types)
288    }
289}
290
291/// The Postgres source's view of its own catalog. The index mapping is derived
292/// from the self-describing schema in [`schema_core`]; this is the one
293/// store-specific piece used for *validation* — how Postgres types and
294/// constrains a column — so a declared schema can be checked against the live
295/// database.
296#[async_trait]
297impl Catalog for PgDocumentBuilder {
298    async fn column(
299        &self,
300        schema: &DatabaseSchema,
301        table: &TableName,
302        column: &ColumnName,
303    ) -> Result<ColumnInfo> {
304        let meta = self.column_meta(schema, table, column).await?;
305        Ok(ColumnInfo {
306            sql_type: meta.sql_type,
307            nullable: meta.nullable,
308        })
309    }
310}
311
312#[async_trait]
313impl DocumentBuilder for PgDocumentBuilder {
314    #[tracing::instrument(
315        name = "pg.resolve",
316        level = "debug",
317        skip_all,
318        fields(table = table.as_ref()),
319        err,
320    )]
321    async fn resolve(&self, table: &TableName, key: &RowKey) -> Result<Vec<DocumentId>> {
322        let mut ids = Vec::new();
323        for (name, schema) in self.spec.indexes() {
324            // Change on the document's own root table: the key is the id.
325            if schema.table == *table {
326                ids.push(DocumentId {
327                    index: name.clone(),
328                    key: key.clone(),
329                });
330                continue;
331            }
332
333            // Change on a related table: resolve every path back to the root.
334            let mut paths = Vec::new();
335            let mut prefix = Vec::new();
336            find_paths(&schema.fields, table, &mut prefix, &mut paths);
337            if paths.is_empty() {
338                continue;
339            }
340            let Some(pk_column) = schema.primary_key.clone() else {
341                tracing::warn!(
342                    index = %name, table = %table,
343                    "cannot reverse-resolve: index has no primary_key",
344                );
345                continue;
346            };
347
348            let mut seen = HashSet::new();
349            for path in &paths {
350                for root in self.resolve_path(schema, table, key, path).await? {
351                    if seen.insert(root.clone()) {
352                        ids.push(DocumentId {
353                            index: name.clone(),
354                            key: RowKey(vec![(pk_column.clone(), root)]),
355                        });
356                    }
357                }
358            }
359        }
360        tracing::trace!(documents = ids.len(), "resolved affected documents");
361        Ok(ids)
362    }
363
364    #[tracing::instrument(
365        name = "pg.build",
366        level = "debug",
367        skip_all,
368        fields(index = id.index.as_ref()),
369        err,
370    )]
371    async fn build(&self, id: &DocumentId) -> Result<Document> {
372        let schema = self
373            .spec
374            .schema(&id.index)
375            .ok_or_else(|| SourceError::Query(format!("unknown index `{}`", id.index)))?;
376
377        let pks = self.relation_pks(schema).await?;
378        let col_types = self.filter_column_types(schema).await?;
379        let (sql, params) = query::document_query(schema, &id.key.0, &pks, &col_types)?;
380
381        let mut statement = sqlx::query(sql);
382        for param in &params {
383            statement = query::bind_param(statement, param)?;
384        }
385        let row = statement
386            .fetch_optional(&self.pool)
387            .await
388            .map_err(query_err)?;
389
390        // No row means the root is absent or soft-deleted (both folded into the
391        // query's WHERE) → the document should not exist.
392        match row {
393            None => Ok(Document::Delete { id: id.clone() }),
394            Some(row) => {
395                let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
396                Ok(Document::Upsert {
397                    id: id.clone(),
398                    body: value::json_to_generic(document),
399                })
400            }
401        }
402    }
403
404    #[tracing::instrument(name = "pg.build_many", level = "debug", skip_all, fields(ids = ids.len()))]
405    async fn build_many(&self, ids: &[DocumentId]) -> Result<Vec<Document>> {
406        // Group by index: each index has its own schema, root table, and key.
407        let mut by_index: HashMap<&IndexName, Vec<&DocumentId>> = HashMap::new();
408        for id in ids {
409            by_index.entry(&id.index).or_default().push(id);
410        }
411
412        let mut out = Vec::with_capacity(ids.len());
413        for (index_name, group) in by_index {
414            let schema = self
415                .spec
416                .schema(index_name)
417                .ok_or_else(|| SourceError::Query(format!("unknown index `{index_name}`")))?;
418
419            // The batched query keys the root with `IN (…)` on a single column,
420            // so it needs both a declared single-column primary key and ids that
421            // carry exactly that one key column. Pair each id with its lone key
422            // value; if any id is composite (or the index has no `primary_key`),
423            // fall back to per-document assembly for this group — correct, just
424            // not batched.
425            let keyed: Option<Vec<(&schema_core::GenericValue, &DocumentId)>> = group
426                .iter()
427                .map(|id| match id.key.0.as_slice() {
428                    [(_, value)] => Some((value, *id)),
429                    _ => None,
430                })
431                .collect();
432            let (Some(pk_column), Some(keyed)) = (schema.primary_key.clone(), keyed) else {
433                for id in group {
434                    out.push(self.build(id).await?);
435                }
436                continue;
437            };
438
439            let pks = self.relation_pks(schema).await?;
440            let col_types = self.filter_column_types(schema).await?;
441
442            for chunk in keyed.chunks(BUILD_CHUNK) {
443                let keys: Vec<schema_core::GenericValue> =
444                    chunk.iter().map(|(value, _)| (*value).clone()).collect();
445                let (sql, params) =
446                    query::documents_query(schema, &pk_column, &keys, &pks, &col_types)?;
447
448                let mut statement = sqlx::query(sql);
449                for param in &params {
450                    statement = query::bind_param(statement, param)?;
451                }
452                let rows = statement.fetch_all(&self.pool).await.map_err(query_err)?;
453
454                // Map each returned root key to its assembled body. `doc_key` is
455                // the first column, decoded through the same path live-change
456                // keys take, so it matches the ids' key values exactly.
457                let mut bodies: HashMap<schema_core::GenericValue, schema_core::GenericValue> =
458                    HashMap::with_capacity(rows.len());
459                for row in &rows {
460                    let key = value::first_column_to_generic(row);
461                    let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
462                    bodies.insert(key, value::json_to_generic(document));
463                }
464
465                // Every requested id yields an outcome: a body present in the
466                // result is an upsert; an absent key means the root is gone or
467                // soft-deleted (both fold into the query's WHERE) → a tombstone.
468                for (value, id) in chunk {
469                    let document = match bodies.remove(*value) {
470                        Some(body) => Document::Upsert {
471                            id: (*id).clone(),
472                            body,
473                        },
474                        None => Document::Delete { id: (*id).clone() },
475                    };
476                    out.push(document);
477                }
478            }
479        }
480        Ok(out)
481    }
482
483    fn backfill_scopes(&self) -> Vec<IndexScope> {
484        // A document is keyed by its root row, so the root table alone seeds the
485        // whole index — `build` assembles the joins and aggregates per root row.
486        self.spec
487            .indexes()
488            .map(|(name, schema)| IndexScope {
489                index: name.clone(),
490                root: SnapshotTable {
491                    db_schema: schema.db_schema.clone(),
492                    table: schema.table.clone(),
493                },
494            })
495            .collect()
496    }
497
498    async fn index_mappings(&self) -> Result<Vec<IndexMapping>> {
499        // The schema is self-describing, so the mapping is projected from it
500        // without touching the database.
501        Ok(self.spec.index_mappings())
502    }
503}
504
505pub(super) fn query_err(error: sqlx::Error) -> SourceError {
506    SourceError::Query(error.to_string())
507}
508
/// Primary-key column names of a table, in index order. `$1` binds the
/// qualified `schema.table` (cast to `regclass`).
///
/// The order comes from each column's position in `pg_index.indkey`. A plain
/// join on `attnum = ANY(indkey)` returns rows in no guaranteed order, which
/// would scramble a composite key.
pub(crate) const PRIMARY_KEY_SQL: &str = "SELECT a.attname AS name \
     FROM pg_index i \
     JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) \
     WHERE i.indrelid = $1::regclass AND i.indisprimary \
     ORDER BY array_position(i.indkey::int2[], a.attnum)";
515
516/// Fetch the raw primary-key column-name strings for the table `qualified`
517/// names (e.g. `"public"."users"` or `public.users`). Callers apply their own
518/// policy for an invalid name, a missing key, or a composite key.
519pub(crate) async fn primary_key_column_names(
520    pool: &PgPool,
521    qualified: String,
522) -> Result<Vec<String>> {
523    let rows = sqlx::query(PRIMARY_KEY_SQL)
524        .bind(qualified)
525        .fetch_all(pool)
526        .await
527        .map_err(query_err)?;
528    let mut names = Vec::with_capacity(rows.len());
529    for row in &rows {
530        names.push(row.try_get::<String, _>("name").map_err(query_err)?);
531    }
532    Ok(names)
533}