Skip to main content

faucet_sink_postgres/
sink.rs

1//! PostgreSQL sink implementation.
2
3use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{PgPool, Row};
10
11/// Render a JSON value as the text to bind for a PostgreSQL column whose
12/// underlying type is `udt` (`information_schema.columns.udt_name`), or `None`
13/// for SQL `NULL`.
14///
15/// The accompanying placeholder is emitted as `$N::<udt>`, so PostgreSQL runs
16/// the destination column type's input function over this text. That makes
17/// `string → timestamptz/uuid/date`, `number → int4/numeric/float8`,
18/// `bool → bool`, and `json → jsonb` all work — instead of binding every value
19/// as `serde_json::Value` (which sqlx encodes as `jsonb`, so an insert into any
20/// non-`jsonb` column fails at runtime with *"column is of type … but
21/// expression is of type jsonb"*; this was the C1 bug in audit #146).
22///
23/// For `json`/`jsonb` columns the value is bound as its JSON text (so a string
24/// keeps its quotes and objects/arrays round-trip); the `::jsonb` cast then
25/// parses it. For every other type the scalar's plain text form is bound and
26/// the column's input function parses it via the cast.
27fn pg_bind_text(value: Option<&Value>, udt: &str) -> Option<String> {
28    match value {
29        None | Some(Value::Null) => None,
30        Some(v) => {
31            if udt.eq_ignore_ascii_case("json") || udt.eq_ignore_ascii_case("jsonb") {
32                Some(v.to_string())
33            } else {
34                match v {
35                    Value::Bool(b) => Some(b.to_string()),
36                    Value::Number(n) => Some(n.to_string()),
37                    Value::String(s) => Some(s.clone()),
38                    // Arrays/objects have no scalar text form for a non-JSON
39                    // column; bind their JSON text so the `::<type>` cast fails
40                    // loudly rather than silently coercing.
41                    other => Some(other.to_string()),
42                }
43            }
44        }
45    }
46}
47
48/// Build the SQL relation reference for the configured table, optionally
49/// schema-qualified.
50///
51/// Both the AutoMap column-discovery probe and the `INSERT` statements use this
52/// single helper, so column discovery is always scoped to the *exact* relation
53/// the `INSERT` targets (#146 M13). With no schema the bare quoted table name
54/// resolves against the connection's `search_path`; with a schema it becomes
55/// `"schema"."table"`, pinning both discovery and insert to that namespace —
56/// otherwise a table of the same name in another schema pollutes the
57/// AutoMap column set (duplicate / wrong columns).
58fn qualified_table_ref(schema: Option<&str>, table: &str) -> String {
59    match schema {
60        Some(s) => format!("{}.{}", quote_ident(s), quote_ident(table)),
61        None => quote_ident(table),
62    }
63}
64
/// A sink that writes JSON records to a PostgreSQL table.
///
/// Construct it with [`PostgresSink::new`], which opens the connection pool.
/// Records are written either into a single JSONB column or auto-mapped onto
/// the table's columns, depending on `config.column_mapping`.
pub struct PostgresSink {
    /// Sink settings; this file reads `connection_url`, `max_connections`,
    /// `schema`, `table_name`, `batch_size` and `column_mapping` from it.
    config: PostgresSinkConfig,
    /// Connection pool shared by every query this sink issues.
    pool: PgPool,
}
70
71impl PostgresSink {
72    /// Create a new PostgreSQL sink. Establishes a connection pool.
73    pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
74        let pool = PgPoolOptions::new()
75            .max_connections(config.max_connections)
76            .connect(&config.connection_url)
77            .await
78            .map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
79
80        Ok(Self { config, pool })
81    }
82
83    /// Insert a batch of records using JSONB column mode.
84    async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
85        if records.is_empty() {
86            return Ok(0);
87        }
88
89        // Use a single INSERT with unnest for efficiency.
90        let json_values: Vec<serde_json::Value> = records.to_vec();
91        let query = format!(
92            "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
93            qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name),
94            quote_ident(column)
95        );
96
97        sqlx::query(&query)
98            .bind(json_values)
99            .execute(&self.pool)
100            .await
101            .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
102
103        Ok(records.len())
104    }
105
106    /// Insert a batch of records using auto-mapped columns.
107    ///
108    /// Discovers each column's name *and* underlying type (`udt_name`) from the
109    /// table schema and maps top-level JSON fields to columns. Each placeholder
110    /// is emitted as `$N::<udt>` and the value is bound as text (see
111    /// [`pg_bind_text`]), so the destination column's input function parses it —
112    /// numbers, booleans, timestamps, uuids, and JSON all land in their native
113    /// column types. (Previously every value was bound as `serde_json::Value`,
114    /// which sqlx encodes as `jsonb`, so an insert into any non-`jsonb` column
115    /// failed at runtime — audit #146 C1.) Uses a single multi-row INSERT
116    /// (sub-chunked at the 65535-parameter cap) for efficiency.
117    async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
118        if records.is_empty() {
119            return Ok(0);
120        }
121
122        // Get column names AND their underlying types for the *exact* relation
123        // the INSERT will target. Scoping by `to_regclass(<qualified ref>)`
124        // resolves the relation the same way the INSERT does — by the configured
125        // schema if set, otherwise by the connection's `search_path` — so a
126        // table of the same name in another schema can no longer pollute the
127        // column set with duplicate/wrong columns (#146 M13). The previous query
128        // filtered `information_schema.columns` by `table_name` alone (no schema
129        // predicate), merging every same-named table across all schemas.
130        //
131        // `pg_type.typname` is the concrete type (`int4`, `timestamptz`,
132        // `numeric`, `jsonb`, `uuid`, `text`, …) — identical to the old
133        // `information_schema.columns.udt_name` — used as the per-placeholder
134        // cast target below. `::text` casts the `name`-typed catalog columns so
135        // sqlx decodes them as `String`.
136        let table_ref = qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name);
137        let columns: Vec<(String, String)> = sqlx::query(
138            "SELECT a.attname::text AS column_name, t.typname::text AS udt_name \
139             FROM pg_catalog.pg_attribute a \
140             JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
141             WHERE a.attrelid = to_regclass($1)::oid \
142               AND a.attnum > 0 AND NOT a.attisdropped \
143             ORDER BY a.attnum",
144        )
145        .bind(&table_ref)
146        .fetch_all(&self.pool)
147        .await
148        .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
149        .iter()
150        .map(|row| {
151            (
152                row.get::<String, _>("column_name"),
153                row.get::<String, _>("udt_name"),
154            )
155        })
156        .collect();
157
158        if columns.is_empty() {
159            return Err(FaucetError::Sink(format!(
160                "table {table_ref} has no columns or does not exist"
161            )));
162        }
163
164        // Pre-validate all records and collect matched (column, udt, value)
165        // triples per record. The INSERT column set is the UNION of table
166        // columns present in ANY record (in declared table order), not just the
167        // first record's keys — otherwise a field present only in a later record
168        // of the batch would be silently dropped (audit #146 H1). A row missing
169        // a unioned column binds SQL NULL.
170        let mut matched_rows: Vec<Vec<(&String, &String, &Value)>> =
171            Vec::with_capacity(records.len());
172        let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
173
174        for record in records {
175            let obj = record
176                .as_object()
177                .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
178
179            let matching: Vec<(&String, &String, &Value)> = columns
180                .iter()
181                .filter_map(|(col, udt)| obj.get(col).map(|v| (col, udt, v)))
182                .collect();
183
184            if matching.is_empty() {
185                tracing::warn!(
186                    record_keys = ?obj.keys().collect::<Vec<_>>(),
187                    table_columns = ?columns,
188                    "record has no keys matching table columns, skipping"
189                );
190                continue;
191            }
192
193            for (c, _, _) in &matching {
194                used.insert(c.as_str());
195            }
196            matched_rows.push(matching);
197        }
198
199        if matched_rows.is_empty() {
200            return Ok(0);
201        }
202
203        // Table columns (in declared order, with their udt) present in at least
204        // one record.
205        let insert_columns: Vec<(String, String)> = columns
206            .iter()
207            .filter(|(c, _)| used.contains(c.as_str()))
208            .cloned()
209            .collect();
210
211        let num_cols = insert_columns.len();
212        let num_rows = matched_rows.len();
213        let col_names: Vec<String> = insert_columns.iter().map(|(c, _)| quote_ident(c)).collect();
214
215        // PostgreSQL caps bind parameters per statement at 65535. A multi-row
216        // INSERT binds `rows × num_cols` parameters, so a wide table at a large
217        // batch_size can exceed it and fail at runtime (#78/#21). Split into
218        // sub-INSERTs of at most floor(MAX_PARAMS / num_cols) rows.
219        const MAX_PG_PARAMS: usize = 65535;
220        let max_rows_per_insert = (MAX_PG_PARAMS / num_cols).max(1);
221
222        for sub in matched_rows.chunks(max_rows_per_insert) {
223            // Build multi-row VALUES clause with per-column casts so the column
224            // type's input function parses the bound text:
225            //   ($1::int4, $2::timestamptz), ($3::int4, $4::timestamptz), ...
226            let mut value_tuples: Vec<String> = Vec::with_capacity(sub.len());
227            for row_idx in 0..sub.len() {
228                let start = row_idx * num_cols + 1;
229                let placeholders: Vec<String> = (0..num_cols)
230                    .map(|c| format!("${}::{}", start + c, insert_columns[c].1))
231                    .collect();
232                value_tuples.push(format!("({})", placeholders.join(", ")));
233            }
234
235            let query = format!(
236                "INSERT INTO {} ({}) VALUES {}",
237                table_ref,
238                col_names.join(", "),
239                value_tuples.join(", ")
240            );
241
242            let mut q = sqlx::query(&query);
243            for matched in sub {
244                // Bind values in the fixed column order, as text matching each
245                // column's type. A record missing a column that appeared in the
246                // first record binds SQL NULL.
247                for (col, udt) in &insert_columns {
248                    let val = matched
249                        .iter()
250                        .find(|(c, _, _)| *c == col)
251                        .map(|(_, _, v)| *v);
252                    q = q.bind(pg_bind_text(val, udt));
253                }
254            }
255
256            q.execute(&self.pool)
257                .await
258                .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
259        }
260
261        Ok(num_rows)
262    }
263}
264
265#[async_trait]
266impl faucet_core::Sink for PostgresSink {
267    fn config_schema(&self) -> serde_json::Value {
268        serde_json::to_value(faucet_core::schema_for!(PostgresSinkConfig))
269            .expect("schema serialization")
270    }
271
272    /// Preflight connectivity probe (`faucet doctor`).
273    ///
274    /// Acquires a connection from the existing pool and runs `SELECT 1`. This
275    /// is non-mutating and idempotent — it validates that the database is
276    /// reachable and the credentials are accepted without writing anything.
277    async fn check(
278        &self,
279        ctx: &faucet_core::check::CheckContext,
280    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
281        use faucet_core::check::{CheckReport, Probe};
282
283        let started = std::time::Instant::now();
284        let probe =
285            match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
286                .await
287            {
288                Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
289                Ok(Err(e)) => Probe::fail_hint(
290                    "auth",
291                    started.elapsed(),
292                    e.to_string(),
293                    "check connection_url / credentials / that the database is reachable",
294                ),
295                Err(_) => Probe::fail_hint(
296                    "auth",
297                    started.elapsed(),
298                    "timed out",
299                    "check connection_url / credentials / that the database is reachable",
300                ),
301            };
302        Ok(CheckReport::single(probe))
303    }
304
305    /// Write records to PostgreSQL.
306    ///
307    /// When `config.batch_size > 0` and the input slice is larger than
308    /// `batch_size`, the slice is split into chunks of `batch_size` rows and
309    /// each chunk is sent as a separate multi-row `INSERT`. When
310    /// `config.batch_size == 0`, the entire slice is sent in a single
311    /// `INSERT` — useful when upstream `StreamPage`s are already sized for
312    /// Postgres' per-statement bind-parameter limit (~65 535 / num_columns
313    /// in AutoMap mode).
314    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
315        if records.is_empty() {
316            return Ok(0);
317        }
318
319        let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
320            // Sentinel: pass the entire upstream page through in a single
321            // INSERT statement. Subject to Postgres' 65 535 bind-parameter
322            // limit in AutoMap mode; JSONB mode binds a single array.
323            vec![records]
324        } else {
325            records.chunks(self.config.batch_size).collect()
326        };
327
328        let mut total = 0;
329        for chunk in chunks {
330            total += match &self.config.column_mapping {
331                PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
332                PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
333            };
334        }
335
336        tracing::info!(
337            table = %self.config.table_name,
338            rows = total,
339            "PostgreSQL write complete"
340        );
341        Ok(total)
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::{pg_bind_text, qualified_table_ref};
    use serde_json::json;

    /// Bind a present JSON `value` for a column of type `udt`.
    fn bound(value: serde_json::Value, udt: &str) -> Option<String> {
        pg_bind_text(Some(&value), udt)
    }

    #[test]
    fn qualified_table_ref_unqualified_is_bare_quoted_table() {
        // Without a schema the table is quoted on its own and left to the
        // search_path.
        let rendered = qualified_table_ref(None, "events");
        assert_eq!(rendered, "\"events\"");
    }

    #[test]
    fn qualified_table_ref_with_schema_is_schema_dot_table() {
        // A schema pins discovery and INSERT to one explicit relation
        // (#146 M13).
        let rendered = qualified_table_ref(Some("analytics"), "events");
        assert_eq!(rendered, "\"analytics\".\"events\"");
    }

    #[test]
    fn qualified_table_ref_escapes_embedded_quotes() {
        // SQL-injection safety: a double quote inside either part is doubled.
        let rendered = qualified_table_ref(Some("we\"ird"), "ta\"ble");
        assert_eq!(rendered, "\"we\"\"ird\".\"ta\"\"ble\"");
    }

    #[test]
    fn null_and_absent_bind_sql_null() {
        assert_eq!(pg_bind_text(None, "text"), None);
        for udt in ["int4", "jsonb"] {
            assert_eq!(bound(json!(null), udt), None, "udt = {udt}");
        }
    }

    #[test]
    fn scalars_bind_plain_text_for_typed_columns() {
        // Each text is later parsed by the column's input function through the
        // `$N::<udt>` cast.
        let cases = [
            (json!(42), "int4", "42"),
            (json!(1.5), "numeric", "1.5"),
            (json!(true), "bool", "true"),
            (
                json!("2025-01-01T00:00:00Z"),
                "timestamptz",
                "2025-01-01T00:00:00Z",
            ),
            // A plain string into TEXT carries NO JSON quotes (the bug bound
            // `"Bob"`).
            (json!("Bob"), "text", "Bob"),
            // A u64 beyond the i64 range keeps its exact digits (no f64
            // precision loss).
            (
                json!(18446744073709551615u64),
                "numeric",
                "18446744073709551615",
            ),
        ];

        for (value, udt, expected) in cases {
            assert_eq!(bound(value, udt).as_deref(), Some(expected), "udt = {udt}");
        }
    }

    #[test]
    fn json_columns_get_json_text_with_quotes_preserved() {
        // json/jsonb columns receive JSON text for the cast to parse: strings
        // keep their quotes, objects and arrays round-trip.
        let cases = [
            (json!("Bob"), "jsonb", "\"Bob\""),
            (json!({"a": 1}), "jsonb", "{\"a\":1}"),
            (json!([1, 2]), "json", "[1,2]"),
            (json!(5), "jsonb", "5"),
            // The udt comparison ignores ASCII case.
            (json!("x"), "JSONB", "\"x\""),
        ];

        for (value, udt, expected) in cases {
            assert_eq!(bound(value, udt).as_deref(), Some(expected), "udt = {udt}");
        }
    }

    #[test]
    fn objects_into_non_json_columns_emit_json_text_so_the_cast_fails_loudly() {
        // An object has no scalar text form for e.g. an int column; the JSON
        // text is bound so the `::int4` cast rejects it rather than coercing.
        let text = bound(json!({"a": 1}), "int4");
        assert_eq!(text.as_deref(), Some("{\"a\":1}"));
    }
}