faucet_sink_postgres/sink.rs
1//! PostgreSQL sink implementation.
2
3use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{PgPool, Row};
10
11/// Render a JSON value as the text to bind for a PostgreSQL column whose
12/// underlying type is `udt` (`information_schema.columns.udt_name`), or `None`
13/// for SQL `NULL`.
14///
15/// The accompanying placeholder is emitted as `$N::<udt>`, so PostgreSQL runs
16/// the destination column type's input function over this text. That makes
17/// `string → timestamptz/uuid/date`, `number → int4/numeric/float8`,
18/// `bool → bool`, and `json → jsonb` all work — instead of binding every value
19/// as `serde_json::Value` (which sqlx encodes as `jsonb`, so an insert into any
20/// non-`jsonb` column fails at runtime with *"column is of type … but
21/// expression is of type jsonb"*; this was the C1 bug in audit #146).
22///
23/// For `json`/`jsonb` columns the value is bound as its JSON text (so a string
24/// keeps its quotes and objects/arrays round-trip); the `::jsonb` cast then
25/// parses it. For every other type the scalar's plain text form is bound and
26/// the column's input function parses it via the cast.
27fn pg_bind_text(value: Option<&Value>, udt: &str) -> Option<String> {
28 match value {
29 None | Some(Value::Null) => None,
30 Some(v) => {
31 if udt.eq_ignore_ascii_case("json") || udt.eq_ignore_ascii_case("jsonb") {
32 Some(v.to_string())
33 } else {
34 match v {
35 Value::Bool(b) => Some(b.to_string()),
36 Value::Number(n) => Some(n.to_string()),
37 Value::String(s) => Some(s.clone()),
38 // Arrays/objects have no scalar text form for a non-JSON
39 // column; bind their JSON text so the `::<type>` cast fails
40 // loudly rather than silently coercing.
41 other => Some(other.to_string()),
42 }
43 }
44 }
45 }
46}
47
48/// Build the SQL relation reference for the configured table, optionally
49/// schema-qualified.
50///
51/// Both the AutoMap column-discovery probe and the `INSERT` statements use this
52/// single helper, so column discovery is always scoped to the *exact* relation
53/// the `INSERT` targets (#146 M13). With no schema the bare quoted table name
54/// resolves against the connection's `search_path`; with a schema it becomes
55/// `"schema"."table"`, pinning both discovery and insert to that namespace —
56/// otherwise a table of the same name in another schema pollutes the
57/// AutoMap column set (duplicate / wrong columns).
58fn qualified_table_ref(schema: Option<&str>, table: &str) -> String {
59 match schema {
60 Some(s) => format!("{}.{}", quote_ident(s), quote_ident(table)),
61 None => quote_ident(table),
62 }
63}
64
/// A sink that writes JSON records to a PostgreSQL table.
///
/// Records are written either into a single JSONB column or mapped onto the
/// table's columns by top-level key, depending on `config.column_mapping`.
pub struct PostgresSink {
    /// Sink settings: connection URL, pool size, target schema/table, column
    /// mapping mode and batch size.
    config: PostgresSinkConfig,
    /// Connection pool opened in [`PostgresSink::new`]; every query issued by
    /// the sink runs against it.
    pool: PgPool,
}
70
71impl PostgresSink {
72 /// Create a new PostgreSQL sink. Establishes a connection pool.
73 pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
74 let pool = PgPoolOptions::new()
75 .max_connections(config.max_connections)
76 .connect(&config.connection_url)
77 .await
78 .map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
79
80 Ok(Self { config, pool })
81 }
82
83 /// Insert a batch of records using JSONB column mode.
84 async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
85 if records.is_empty() {
86 return Ok(0);
87 }
88
89 // Use a single INSERT with unnest for efficiency.
90 let json_values: Vec<serde_json::Value> = records.to_vec();
91 let query = format!(
92 "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
93 qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name),
94 quote_ident(column)
95 );
96
97 sqlx::query(&query)
98 .bind(json_values)
99 .execute(&self.pool)
100 .await
101 .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
102
103 Ok(records.len())
104 }
105
106 /// Insert a batch of records using auto-mapped columns.
107 ///
108 /// Discovers each column's name *and* underlying type (`udt_name`) from the
109 /// table schema and maps top-level JSON fields to columns. Each placeholder
110 /// is emitted as `$N::<udt>` and the value is bound as text (see
111 /// [`pg_bind_text`]), so the destination column's input function parses it —
112 /// numbers, booleans, timestamps, uuids, and JSON all land in their native
113 /// column types. (Previously every value was bound as `serde_json::Value`,
114 /// which sqlx encodes as `jsonb`, so an insert into any non-`jsonb` column
115 /// failed at runtime — audit #146 C1.) Uses a single multi-row INSERT
116 /// (sub-chunked at the 65535-parameter cap) for efficiency.
117 async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
118 if records.is_empty() {
119 return Ok(0);
120 }
121
122 // Get column names AND their underlying types for the *exact* relation
123 // the INSERT will target. Scoping by `to_regclass(<qualified ref>)`
124 // resolves the relation the same way the INSERT does — by the configured
125 // schema if set, otherwise by the connection's `search_path` — so a
126 // table of the same name in another schema can no longer pollute the
127 // column set with duplicate/wrong columns (#146 M13). The previous query
128 // filtered `information_schema.columns` by `table_name` alone (no schema
129 // predicate), merging every same-named table across all schemas.
130 //
131 // `pg_type.typname` is the concrete type (`int4`, `timestamptz`,
132 // `numeric`, `jsonb`, `uuid`, `text`, …) — identical to the old
133 // `information_schema.columns.udt_name` — used as the per-placeholder
134 // cast target below. `::text` casts the `name`-typed catalog columns so
135 // sqlx decodes them as `String`.
136 let table_ref = qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name);
137 let columns: Vec<(String, String)> = sqlx::query(
138 "SELECT a.attname::text AS column_name, t.typname::text AS udt_name \
139 FROM pg_catalog.pg_attribute a \
140 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
141 WHERE a.attrelid = to_regclass($1)::oid \
142 AND a.attnum > 0 AND NOT a.attisdropped \
143 ORDER BY a.attnum",
144 )
145 .bind(&table_ref)
146 .fetch_all(&self.pool)
147 .await
148 .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
149 .iter()
150 .map(|row| {
151 (
152 row.get::<String, _>("column_name"),
153 row.get::<String, _>("udt_name"),
154 )
155 })
156 .collect();
157
158 if columns.is_empty() {
159 return Err(FaucetError::Sink(format!(
160 "table {table_ref} has no columns or does not exist"
161 )));
162 }
163
164 // Pre-validate all records and collect matched (column, udt, value)
165 // triples per record. The INSERT column set is the UNION of table
166 // columns present in ANY record (in declared table order), not just the
167 // first record's keys — otherwise a field present only in a later record
168 // of the batch would be silently dropped (audit #146 H1). A row missing
169 // a unioned column binds SQL NULL.
170 let mut matched_rows: Vec<Vec<(&String, &String, &Value)>> =
171 Vec::with_capacity(records.len());
172 let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
173
174 for record in records {
175 let obj = record
176 .as_object()
177 .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
178
179 let matching: Vec<(&String, &String, &Value)> = columns
180 .iter()
181 .filter_map(|(col, udt)| obj.get(col).map(|v| (col, udt, v)))
182 .collect();
183
184 if matching.is_empty() {
185 tracing::warn!(
186 record_keys = ?obj.keys().collect::<Vec<_>>(),
187 table_columns = ?columns,
188 "record has no keys matching table columns, skipping"
189 );
190 continue;
191 }
192
193 for (c, _, _) in &matching {
194 used.insert(c.as_str());
195 }
196 matched_rows.push(matching);
197 }
198
199 if matched_rows.is_empty() {
200 return Ok(0);
201 }
202
203 // Table columns (in declared order, with their udt) present in at least
204 // one record.
205 let insert_columns: Vec<(String, String)> = columns
206 .iter()
207 .filter(|(c, _)| used.contains(c.as_str()))
208 .cloned()
209 .collect();
210
211 let num_cols = insert_columns.len();
212 let num_rows = matched_rows.len();
213 let col_names: Vec<String> = insert_columns.iter().map(|(c, _)| quote_ident(c)).collect();
214
215 // PostgreSQL caps bind parameters per statement at 65535. A multi-row
216 // INSERT binds `rows × num_cols` parameters, so a wide table at a large
217 // batch_size can exceed it and fail at runtime (#78/#21). Split into
218 // sub-INSERTs of at most floor(MAX_PARAMS / num_cols) rows.
219 const MAX_PG_PARAMS: usize = 65535;
220 let max_rows_per_insert = (MAX_PG_PARAMS / num_cols).max(1);
221
222 for sub in matched_rows.chunks(max_rows_per_insert) {
223 // Build multi-row VALUES clause with per-column casts so the column
224 // type's input function parses the bound text:
225 // ($1::int4, $2::timestamptz), ($3::int4, $4::timestamptz), ...
226 let mut value_tuples: Vec<String> = Vec::with_capacity(sub.len());
227 for row_idx in 0..sub.len() {
228 let start = row_idx * num_cols + 1;
229 let placeholders: Vec<String> = (0..num_cols)
230 .map(|c| format!("${}::{}", start + c, insert_columns[c].1))
231 .collect();
232 value_tuples.push(format!("({})", placeholders.join(", ")));
233 }
234
235 let query = format!(
236 "INSERT INTO {} ({}) VALUES {}",
237 table_ref,
238 col_names.join(", "),
239 value_tuples.join(", ")
240 );
241
242 let mut q = sqlx::query(&query);
243 for matched in sub {
244 // Bind values in the fixed column order, as text matching each
245 // column's type. A record missing a column that appeared in the
246 // first record binds SQL NULL.
247 for (col, udt) in &insert_columns {
248 let val = matched
249 .iter()
250 .find(|(c, _, _)| *c == col)
251 .map(|(_, _, v)| *v);
252 q = q.bind(pg_bind_text(val, udt));
253 }
254 }
255
256 q.execute(&self.pool)
257 .await
258 .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
259 }
260
261 Ok(num_rows)
262 }
263}
264
265#[async_trait]
266impl faucet_core::Sink for PostgresSink {
267 fn config_schema(&self) -> serde_json::Value {
268 serde_json::to_value(faucet_core::schema_for!(PostgresSinkConfig))
269 .expect("schema serialization")
270 }
271
272 /// Preflight connectivity probe (`faucet doctor`).
273 ///
274 /// Acquires a connection from the existing pool and runs `SELECT 1`. This
275 /// is non-mutating and idempotent — it validates that the database is
276 /// reachable and the credentials are accepted without writing anything.
277 async fn check(
278 &self,
279 ctx: &faucet_core::check::CheckContext,
280 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
281 use faucet_core::check::{CheckReport, Probe};
282
283 let started = std::time::Instant::now();
284 let probe =
285 match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
286 .await
287 {
288 Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
289 Ok(Err(e)) => Probe::fail_hint(
290 "auth",
291 started.elapsed(),
292 e.to_string(),
293 "check connection_url / credentials / that the database is reachable",
294 ),
295 Err(_) => Probe::fail_hint(
296 "auth",
297 started.elapsed(),
298 "timed out",
299 "check connection_url / credentials / that the database is reachable",
300 ),
301 };
302 Ok(CheckReport::single(probe))
303 }
304
305 /// Write records to PostgreSQL.
306 ///
307 /// When `config.batch_size > 0` and the input slice is larger than
308 /// `batch_size`, the slice is split into chunks of `batch_size` rows and
309 /// each chunk is sent as a separate multi-row `INSERT`. When
310 /// `config.batch_size == 0`, the entire slice is sent in a single
311 /// `INSERT` — useful when upstream `StreamPage`s are already sized for
312 /// Postgres' per-statement bind-parameter limit (~65 535 / num_columns
313 /// in AutoMap mode).
314 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
315 if records.is_empty() {
316 return Ok(0);
317 }
318
319 let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
320 // Sentinel: pass the entire upstream page through in a single
321 // INSERT statement. Subject to Postgres' 65 535 bind-parameter
322 // limit in AutoMap mode; JSONB mode binds a single array.
323 vec![records]
324 } else {
325 records.chunks(self.config.batch_size).collect()
326 };
327
328 let mut total = 0;
329 for chunk in chunks {
330 total += match &self.config.column_mapping {
331 PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
332 PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
333 };
334 }
335
336 tracing::info!(
337 table = %self.config.table_name,
338 rows = total,
339 "PostgreSQL write complete"
340 );
341 Ok(total)
342 }
343}
344
#[cfg(test)]
mod tests {
    use super::{pg_bind_text, qualified_table_ref};
    use serde_json::{json, Value};

