1mod fields;
45mod query;
46mod resolve;
47pub(crate) mod value;
48
49use std::collections::{HashMap, HashSet};
50use std::sync::{Arc, Mutex, PoisonError};
51
52use async_trait::async_trait;
53use schema_core::{
54 ColumnName, DatabaseSchema, Filter, IndexMapping, IndexName, IndexSchema, SoftDelete, TableName,
55};
56use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
57use sources_core::{Catalog, ColumnInfo, Result, RowKey, SnapshotTable, SourceError, SourceSpec};
58use sqlx::{PgPool, Row};
59
60use fields::find_paths;
61
/// Column metadata cache keyed by `(schema, table, column)` as rendered strings.
type ColTypeCache = HashMap<(String, String, String), ColumnMeta>;

/// Maximum number of root keys sent in one batched query by `build_many`.
const BUILD_CHUNK: usize = 512;
69
/// Catalog facts about one column, read from `pg_attribute`.
#[derive(Debug, Clone)]
struct ColumnMeta {
    /// Type text produced by `format_type(atttypid, atttypmod)`.
    sql_type: String,
    /// `true` unless `attnotnull` is set for the column.
    nullable: bool,
}
79
/// Postgres-backed document builder: resolves changed rows to affected index
/// documents and assembles document bodies with SQL queries.
///
/// The lookup caches sit behind `Arc<Mutex<_>>`, so clones share them.
#[derive(Debug, Clone)]
pub struct PgDocumentBuilder {
    pool: PgPool,
    spec: Arc<SourceSpec>,
    /// Single-column primary key per `(schema, table)`; only successful lookups are stored.
    pk_cache: Arc<Mutex<HashMap<(String, String), ColumnName>>>,
    /// Column metadata per `(schema, table, column)`; only successful lookups are stored.
    col_type_cache: Arc<Mutex<ColTypeCache>>,
}
94
95impl PgDocumentBuilder {
96 pub fn new(pool: PgPool, spec: Arc<SourceSpec>) -> Self {
98 Self {
99 pool,
100 spec,
101 pk_cache: Arc::new(Mutex::new(HashMap::new())),
102 col_type_cache: Arc::new(Mutex::new(HashMap::new())),
103 }
104 }
105
106 #[tracing::instrument(name = "pg.connect", skip_all, err)]
108 pub async fn connect(connection_url: &str, spec: Arc<SourceSpec>) -> Result<Self> {
109 let pool = sqlx::postgres::PgPoolOptions::new()
110 .connect(connection_url)
111 .await
112 .map_err(|e| SourceError::Connection(e.to_string()))?;
113 tracing::info!(indexes = spec.indexes().count(), "connected to Postgres");
114 Ok(Self::new(pool, spec))
115 }
116
117 pub(super) async fn table_primary_key(
121 &self,
122 schema: &DatabaseSchema,
123 table: &TableName,
124 ) -> Result<ColumnName> {
125 let cache_key = (schema.to_string(), table.to_string());
126 {
127 let cache = self.pk_cache.lock().unwrap_or_else(PoisonError::into_inner);
128 if let Some(column) = cache.get(&cache_key) {
129 return Ok(column.clone());
130 }
131 }
132 let column = match self.fetch_primary_key(schema, table).await?.as_slice() {
133 [single] => single.clone(),
134 [] => {
135 return Err(SourceError::Query(format!(
136 "table `{schema}.{table}` has no primary key"
137 )));
138 }
139 _ => {
140 return Err(SourceError::Unsupported(format!(
141 "table `{schema}.{table}` has a composite primary key; relations require a single-column key"
142 )));
143 }
144 };
145 self.pk_cache
146 .lock()
147 .unwrap_or_else(PoisonError::into_inner)
148 .insert(cache_key, column.clone());
149 Ok(column)
150 }
151
152 async fn fetch_primary_key(
153 &self,
154 schema: &DatabaseSchema,
155 table: &TableName,
156 ) -> Result<Vec<ColumnName>> {
157 let names = primary_key_column_names(&self.pool, format!("{schema}.{table}")).await?;
158 names
159 .into_iter()
160 .map(|name| {
161 ColumnName::try_new(name)
162 .map_err(|e| SourceError::Query(format!("invalid primary key column: {e}")))
163 })
164 .collect()
165 }
166
167 async fn relation_pks(
170 &self,
171 schema: &schema_core::IndexSchema,
172 ) -> Result<HashMap<String, ColumnName>> {
173 let mut tables = Vec::new();
174 fields::collect_relation_tables(&schema.fields, &mut tables);
175 let unique: HashSet<&TableName> = tables.iter().collect();
176 let mut pks = HashMap::new();
177 for table in unique {
178 pks.insert(
179 table.to_string(),
180 self.table_primary_key(&schema.db_schema, table).await?,
181 );
182 }
183 Ok(pks)
184 }
185
186 pub(super) async fn column_type(
191 &self,
192 schema: &DatabaseSchema,
193 table: &TableName,
194 column: &ColumnName,
195 ) -> Result<String> {
196 Ok(self.column_meta(schema, table, column).await?.sql_type)
197 }
198
199 async fn column_meta(
203 &self,
204 schema: &DatabaseSchema,
205 table: &TableName,
206 column: &ColumnName,
207 ) -> Result<ColumnMeta> {
208 let cache_key = (schema.to_string(), table.to_string(), column.to_string());
209 {
210 let cache = self
211 .col_type_cache
212 .lock()
213 .unwrap_or_else(PoisonError::into_inner);
214 if let Some(meta) = cache.get(&cache_key) {
215 return Ok(meta.clone());
216 }
217 }
218 let sql = "SELECT format_type(a.atttypid, a.atttypmod) AS sql_type, a.attnotnull AS not_null \
223 FROM pg_attribute a \
224 WHERE a.attrelid = $1::regclass AND a.attname = $2 \
225 AND a.attnum > 0 AND NOT a.attisdropped";
226 let row = sqlx::query(sql)
227 .bind(format!("{schema}.{table}"))
228 .bind(column.as_ref().to_owned())
229 .fetch_optional(&self.pool)
230 .await
231 .map_err(query_err)?;
232 let meta = match row {
233 Some(row) => {
234 let sql_type: String = row.try_get("sql_type").map_err(query_err)?;
235 let not_null: bool = row.try_get("not_null").map_err(query_err)?;
236 ColumnMeta {
237 sql_type,
238 nullable: !not_null,
239 }
240 }
241 None => {
242 return Err(SourceError::Query(format!(
243 "references unknown column `{schema}.{table}.{column}`"
244 )));
245 }
246 };
247 self.col_type_cache
248 .lock()
249 .unwrap_or_else(PoisonError::into_inner)
250 .insert(cache_key, meta.clone());
251 Ok(meta)
252 }
253
254 async fn filter_column_types(
259 &self,
260 schema: &IndexSchema,
261 ) -> Result<HashMap<(String, String), String>> {
262 let mut columns = Vec::new();
263 fields::collect_filter_columns(&schema.fields, &mut columns);
264
265 let when = match &schema.soft_delete {
267 Some(SoftDelete::Column(c)) => c.when.as_deref(),
268 Some(SoftDelete::Field(f)) => f.when.as_deref(),
269 None => None,
270 };
271 let root_filters = schema.filters.as_deref().unwrap_or_default();
272 for filter in when.unwrap_or_default().iter().chain(root_filters) {
273 if let Filter::ValueOp(value_op) = filter {
274 columns.push((&schema.table, &value_op.column));
275 }
276 }
277
278 let mut types = HashMap::new();
279 for (table, column) in columns {
280 let key = (table.to_string(), column.to_string());
281 if types.contains_key(&key) {
282 continue;
283 }
284 let sql_type = self.column_type(&schema.db_schema, table, column).await?;
285 types.insert(key, sql_type);
286 }
287 Ok(types)
288 }
289}
290
291#[async_trait]
297impl Catalog for PgDocumentBuilder {
298 async fn column(
299 &self,
300 schema: &DatabaseSchema,
301 table: &TableName,
302 column: &ColumnName,
303 ) -> Result<ColumnInfo> {
304 let meta = self.column_meta(schema, table, column).await?;
305 Ok(ColumnInfo {
306 sql_type: meta.sql_type,
307 nullable: meta.nullable,
308 })
309 }
310}
311
312#[async_trait]
313impl DocumentBuilder for PgDocumentBuilder {
314 #[tracing::instrument(
315 name = "pg.resolve",
316 level = "debug",
317 skip_all,
318 fields(table = table.as_ref()),
319 err,
320 )]
321 async fn resolve(&self, table: &TableName, key: &RowKey) -> Result<Vec<DocumentId>> {
322 let mut ids = Vec::new();
323 for (name, schema) in self.spec.indexes() {
324 if schema.table == *table {
326 ids.push(DocumentId {
327 index: name.clone(),
328 key: key.clone(),
329 });
330 continue;
331 }
332
333 let mut paths = Vec::new();
335 let mut prefix = Vec::new();
336 find_paths(&schema.fields, table, &mut prefix, &mut paths);
337 if paths.is_empty() {
338 continue;
339 }
340 let Some(pk_column) = schema.primary_key.clone() else {
341 tracing::warn!(
342 index = %name, table = %table,
343 "cannot reverse-resolve: index has no primary_key",
344 );
345 continue;
346 };
347
348 let mut seen = HashSet::new();
349 for path in &paths {
350 for root in self.resolve_path(schema, table, key, path).await? {
351 if seen.insert(root.clone()) {
352 ids.push(DocumentId {
353 index: name.clone(),
354 key: RowKey(vec![(pk_column.clone(), root)]),
355 });
356 }
357 }
358 }
359 }
360 tracing::trace!(documents = ids.len(), "resolved affected documents");
361 Ok(ids)
362 }
363
364 #[tracing::instrument(
365 name = "pg.build",
366 level = "debug",
367 skip_all,
368 fields(index = id.index.as_ref()),
369 err,
370 )]
371 async fn build(&self, id: &DocumentId) -> Result<Document> {
372 let schema = self
373 .spec
374 .schema(&id.index)
375 .ok_or_else(|| SourceError::Query(format!("unknown index `{}`", id.index)))?;
376
377 let pks = self.relation_pks(schema).await?;
378 let col_types = self.filter_column_types(schema).await?;
379 let (sql, params) = query::document_query(schema, &id.key.0, &pks, &col_types)?;
380
381 let mut statement = sqlx::query(sql);
382 for param in ¶ms {
383 statement = query::bind_param(statement, param)?;
384 }
385 let row = statement
386 .fetch_optional(&self.pool)
387 .await
388 .map_err(query_err)?;
389
390 match row {
393 None => Ok(Document::Delete { id: id.clone() }),
394 Some(row) => {
395 let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
396 Ok(Document::Upsert {
397 id: id.clone(),
398 body: value::json_to_generic(document),
399 })
400 }
401 }
402 }
403
404 #[tracing::instrument(name = "pg.build_many", level = "debug", skip_all, fields(ids = ids.len()))]
405 async fn build_many(&self, ids: &[DocumentId]) -> Result<Vec<Document>> {
406 let mut by_index: HashMap<&IndexName, Vec<&DocumentId>> = HashMap::new();
408 for id in ids {
409 by_index.entry(&id.index).or_default().push(id);
410 }
411
412 let mut out = Vec::with_capacity(ids.len());
413 for (index_name, group) in by_index {
414 let schema = self
415 .spec
416 .schema(index_name)
417 .ok_or_else(|| SourceError::Query(format!("unknown index `{index_name}`")))?;
418
419 let keyed: Option<Vec<(&schema_core::GenericValue, &DocumentId)>> = group
426 .iter()
427 .map(|id| match id.key.0.as_slice() {
428 [(_, value)] => Some((value, *id)),
429 _ => None,
430 })
431 .collect();
432 let (Some(pk_column), Some(keyed)) = (schema.primary_key.clone(), keyed) else {
433 for id in group {
434 out.push(self.build(id).await?);
435 }
436 continue;
437 };
438
439 let pks = self.relation_pks(schema).await?;
440 let col_types = self.filter_column_types(schema).await?;
441
442 for chunk in keyed.chunks(BUILD_CHUNK) {
443 let keys: Vec<schema_core::GenericValue> =
444 chunk.iter().map(|(value, _)| (*value).clone()).collect();
445 let (sql, params) =
446 query::documents_query(schema, &pk_column, &keys, &pks, &col_types)?;
447
448 let mut statement = sqlx::query(sql);
449 for param in ¶ms {
450 statement = query::bind_param(statement, param)?;
451 }
452 let rows = statement.fetch_all(&self.pool).await.map_err(query_err)?;
453
454 let mut bodies: HashMap<schema_core::GenericValue, schema_core::GenericValue> =
458 HashMap::with_capacity(rows.len());
459 for row in &rows {
460 let key = value::first_column_to_generic(row);
461 let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
462 bodies.insert(key, value::json_to_generic(document));
463 }
464
465 for (value, id) in chunk {
469 let document = match bodies.remove(*value) {
470 Some(body) => Document::Upsert {
471 id: (*id).clone(),
472 body,
473 },
474 None => Document::Delete { id: (*id).clone() },
475 };
476 out.push(document);
477 }
478 }
479 }
480 Ok(out)
481 }
482
483 fn backfill_scopes(&self) -> Vec<IndexScope> {
484 self.spec
487 .indexes()
488 .map(|(name, schema)| IndexScope {
489 index: name.clone(),
490 root: SnapshotTable {
491 db_schema: schema.db_schema.clone(),
492 table: schema.table.clone(),
493 },
494 })
495 .collect()
496 }
497
498 async fn index_mappings(&self) -> Result<Vec<IndexMapping>> {
499 Ok(self.spec.index_mappings())
502 }
503}
504
505pub(super) fn query_err(error: sqlx::Error) -> SourceError {
506 SourceError::Query(error.to_string())
507}
508
/// Selects the names of the primary-key columns of the relation given as `$1`
/// (text resolved through a `::regclass` cast), one row per column in `name`.
///
/// NOTE(review): there is no `ORDER BY`, so for a composite key the row order
/// is not guaranteed to follow the key definition — confirm whether any
/// caller depends on column order.
pub(crate) const PRIMARY_KEY_SQL: &str = "SELECT a.attname AS name \
    FROM pg_index i \
    JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) \
    WHERE i.indrelid = $1::regclass AND i.indisprimary";
515
516pub(crate) async fn primary_key_column_names(
520 pool: &PgPool,
521 qualified: String,
522) -> Result<Vec<String>> {
523 let rows = sqlx::query(PRIMARY_KEY_SQL)
524 .bind(qualified)
525 .fetch_all(pool)
526 .await
527 .map_err(query_err)?;
528 let mut names = Vec::with_capacity(rows.len());
529 for row in &rows {
530 names.push(row.try_get::<String, _>("name").map_err(query_err)?);
531 }
532 Ok(names)
533}