use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
use async_trait::async_trait;
use faucet_core::FaucetError;
use faucet_core::util::quote_ident;
use serde_json::Value;
use sqlx::postgres::PgPoolOptions;
use sqlx::{PgPool, Row};
/// Renders a JSON value as the text parameter bound for a column whose
/// PostgreSQL type name is `udt`.
///
/// An absent value or JSON `null` yields `None`, which binds SQL `NULL`.
/// For `json`/`jsonb` columns (matched case-insensitively) the value is
/// serialized as JSON text, so strings keep their surrounding quotes.
/// For any other column type, strings are passed through verbatim, numbers
/// and booleans use their plain textual form, and arrays/objects fall back
/// to JSON text — the server-side cast then accepts or rejects that text.
fn pg_bind_text(value: Option<&Value>, udt: &str) -> Option<String> {
    let v = match value {
        Some(v) if !v.is_null() => v,
        _ => return None,
    };
    let is_json_column = ["json", "jsonb"]
        .iter()
        .any(|name| udt.eq_ignore_ascii_case(name));
    let text = match v {
        _ if is_json_column => v.to_string(),
        Value::String(s) => s.clone(),
        Value::Number(n) => n.to_string(),
        Value::Bool(b) => b.to_string(),
        composite => composite.to_string(),
    };
    Some(text)
}
/// Builds the SQL reference for `table`, prefixed by `schema.` when a schema
/// is given; every identifier part is quoted with `quote_ident`.
fn qualified_table_ref(schema: Option<&str>, table: &str) -> String {
    let table = quote_ident(table);
    if let Some(schema) = schema {
        format!("{}.{}", quote_ident(schema), table)
    } else {
        table
    }
}
/// Sink that writes JSON records into a single PostgreSQL table.
///
/// `config.column_mapping` selects how records become rows: either the whole
/// record is stored in one JSON column (`Jsonb`), or top-level object keys are
/// matched to same-named table columns (`AutoMap`).
pub struct PostgresSink {
    /// Sink settings: connection URL, target schema/table, column mapping, batching.
    config: PostgresSinkConfig,
    /// Connection pool created by [`PostgresSink::new`].
    pool: PgPool,
}
impl PostgresSink {
    /// Opens a connection pool to `config.connection_url` (capped at
    /// `config.max_connections` connections) and returns the sink.
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::Sink`] if the initial connection fails.
    pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
        let pool = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .connect(&config.connection_url)
            .await
            .map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
        Ok(Self { config, pool })
    }

    /// Inserts one row per record, storing each whole record in `column`.
    ///
    /// All records are sent as a single `jsonb[]` parameter and expanded with
    /// `unnest`, so the batch is one statement. Returns the number of records.
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::Sink`] if the insert fails.
    async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
        if records.is_empty() {
            return Ok(0);
        }
        let json_values: Vec<serde_json::Value> = records.to_vec();
        let query = format!(
            "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
            qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name),
            quote_ident(column)
        );
        sqlx::query(&query)
            .bind(json_values)
            .execute(&self.pool)
            .await
            .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
        Ok(records.len())
    }

    /// Inserts records by matching top-level object keys to column names.
    ///
    /// The target table's columns are read from `pg_catalog`. Records that share
    /// no key with the table are skipped with a warning. The INSERT column list
    /// is the union of columns matched by any record in the batch; a record that
    /// lacks one of those columns binds SQL `NULL` for it (NOTE: this overrides
    /// the column's DEFAULT). Every value is bound as text and cast server-side
    /// to the column's type. Rows are split across statements to stay within
    /// PostgreSQL's 65535-parameter limit; the statements are not wrapped in a
    /// transaction, so a failure part-way does not roll back earlier statements.
    ///
    /// Returns the number of rows inserted (skipped records are not counted).
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::Sink`] if the table does not exist or has no
    /// columns, if any record is not a JSON object, or if a query fails.
    async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
        /// One user column of the target table.
        #[derive(Debug)]
        struct TableColumn {
            /// Column name as stored in the catalog (unquoted).
            name: String,
            /// Bare `pg_type.typname`, used to choose the text encoding.
            udt: String,
            /// Schema-qualified, quoted type name used in `$n::<type>` casts.
            /// Quoting keeps mixed-case / non-search-path types resolvable and
            /// stops keyword type names (`bit`, `char`) from being re-parsed
            /// as their SQL-grammar forms (`bit(1)`, `bpchar(1)`).
            cast: String,
        }

        if records.is_empty() {
            return Ok(0);
        }
        let table_ref = qualified_table_ref(self.config.schema.as_deref(), &self.config.table_name);
        let columns: Vec<TableColumn> = sqlx::query(
            "SELECT a.attname::text AS column_name, t.typname::text AS udt_name, \
                    n.nspname::text AS udt_schema \
             FROM pg_catalog.pg_attribute a \
             JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
             JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace \
             WHERE a.attrelid = to_regclass($1)::oid \
               AND a.attnum > 0 AND NOT a.attisdropped \
             ORDER BY a.attnum",
        )
        .bind(&table_ref)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
        .iter()
        .map(|row| {
            let name: String = row.get("column_name");
            let udt: String = row.get("udt_name");
            let udt_schema: String = row.get("udt_schema");
            let cast = format!("{}.{}", quote_ident(&udt_schema), quote_ident(&udt));
            TableColumn { name, udt, cast }
        })
        .collect();
        if columns.is_empty() {
            return Err(FaucetError::Sink(format!(
                "table {table_ref} has no columns or does not exist"
            )));
        }

        // Keep each accepted record's object; values are looked up by key at
        // bind time instead of scanning a per-row match list.
        let mut rows: Vec<&serde_json::Map<String, Value>> = Vec::with_capacity(records.len());
        let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
        for record in records {
            let obj = record
                .as_object()
                .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
            let mut matched_any = false;
            for col in columns.iter().filter(|c| obj.contains_key(c.name.as_str())) {
                used.insert(col.name.as_str());
                matched_any = true;
            }
            if !matched_any {
                tracing::warn!(
                    record_keys = ?obj.keys().collect::<Vec<_>>(),
                    table_columns = ?columns,
                    "record has no keys matching table columns, skipping"
                );
                continue;
            }
            rows.push(obj);
        }
        if rows.is_empty() {
            return Ok(0);
        }

        // Table order is preserved; only columns some record supplied are inserted.
        let insert_columns: Vec<&TableColumn> = columns
            .iter()
            .filter(|c| used.contains(c.name.as_str()))
            .collect();
        // Non-zero: `rows` is non-empty, so at least one column was marked used.
        let num_cols = insert_columns.len();
        let col_names = insert_columns
            .iter()
            .map(|c| quote_ident(&c.name))
            .collect::<Vec<_>>()
            .join(", ");
        // The extended-query protocol carries the parameter count as a 16-bit value.
        const MAX_PG_PARAMS: usize = 65535;
        let max_rows_per_insert = (MAX_PG_PARAMS / num_cols).max(1);
        for sub in rows.chunks(max_rows_per_insert) {
            let value_tuples: Vec<String> = (0..sub.len())
                .map(|row_idx| {
                    let start = row_idx * num_cols + 1;
                    let placeholders: Vec<String> = insert_columns
                        .iter()
                        .enumerate()
                        .map(|(offset, col)| format!("${}::{}", start + offset, col.cast))
                        .collect();
                    format!("({})", placeholders.join(", "))
                })
                .collect();
            let query = format!(
                "INSERT INTO {} ({}) VALUES {}",
                table_ref,
                col_names,
                value_tuples.join(", ")
            );
            let mut q = sqlx::query(&query);
            for obj in sub {
                for col in &insert_columns {
                    q = q.bind(pg_bind_text(obj.get(col.name.as_str()), &col.udt));
                }
            }
            q.execute(&self.pool)
                .await
                .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
        }
        Ok(rows.len())
    }
}
#[async_trait]
impl faucet_core::Sink for PostgresSink {
    /// Returns the JSON Schema generated for [`PostgresSinkConfig`].
    fn config_schema(&self) -> serde_json::Value {
        let schema = faucet_core::schema_for!(PostgresSinkConfig);
        serde_json::to_value(schema).expect("schema serialization")
    }

    /// Runs `SELECT 1` on the pool, bounded by `ctx.timeout`, and reports the
    /// outcome as a single `auth` probe (pass, query error, or timeout).
    async fn check(
        &self,
        ctx: &faucet_core::check::CheckContext,
    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
        use faucet_core::check::{CheckReport, Probe};
        const PROBE_NAME: &str = "auth";
        const HINT: &str = "check connection_url / credentials / that the database is reachable";

        let started = std::time::Instant::now();
        let ping = sqlx::query("SELECT 1").execute(&self.pool);
        let outcome = tokio::time::timeout(ctx.timeout, ping).await;
        let elapsed = started.elapsed();
        let probe = match outcome {
            Ok(Ok(_)) => Probe::pass(PROBE_NAME, elapsed),
            Ok(Err(err)) => Probe::fail_hint(PROBE_NAME, elapsed, err.to_string(), HINT),
            Err(_) => Probe::fail_hint(PROBE_NAME, elapsed, "timed out", HINT),
        };
        Ok(CheckReport::single(probe))
    }

    /// Writes `records` in chunks of `config.batch_size` (0 means one chunk),
    /// dispatching each chunk on `config.column_mapping`, and returns the total
    /// number of rows reported by the insert helpers.
    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
        if records.is_empty() {
            return Ok(0);
        }
        // `records` is non-empty here, so the chunk length is never zero.
        let chunk_len = match self.config.batch_size {
            0 => records.len(),
            size => size,
        };
        let mut written = 0;
        for chunk in records.chunks(chunk_len) {
            let inserted = match &self.config.column_mapping {
                PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
                PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
            };
            written += inserted;
        }
        tracing::info!(
            table = %self.config.table_name,
            rows = written,
            "PostgreSQL write complete"
        );
        Ok(written)
    }
}
#[cfg(test)]
mod tests {
    use super::{pg_bind_text, qualified_table_ref};
    use serde_json::{json, Value};

    /// Binds a present value for a column of type `udt`.
    fn bind(value: Value, udt: &str) -> Option<String> {
        pg_bind_text(Some(&value), udt)
    }

    #[test]
    fn qualified_table_ref_unqualified_is_bare_quoted_table() {
        assert_eq!(qualified_table_ref(None, "events"), r#""events""#);
    }

    #[test]
    fn qualified_table_ref_with_schema_is_schema_dot_table() {
        assert_eq!(
            qualified_table_ref(Some("analytics"), "events"),
            r#""analytics"."events""#
        );
    }

    #[test]
    fn qualified_table_ref_escapes_embedded_quotes() {
        assert_eq!(
            qualified_table_ref(Some("we\"ird"), "ta\"ble"),
            r#""we""ird"."ta""ble""#
        );
    }

    #[test]
    fn null_and_absent_bind_sql_null() {
        assert_eq!(pg_bind_text(None, "text"), None);
        for udt in ["int4", "jsonb"] {
            assert_eq!(bind(json!(null), udt), None, "udt = {udt}");
        }
    }

    #[test]
    fn scalars_bind_plain_text_for_typed_columns() {
        let cases = [
            (json!(42), "int4", "42"),
            (json!(1.5), "numeric", "1.5"),
            (json!(true), "bool", "true"),
            (json!("2025-01-01T00:00:00Z"), "timestamptz", "2025-01-01T00:00:00Z"),
            (json!("Bob"), "text", "Bob"),
            (json!(18446744073709551615u64), "numeric", "18446744073709551615"),
        ];
        for (value, udt, expected) in cases {
            assert_eq!(bind(value, udt).as_deref(), Some(expected), "udt = {udt}");
        }
    }

    #[test]
    fn json_columns_get_json_text_with_quotes_preserved() {
        let cases = [
            (json!("Bob"), "jsonb", r#""Bob""#),
            (json!({"a": 1}), "jsonb", r#"{"a":1}"#),
            (json!([1, 2]), "json", "[1,2]"),
            (json!(5), "jsonb", "5"),
            (json!("x"), "JSONB", r#""x""#),
        ];
        for (value, udt, expected) in cases {
            assert_eq!(bind(value, udt).as_deref(), Some(expected), "udt = {udt}");
        }
    }

    #[test]
    fn objects_into_non_json_columns_emit_json_text_so_the_cast_fails_loudly() {
        assert_eq!(bind(json!({"a": 1}), "int4").as_deref(), Some(r#"{"a":1}"#));
    }
}