    /// Bind a present JSON value for a column of type `udt`.
    fn bind(value: Value, udt: &str) -> Option<String> {
        pg_bind_text(Some(&value), udt)
    }

    /// Expected bound text, in the same shape `pg_bind_text` returns.
    fn text(expected: &str) -> Option<String> {
        Some(expected.to_owned())
    }

    #[test]
    fn qualified_table_ref_unqualified_is_bare_quoted_table() {
        // Without a schema only the quoted table name is produced; the
        // search_path decides where it resolves.
        let rendered = qualified_table_ref(None, "events");
        assert_eq!(rendered, "\"events\"");
    }

    #[test]
    fn qualified_table_ref_with_schema_is_schema_dot_table() {
        // A schema pins both discovery and INSERT to "schema"."table".
        let rendered = qualified_table_ref(Some("analytics"), "events");
        assert_eq!(rendered, "\"analytics\".\"events\"");
    }

    #[test]
    fn qualified_table_ref_escapes_embedded_quotes() {
        // SQL-injection safety: an embedded double-quote is doubled.
        let rendered = qualified_table_ref(Some("we\"ird"), "ta\"ble");
        assert_eq!(rendered, "\"we\"\"ird\".\"ta\"\"ble\"");
    }

    #[test]
    fn null_and_absent_bind_sql_null() {
        assert_eq!(pg_bind_text(None, "text"), None);
        assert_eq!(bind(json!(null), "int4"), None);
        assert_eq!(bind(json!(null), "jsonb"), None);
    }

