Skip to main content

sources_postgres/document/
mod.rs

1//! [`PgDocumentBuilder`] — the read half of the Postgres source.
2//!
3//! Resolves which documents a changed row affects and assembles them from the
4//! schema. The work is split across this module:
5//!
6//! - [`fields`] — pure traversal of the index's field tree.
7//! - [`resolve`] — reverse resolution: changed row → affected document keys.
8//! - [`query`] — SQL generation (the server-side document query, reverse
9//!   queries, parameter binding).
10//! - [`value`] — decoding Postgres results into the value tree.
11//!
12//! ## Assembly happens in Postgres
13//!
14//! [`build`](PgDocumentBuilder::build) issues **one** query per document: the
15//! whole nested document is assembled server-side with `json_build_object` /
16//! `json_agg` and correlated subqueries (see [`query`]). Nested relations don't
17//! trigger extra round-trips, so there is no N+1. Existence and soft-delete
18//! fold into the query's `WHERE`, so a missing or deleted row simply returns no
19//! row → a tombstone.
20//!
21//! ## Coverage
22//!
23//! - Resolution: root table; direct foreign-key relations (`has_one`/
24//!   `has_many`); parent-side-key relations (`belongs_to`, resolved against the
25//!   parent table — so a change to, or deletion of, the *target* row re-emits
26//!   every document pointing at it); many-to-many (`through`) relations on
27//!   either the far or junction table; and tables reachable through multiple
28//!   hops of nesting, chained back to the root.
29//! - Assembly: column fields (transforms, defaults); belongs_to / has_one /
30//!   has_many / many_to_many joins (filters, ordering, limit); joins nested
31//!   inside joins; aggregates, including over a junction; boolean and timestamp
32//!   soft-delete with optional `when` filters.
33//!
34//! Relation targets are matched on each table's real primary key, looked up
35//! from the Postgres catalog and cached (see [`PgDocumentBuilder::table_primary_key`]).
36//! The index's own root key comes from its declared `primary_key`.
37//!
38//! ## Remaining limits
39//!
40//! A child-row *delete* on a related table can't be reverse-resolved from a
41//! key-only change (the row is already gone); this follows from the thin-event
42//! CDC design. Multi-hop reverse resolution issues one query per hop.
43
44mod fields;
45mod query;
46mod resolve;
47pub(crate) mod value;
48
49use std::collections::{HashMap, HashSet};
50use std::sync::{Arc, Mutex, PoisonError};
51
52use async_trait::async_trait;
53use schema_core::{
54    ColumnName, DatabaseSchema, Filter, IndexMapping, IndexName, IndexSchema, SoftDelete, TableName,
55};
56use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
57use sources_core::{Catalog, ColumnInfo, Result, RowKey, SnapshotTable, SourceError, SourceSpec};
58use sqlx::{PgPool, Row};
59
60use fields::find_paths;
61
62/// Cache of each `(schema, table, column)`'s catalog metadata.
63type ColTypeCache = HashMap<(String, String, String), ColumnMeta>;
64
/// Upper bound on the keys placed in one batched `build_many` query.
///
/// Capping the `IN (…)` list keeps the generated SQL short and limits how many
/// distinct prepared statements a growing key count can produce; an id set
/// larger than this is spread over several round-trips.
const BUILD_CHUNK: usize = 512;
69
/// Catalog facts about a single column, fetched once and cached.
///
/// The same lookup serves two readers: the document query, which needs the
/// type to cast filter operands, and mapping validation, which needs both the
/// type and the nullability.
#[derive(Clone, Debug)]
struct ColumnMeta {
    /// Cast-ready type name as rendered by the catalog, e.g. `integer` or
    /// `timestamp with time zone`.
    sql_type: String,
    /// `true` unless the column carries a `NOT NULL` constraint.
    nullable: bool,
}
79
80/// Builds index documents from a Postgres database, driven by a [`SourceSpec`] —
81/// the enabled indexes and their schemas, translated from the top-level config
82/// by the composition root. Cheap to clone — the pool, spec, and primary-key
83/// cache are shared.
84#[derive(Debug, Clone)]
85pub struct PgDocumentBuilder {
86    pool: PgPool,
87    spec: Arc<SourceSpec>,
88    /// Cache of each `(schema, table)`'s single-column primary key.
89    pk_cache: Arc<Mutex<HashMap<(String, String), ColumnName>>>,
90    /// Cache of each `(schema, table, column)`'s SQL type, used to cast filter
91    /// operands to the column's real type rather than comparing as text.
92    col_type_cache: Arc<Mutex<ColTypeCache>>,
93}
94
95impl PgDocumentBuilder {
96    pub fn new(pool: PgPool, spec: Arc<SourceSpec>) -> Self {
97        Self {
98            pool,
99            spec,
100            pk_cache: Arc::new(Mutex::new(HashMap::new())),
101            col_type_cache: Arc::new(Mutex::new(HashMap::new())),
102        }
103    }
104
105    #[tracing::instrument(name = "pg.connect", skip_all, err)]
106    pub async fn connect(connection_url: &str, spec: Arc<SourceSpec>) -> Result<Self> {
107        let pool = sqlx::postgres::PgPoolOptions::new()
108            .connect(connection_url)
109            .await
110            .map_err(|e| SourceError::Connection(e.to_string()))?;
111        tracing::info!(indexes = spec.indexes().count(), "connected to Postgres");
112        Ok(Self::new(pool, spec))
113    }
114
115    /// The single-column primary key of a table, from the Postgres catalog
116    /// (cached). Relations match against this, so a composite or missing
117    /// primary key is an error.
118    pub(super) async fn table_primary_key(
119        &self,
120        schema: &DatabaseSchema,
121        table: &TableName,
122    ) -> Result<ColumnName> {
123        let cache_key = (schema.to_string(), table.to_string());
124        {
125            let cache = self.pk_cache.lock().unwrap_or_else(PoisonError::into_inner);
126            if let Some(column) = cache.get(&cache_key) {
127                return Ok(column.clone());
128            }
129        }
130        let column = match self.fetch_primary_key(schema, table).await?.as_slice() {
131            [single] => single.clone(),
132            [] => {
133                return Err(SourceError::Query(format!(
134                    "table `{schema}.{table}` has no primary key"
135                )));
136            }
137            _ => {
138                return Err(SourceError::Unsupported(format!(
139                    "table `{schema}.{table}` has a composite primary key; relations require a single-column key"
140                )));
141            }
142        };
143        self.pk_cache
144            .lock()
145            .unwrap_or_else(PoisonError::into_inner)
146            .insert(cache_key, column.clone());
147        Ok(column)
148    }
149
150    async fn fetch_primary_key(
151        &self,
152        schema: &DatabaseSchema,
153        table: &TableName,
154    ) -> Result<Vec<ColumnName>> {
155        let names = primary_key_column_names(&self.pool, format!("{schema}.{table}")).await?;
156        names
157            .into_iter()
158            .map(|name| {
159                ColumnName::try_new(name)
160                    .map_err(|e| SourceError::Query(format!("invalid primary key column: {e}")))
161            })
162            .collect()
163    }
164
165    /// Resolve every relation table's primary key up front (cached), so the
166    /// document query can correlate and join through them.
167    async fn relation_pks(
168        &self,
169        schema: &schema_core::IndexSchema,
170    ) -> Result<HashMap<String, ColumnName>> {
171        let mut tables = Vec::new();
172        fields::collect_relation_tables(&schema.fields, &mut tables);
173        let unique: HashSet<&TableName> = tables.iter().collect();
174        let mut pks = HashMap::new();
175        for table in unique {
176            pks.insert(
177                table.to_string(),
178                self.table_primary_key(&schema.db_schema, table).await?,
179            );
180        }
181        Ok(pks)
182    }
183
184    /// The SQL type of a column, as a cast-ready name from the Postgres catalog
185    /// (e.g. `numeric`, `integer`, `timestamp with time zone`). A thin view over
186    /// [`column_meta`](Self::column_meta) for callers that only need the type to
187    /// cast a query operand.
188    pub(super) async fn column_type(
189        &self,
190        schema: &DatabaseSchema,
191        table: &TableName,
192        column: &ColumnName,
193    ) -> Result<String> {
194        Ok(self.column_meta(schema, table, column).await?.sql_type)
195    }
196
197    /// The Postgres catalog's view of a column — its cast-ready SQL type and
198    /// whether it admits null — cached. An unknown column is an error: a field or
199    /// filter naming a column that does not exist is a misconfiguration.
200    async fn column_meta(
201        &self,
202        schema: &DatabaseSchema,
203        table: &TableName,
204        column: &ColumnName,
205    ) -> Result<ColumnMeta> {
206        let cache_key = (schema.to_string(), table.to_string(), column.to_string());
207        {
208            let cache = self
209                .col_type_cache
210                .lock()
211                .unwrap_or_else(PoisonError::into_inner);
212            if let Some(meta) = cache.get(&cache_key) {
213                return Ok(meta.clone());
214            }
215        }
216        // `format_type` yields a canonical, re-parseable type name, so it can be
217        // dropped straight into a `$n::<type>` cast. `attnotnull` is the column's
218        // NOT NULL constraint — the nullability mapping resolution needs, read
219        // from the same catalog row as the type.
220        let sql = "SELECT format_type(a.atttypid, a.atttypmod) AS sql_type, a.attnotnull AS not_null \
221                   FROM pg_attribute a \
222                   WHERE a.attrelid = $1::regclass AND a.attname = $2 \
223                     AND a.attnum > 0 AND NOT a.attisdropped";
224        let row = sqlx::query(sql)
225            .bind(format!("{schema}.{table}"))
226            .bind(column.as_ref().to_owned())
227            .fetch_optional(&self.pool)
228            .await
229            .map_err(query_err)?;
230        let meta = match row {
231            Some(row) => {
232                let sql_type: String = row.try_get("sql_type").map_err(query_err)?;
233                let not_null: bool = row.try_get("not_null").map_err(query_err)?;
234                ColumnMeta {
235                    sql_type,
236                    nullable: !not_null,
237                }
238            }
239            None => {
240                return Err(SourceError::Query(format!(
241                    "references unknown column `{schema}.{table}.{column}`"
242                )));
243            }
244        };
245        self.col_type_cache
246            .lock()
247            .unwrap_or_else(PoisonError::into_inner)
248            .insert(cache_key, meta.clone());
249        Ok(meta)
250    }
251
252    /// Resolve the SQL type of every column a value filter compares against,
253    /// keyed by `(table, column)`, so the document query can cast each operand
254    /// to its column's type. Covers relation filters at any depth and the
255    /// root-table columns named by a soft-delete `when`.
256    async fn filter_column_types(
257        &self,
258        schema: &IndexSchema,
259    ) -> Result<HashMap<(String, String), String>> {
260        let mut columns = Vec::new();
261        fields::collect_filter_columns(&schema.fields, &mut columns);
262
263        // Soft-delete `when` filters and root filters run against the root table.
264        let when = match &schema.soft_delete {
265            Some(SoftDelete::Column(c)) => c.when.as_deref(),
266            Some(SoftDelete::Field(f)) => f.when.as_deref(),
267            None => None,
268        };
269        let root_filters = schema.filters.as_deref().unwrap_or_default();
270        for filter in when.unwrap_or_default().iter().chain(root_filters) {
271            if let Filter::ValueOp(value_op) = filter {
272                columns.push((&schema.table, &value_op.column));
273            }
274        }
275
276        let mut types = HashMap::new();
277        for (table, column) in columns {
278            let key = (table.to_string(), column.to_string());
279            if types.contains_key(&key) {
280                continue;
281            }
282            let sql_type = self.column_type(&schema.db_schema, table, column).await?;
283            types.insert(key, sql_type);
284        }
285        Ok(types)
286    }
287}
288
289/// The Postgres source's view of its own catalog. The index mapping is derived
290/// from the self-describing schema in [`schema_core`]; this is the one
291/// store-specific piece used for *validation* — how Postgres types and
292/// constrains a column — so a declared schema can be checked against the live
293/// database.
294#[async_trait]
295impl Catalog for PgDocumentBuilder {
296    async fn column(
297        &self,
298        schema: &DatabaseSchema,
299        table: &TableName,
300        column: &ColumnName,
301    ) -> Result<ColumnInfo> {
302        let meta = self.column_meta(schema, table, column).await?;
303        Ok(ColumnInfo {
304            sql_type: meta.sql_type,
305            nullable: meta.nullable,
306        })
307    }
308}
309
310#[async_trait]
311impl DocumentBuilder for PgDocumentBuilder {
312    #[tracing::instrument(
313        name = "pg.resolve",
314        level = "debug",
315        skip_all,
316        fields(table = table.as_ref()),
317        err,
318    )]
319    async fn resolve(&self, table: &TableName, key: &RowKey) -> Result<Vec<DocumentId>> {
320        let mut ids = Vec::new();
321        for (name, schema) in self.spec.indexes() {
322            if schema.table == *table {
323                ids.push(DocumentId {
324                    index: name.clone(),
325                    key: key.clone(),
326                });
327                continue;
328            }
329
330            let mut paths = Vec::new();
331            let mut prefix = Vec::new();
332            find_paths(&schema.fields, table, &mut prefix, &mut paths);
333            if paths.is_empty() {
334                continue;
335            }
336            let Some(pk_column) = schema.primary_key.clone() else {
337                tracing::warn!(
338                    index = %name, table = %table,
339                    "cannot reverse-resolve: index has no primary_key",
340                );
341                continue;
342            };
343
344            let mut seen = HashSet::new();
345            for path in &paths {
346                for root in self.resolve_path(schema, table, key, path).await? {
347                    if seen.insert(root.clone()) {
348                        ids.push(DocumentId {
349                            index: name.clone(),
350                            key: RowKey(vec![(pk_column.clone(), root)]),
351                        });
352                    }
353                }
354            }
355        }
356        tracing::trace!(documents = ids.len(), "resolved affected documents");
357        Ok(ids)
358    }
359
360    #[tracing::instrument(
361        name = "pg.build",
362        level = "debug",
363        skip_all,
364        fields(index = id.index.as_ref()),
365        err,
366    )]
367    async fn build(&self, id: &DocumentId) -> Result<Document> {
368        let schema = self
369            .spec
370            .schema(&id.index)
371            .ok_or_else(|| SourceError::Query(format!("unknown index `{}`", id.index)))?;
372
373        let pks = self.relation_pks(schema).await?;
374        let col_types = self.filter_column_types(schema).await?;
375        let (sql, params) = query::document_query(schema, &id.key.0, &pks, &col_types)?;
376
377        let mut statement = sqlx::query(sql);
378        for param in &params {
379            statement = query::bind_param(statement, param)?;
380        }
381        let row = statement
382            .fetch_optional(&self.pool)
383            .await
384            .map_err(query_err)?;
385
386        // No row means the root is absent or soft-deleted (both folded into the
387        // query's WHERE) → the document should not exist.
388        match row {
389            None => Ok(Document::Delete { id: id.clone() }),
390            Some(row) => {
391                let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
392                Ok(Document::Upsert {
393                    id: id.clone(),
394                    body: value::json_to_generic(document),
395                })
396            }
397        }
398    }
399
400    #[tracing::instrument(name = "pg.build_many", level = "debug", skip_all, fields(ids = ids.len()))]
401    async fn build_many(&self, ids: &[DocumentId]) -> Result<Vec<Document>> {
402        let mut by_index: HashMap<&IndexName, Vec<&DocumentId>> = HashMap::new();
403        for id in ids {
404            by_index.entry(&id.index).or_default().push(id);
405        }
406
407        let mut out = Vec::with_capacity(ids.len());
408        for (index_name, group) in by_index {
409            let schema = self
410                .spec
411                .schema(index_name)
412                .ok_or_else(|| SourceError::Query(format!("unknown index `{index_name}`")))?;
413
414            // The batched query keys the root with `IN (…)` on a single column,
415            // so it needs both a declared single-column primary key and ids that
416            // carry exactly that one key column. Pair each id with its lone key
417            // value; if any id is composite (or the index has no `primary_key`),
418            // fall back to per-document assembly for this group — correct, just
419            // not batched.
420            let keyed: Option<Vec<(&schema_core::GenericValue, &DocumentId)>> = group
421                .iter()
422                .map(|id| match id.key.0.as_slice() {
423                    [(_, value)] => Some((value, *id)),
424                    _ => None,
425                })
426                .collect();
427            let (Some(pk_column), Some(keyed)) = (schema.primary_key.clone(), keyed) else {
428                for id in group {
429                    out.push(self.build(id).await?);
430                }
431                continue;
432            };
433
434            let pks = self.relation_pks(schema).await?;
435            let col_types = self.filter_column_types(schema).await?;
436
437            for chunk in keyed.chunks(BUILD_CHUNK) {
438                let keys: Vec<schema_core::GenericValue> =
439                    chunk.iter().map(|(value, _)| (*value).clone()).collect();
440                let (sql, params) =
441                    query::documents_query(schema, &pk_column, &keys, &pks, &col_types)?;
442
443                let mut statement = sqlx::query(sql);
444                for param in &params {
445                    statement = query::bind_param(statement, param)?;
446                }
447                let rows = statement.fetch_all(&self.pool).await.map_err(query_err)?;
448
449                // Map each returned root key to its assembled body. `doc_key` is
450                // the first column, decoded through the same path live-change
451                // keys take, so it matches the ids' key values exactly.
452                let mut bodies: HashMap<schema_core::GenericValue, schema_core::GenericValue> =
453                    HashMap::with_capacity(rows.len());
454                for row in &rows {
455                    let key = value::first_column_to_generic(row);
456                    let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
457                    bodies.insert(key, value::json_to_generic(document));
458                }
459
460                // Every requested id yields an outcome: a body present in the
461                // result is an upsert; an absent key means the root is gone or
462                // soft-deleted (both fold into the query's WHERE) → a tombstone.
463                for (value, id) in chunk {
464                    let document = match bodies.remove(*value) {
465                        Some(body) => Document::Upsert {
466                            id: (*id).clone(),
467                            body,
468                        },
469                        None => Document::Delete { id: (*id).clone() },
470                    };
471                    out.push(document);
472                }
473            }
474        }
475        Ok(out)
476    }
477
478    fn backfill_scopes(&self) -> Vec<IndexScope> {
479        // A document is keyed by its root row, so the root table alone seeds the
480        // whole index — `build` assembles the joins and aggregates per root row.
481        self.spec
482            .indexes()
483            .map(|(name, schema)| IndexScope {
484                index: name.clone(),
485                root: SnapshotTable {
486                    db_schema: schema.db_schema.clone(),
487                    table: schema.table.clone(),
488                },
489            })
490            .collect()
491    }
492
493    async fn index_mappings(&self) -> Result<Vec<IndexMapping>> {
494        // The schema is self-describing, so the mapping is projected from it
495        // without touching the database.
496        Ok(self.spec.index_mappings())
497    }
498}
499
500pub(super) fn query_err(error: sqlx::Error) -> SourceError {
501    SourceError::Query(error.to_string())
502}
503
/// Primary-key column names of a table, in key (index-column) order. `$1`
/// binds the qualified `schema.table` (cast to `regclass`).
///
/// Matching with `a.attnum = ANY(i.indkey)` alone leaves the row order
/// unspecified, so the rows are explicitly ordered by each column's position
/// within `indkey`. That order matters for composite keys.
pub(crate) const PRIMARY_KEY_SQL: &str = "SELECT a.attname AS name \
     FROM pg_index i \
     JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) \
     WHERE i.indrelid = $1::regclass AND i.indisprimary \
     ORDER BY array_position(i.indkey, a.attnum)";
510
511/// Fetch the raw primary-key column-name strings for the table `qualified`
512/// names (e.g. `"public"."users"` or `public.users`). Callers apply their own
513/// policy for an invalid name, a missing key, or a composite key.
514pub(crate) async fn primary_key_column_names(
515    pool: &PgPool,
516    qualified: String,
517) -> Result<Vec<String>> {
518    let rows = sqlx::query(PRIMARY_KEY_SQL)
519        .bind(qualified)
520        .fetch_all(pool)
521        .await
522        .map_err(query_err)?;
523    let mut names = Vec::with_capacity(rows.len());
524    for row in &rows {
525        names.push(row.try_get::<String, _>("name").map_err(query_err)?);
526    }
527    Ok(names)
528}