1mod fields;
45mod query;
46mod resolve;
47pub(crate) mod value;
48
49use std::collections::{HashMap, HashSet};
50use std::sync::{Arc, Mutex, PoisonError};
51
52use async_trait::async_trait;
53use schema_core::{
54 ColumnName, DatabaseSchema, Filter, IndexMapping, IndexName, IndexSchema, SoftDelete, TableName,
55};
56use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
57use sources_core::{Catalog, ColumnInfo, Result, RowKey, SnapshotTable, SourceError, SourceSpec};
58use sqlx::{PgPool, Row};
59
60use fields::find_paths;
61
62type ColTypeCache = HashMap<(String, String, String), ColumnMeta>;
64
65const BUILD_CHUNK: usize = 512;
69
/// Catalog facts about a single table column, as read from `pg_attribute`.
#[derive(Clone, Debug)]
struct ColumnMeta {
    /// Type name as rendered by Postgres' `format_type(atttypid, atttypmod)`.
    sql_type: String,
    /// `true` when the column is NOT declared `NOT NULL` (inverse of `attnotnull`).
    nullable: bool,
}
79
80#[derive(Debug, Clone)]
85pub struct PgDocumentBuilder {
86 pool: PgPool,
87 spec: Arc<SourceSpec>,
88 pk_cache: Arc<Mutex<HashMap<(String, String), ColumnName>>>,
90 col_type_cache: Arc<Mutex<ColTypeCache>>,
93}
94
95impl PgDocumentBuilder {
96 pub fn new(pool: PgPool, spec: Arc<SourceSpec>) -> Self {
97 Self {
98 pool,
99 spec,
100 pk_cache: Arc::new(Mutex::new(HashMap::new())),
101 col_type_cache: Arc::new(Mutex::new(HashMap::new())),
102 }
103 }
104
105 #[tracing::instrument(name = "pg.connect", skip_all, err)]
106 pub async fn connect(connection_url: &str, spec: Arc<SourceSpec>) -> Result<Self> {
107 let pool = sqlx::postgres::PgPoolOptions::new()
108 .connect(connection_url)
109 .await
110 .map_err(|e| SourceError::Connection(e.to_string()))?;
111 tracing::info!(indexes = spec.indexes().count(), "connected to Postgres");
112 Ok(Self::new(pool, spec))
113 }
114
115 pub(super) async fn table_primary_key(
119 &self,
120 schema: &DatabaseSchema,
121 table: &TableName,
122 ) -> Result<ColumnName> {
123 let cache_key = (schema.to_string(), table.to_string());
124 {
125 let cache = self.pk_cache.lock().unwrap_or_else(PoisonError::into_inner);
126 if let Some(column) = cache.get(&cache_key) {
127 return Ok(column.clone());
128 }
129 }
130 let column = match self.fetch_primary_key(schema, table).await?.as_slice() {
131 [single] => single.clone(),
132 [] => {
133 return Err(SourceError::Query(format!(
134 "table `{schema}.{table}` has no primary key"
135 )));
136 }
137 _ => {
138 return Err(SourceError::Unsupported(format!(
139 "table `{schema}.{table}` has a composite primary key; relations require a single-column key"
140 )));
141 }
142 };
143 self.pk_cache
144 .lock()
145 .unwrap_or_else(PoisonError::into_inner)
146 .insert(cache_key, column.clone());
147 Ok(column)
148 }
149
150 async fn fetch_primary_key(
151 &self,
152 schema: &DatabaseSchema,
153 table: &TableName,
154 ) -> Result<Vec<ColumnName>> {
155 let names = primary_key_column_names(&self.pool, format!("{schema}.{table}")).await?;
156 names
157 .into_iter()
158 .map(|name| {
159 ColumnName::try_new(name)
160 .map_err(|e| SourceError::Query(format!("invalid primary key column: {e}")))
161 })
162 .collect()
163 }
164
165 async fn relation_pks(
168 &self,
169 schema: &schema_core::IndexSchema,
170 ) -> Result<HashMap<String, ColumnName>> {
171 let mut tables = Vec::new();
172 fields::collect_relation_tables(&schema.fields, &mut tables);
173 let unique: HashSet<&TableName> = tables.iter().collect();
174 let mut pks = HashMap::new();
175 for table in unique {
176 pks.insert(
177 table.to_string(),
178 self.table_primary_key(&schema.db_schema, table).await?,
179 );
180 }
181 Ok(pks)
182 }
183
184 pub(super) async fn column_type(
189 &self,
190 schema: &DatabaseSchema,
191 table: &TableName,
192 column: &ColumnName,
193 ) -> Result<String> {
194 Ok(self.column_meta(schema, table, column).await?.sql_type)
195 }
196
197 async fn column_meta(
201 &self,
202 schema: &DatabaseSchema,
203 table: &TableName,
204 column: &ColumnName,
205 ) -> Result<ColumnMeta> {
206 let cache_key = (schema.to_string(), table.to_string(), column.to_string());
207 {
208 let cache = self
209 .col_type_cache
210 .lock()
211 .unwrap_or_else(PoisonError::into_inner);
212 if let Some(meta) = cache.get(&cache_key) {
213 return Ok(meta.clone());
214 }
215 }
216 let sql = "SELECT format_type(a.atttypid, a.atttypmod) AS sql_type, a.attnotnull AS not_null \
221 FROM pg_attribute a \
222 WHERE a.attrelid = $1::regclass AND a.attname = $2 \
223 AND a.attnum > 0 AND NOT a.attisdropped";
224 let row = sqlx::query(sql)
225 .bind(format!("{schema}.{table}"))
226 .bind(column.as_ref().to_owned())
227 .fetch_optional(&self.pool)
228 .await
229 .map_err(query_err)?;
230 let meta = match row {
231 Some(row) => {
232 let sql_type: String = row.try_get("sql_type").map_err(query_err)?;
233 let not_null: bool = row.try_get("not_null").map_err(query_err)?;
234 ColumnMeta {
235 sql_type,
236 nullable: !not_null,
237 }
238 }
239 None => {
240 return Err(SourceError::Query(format!(
241 "references unknown column `{schema}.{table}.{column}`"
242 )));
243 }
244 };
245 self.col_type_cache
246 .lock()
247 .unwrap_or_else(PoisonError::into_inner)
248 .insert(cache_key, meta.clone());
249 Ok(meta)
250 }
251
252 async fn filter_column_types(
257 &self,
258 schema: &IndexSchema,
259 ) -> Result<HashMap<(String, String), String>> {
260 let mut columns = Vec::new();
261 fields::collect_filter_columns(&schema.fields, &mut columns);
262
263 let when = match &schema.soft_delete {
265 Some(SoftDelete::Column(c)) => c.when.as_deref(),
266 Some(SoftDelete::Field(f)) => f.when.as_deref(),
267 None => None,
268 };
269 let root_filters = schema.filters.as_deref().unwrap_or_default();
270 for filter in when.unwrap_or_default().iter().chain(root_filters) {
271 if let Filter::ValueOp(value_op) = filter {
272 columns.push((&schema.table, &value_op.column));
273 }
274 }
275
276 let mut types = HashMap::new();
277 for (table, column) in columns {
278 let key = (table.to_string(), column.to_string());
279 if types.contains_key(&key) {
280 continue;
281 }
282 let sql_type = self.column_type(&schema.db_schema, table, column).await?;
283 types.insert(key, sql_type);
284 }
285 Ok(types)
286 }
287}
288
289#[async_trait]
295impl Catalog for PgDocumentBuilder {
296 async fn column(
297 &self,
298 schema: &DatabaseSchema,
299 table: &TableName,
300 column: &ColumnName,
301 ) -> Result<ColumnInfo> {
302 let meta = self.column_meta(schema, table, column).await?;
303 Ok(ColumnInfo {
304 sql_type: meta.sql_type,
305 nullable: meta.nullable,
306 })
307 }
308}
309
310#[async_trait]
311impl DocumentBuilder for PgDocumentBuilder {
312 #[tracing::instrument(
313 name = "pg.resolve",
314 level = "debug",
315 skip_all,
316 fields(table = table.as_ref()),
317 err,
318 )]
319 async fn resolve(&self, table: &TableName, key: &RowKey) -> Result<Vec<DocumentId>> {
320 let mut ids = Vec::new();
321 for (name, schema) in self.spec.indexes() {
322 if schema.table == *table {
323 ids.push(DocumentId {
324 index: name.clone(),
325 key: key.clone(),
326 });
327 continue;
328 }
329
330 let mut paths = Vec::new();
331 let mut prefix = Vec::new();
332 find_paths(&schema.fields, table, &mut prefix, &mut paths);
333 if paths.is_empty() {
334 continue;
335 }
336 let Some(pk_column) = schema.primary_key.clone() else {
337 tracing::warn!(
338 index = %name, table = %table,
339 "cannot reverse-resolve: index has no primary_key",
340 );
341 continue;
342 };
343
344 let mut seen = HashSet::new();
345 for path in &paths {
346 for root in self.resolve_path(schema, table, key, path).await? {
347 if seen.insert(root.clone()) {
348 ids.push(DocumentId {
349 index: name.clone(),
350 key: RowKey(vec![(pk_column.clone(), root)]),
351 });
352 }
353 }
354 }
355 }
356 tracing::trace!(documents = ids.len(), "resolved affected documents");
357 Ok(ids)
358 }
359
360 #[tracing::instrument(
361 name = "pg.build",
362 level = "debug",
363 skip_all,
364 fields(index = id.index.as_ref()),
365 err,
366 )]
367 async fn build(&self, id: &DocumentId) -> Result<Document> {
368 let schema = self
369 .spec
370 .schema(&id.index)
371 .ok_or_else(|| SourceError::Query(format!("unknown index `{}`", id.index)))?;
372
373 let pks = self.relation_pks(schema).await?;
374 let col_types = self.filter_column_types(schema).await?;
375 let (sql, params) = query::document_query(schema, &id.key.0, &pks, &col_types)?;
376
377 let mut statement = sqlx::query(sql);
378 for param in ¶ms {
379 statement = query::bind_param(statement, param)?;
380 }
381 let row = statement
382 .fetch_optional(&self.pool)
383 .await
384 .map_err(query_err)?;
385
386 match row {
389 None => Ok(Document::Delete { id: id.clone() }),
390 Some(row) => {
391 let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
392 Ok(Document::Upsert {
393 id: id.clone(),
394 body: value::json_to_generic(document),
395 })
396 }
397 }
398 }
399
400 #[tracing::instrument(name = "pg.build_many", level = "debug", skip_all, fields(ids = ids.len()))]
401 async fn build_many(&self, ids: &[DocumentId]) -> Result<Vec<Document>> {
402 let mut by_index: HashMap<&IndexName, Vec<&DocumentId>> = HashMap::new();
403 for id in ids {
404 by_index.entry(&id.index).or_default().push(id);
405 }
406
407 let mut out = Vec::with_capacity(ids.len());
408 for (index_name, group) in by_index {
409 let schema = self
410 .spec
411 .schema(index_name)
412 .ok_or_else(|| SourceError::Query(format!("unknown index `{index_name}`")))?;
413
414 let keyed: Option<Vec<(&schema_core::GenericValue, &DocumentId)>> = group
421 .iter()
422 .map(|id| match id.key.0.as_slice() {
423 [(_, value)] => Some((value, *id)),
424 _ => None,
425 })
426 .collect();
427 let (Some(pk_column), Some(keyed)) = (schema.primary_key.clone(), keyed) else {
428 for id in group {
429 out.push(self.build(id).await?);
430 }
431 continue;
432 };
433
434 let pks = self.relation_pks(schema).await?;
435 let col_types = self.filter_column_types(schema).await?;
436
437 for chunk in keyed.chunks(BUILD_CHUNK) {
438 let keys: Vec<schema_core::GenericValue> =
439 chunk.iter().map(|(value, _)| (*value).clone()).collect();
440 let (sql, params) =
441 query::documents_query(schema, &pk_column, &keys, &pks, &col_types)?;
442
443 let mut statement = sqlx::query(sql);
444 for param in ¶ms {
445 statement = query::bind_param(statement, param)?;
446 }
447 let rows = statement.fetch_all(&self.pool).await.map_err(query_err)?;
448
449 let mut bodies: HashMap<schema_core::GenericValue, schema_core::GenericValue> =
453 HashMap::with_capacity(rows.len());
454 for row in &rows {
455 let key = value::first_column_to_generic(row);
456 let document: serde_json::Value = row.try_get("document").map_err(query_err)?;
457 bodies.insert(key, value::json_to_generic(document));
458 }
459
460 for (value, id) in chunk {
464 let document = match bodies.remove(*value) {
465 Some(body) => Document::Upsert {
466 id: (*id).clone(),
467 body,
468 },
469 None => Document::Delete { id: (*id).clone() },
470 };
471 out.push(document);
472 }
473 }
474 }
475 Ok(out)
476 }
477
478 fn backfill_scopes(&self) -> Vec<IndexScope> {
479 self.spec
482 .indexes()
483 .map(|(name, schema)| IndexScope {
484 index: name.clone(),
485 root: SnapshotTable {
486 db_schema: schema.db_schema.clone(),
487 table: schema.table.clone(),
488 },
489 })
490 .collect()
491 }
492
493 async fn index_mappings(&self) -> Result<Vec<IndexMapping>> {
494 Ok(self.spec.index_mappings())
497 }
498}
499
500pub(super) fn query_err(error: sqlx::Error) -> SourceError {
501 SourceError::Query(error.to_string())
502}
503
/// Catalog query listing the column names of a table's primary-key index.
///
/// `$1` is the table name, cast to `regclass`. The result has a single `name`
/// column with one row per key column; the query has no `ORDER BY`, so row
/// order is whatever Postgres produces.
pub(crate) const PRIMARY_KEY_SQL: &str = concat!(
    "SELECT a.attname AS name ",
    "FROM pg_index i ",
    "JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) ",
    "WHERE i.indrelid = $1::regclass AND i.indisprimary",
);
510
511pub(crate) async fn primary_key_column_names(
515 pool: &PgPool,
516 qualified: String,
517) -> Result<Vec<String>> {
518 let rows = sqlx::query(PRIMARY_KEY_SQL)
519 .bind(qualified)
520 .fetch_all(pool)
521 .await
522 .map_err(query_err)?;
523 let mut names = Vec::with_capacity(rows.len());
524 for row in &rows {
525 names.push(row.try_get::<String, _>("name").map_err(query_err)?);
526 }
527 Ok(names)
528}