    #[test]
    fn scalars_bind_plain_text_for_typed_columns() {
        // The `$N::<udt>` cast hands these to the column type's input function.
        assert_eq!(bind(json!(42), "int4"), text("42"));
        assert_eq!(bind(json!(1.5), "numeric"), text("1.5"));
        assert_eq!(bind(json!(true), "bool"), text("true"));
        assert_eq!(
            bind(json!("2025-01-01T00:00:00Z"), "timestamptz"),
            text("2025-01-01T00:00:00Z")
        );
        // A plain string headed for TEXT carries no JSON quotes.
        assert_eq!(bind(json!("Bob"), "text"), text("Bob"));
        // A u64 above i64::MAX keeps its exact digits (no f64 rounding).
        assert_eq!(
            bind(json!(18446744073709551615u64), "numeric"),
            text("18446744073709551615")
        );
    }

    #[test]
    fn json_columns_get_json_text_with_quotes_preserved() {
        // json/jsonb columns receive JSON text for the cast to parse: strings
        // stay quoted, objects and arrays round-trip.
        assert_eq!(bind(json!("Bob"), "jsonb"), text("\"Bob\""));
        assert_eq!(bind(json!({"a": 1}), "jsonb"), text("{\"a\":1}"));
        assert_eq!(bind(json!([1, 2]), "json"), text("[1,2]"));
        assert_eq!(bind(json!(5), "jsonb"), text("5"));
        // The type-name match ignores ASCII case.
        assert_eq!(bind(json!("x"), "JSONB"), text("\"x\""));
    }

    #[test]
    fn objects_into_non_json_columns_emit_json_text_so_the_cast_fails_loudly() {
        // An object has no scalar text form for e.g. an int column; binding its
        // JSON text makes the `::int4` cast reject it instead of coercing.
        assert_eq!(bind(json!({"a": 1}), "int4"), text("{\"a\":1}"));
    }
}