use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
use async_trait::async_trait;
use faucet_core::FaucetError;
use faucet_core::util::quote_ident;
use serde_json::Value;
use sqlx::postgres::PgPoolOptions;
use sqlx::{PgPool, Row};
/// Sink that writes JSON records into a single PostgreSQL table.
///
/// How each record is mapped onto table columns is selected by
/// `config.column_mapping` (whole-record JSONB column, or automatic
/// key-to-column mapping).
pub struct PostgresSink {
    /// Sink settings: connection URL, pool size, target table, batch size and
    /// column mapping.
    config: PostgresSinkConfig,
    /// Connection pool opened in `PostgresSink::new` and shared by all writes.
    pool: PgPool,
}
impl PostgresSink {
pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
let pool = PgPoolOptions::new()
.max_connections(config.max_connections)
.connect(&config.connection_url)
.await
.map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
Ok(Self { config, pool })
}
async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
if records.is_empty() {
return Ok(0);
}
let json_values: Vec<serde_json::Value> = records.to_vec();
let query = format!(
"INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
quote_ident(&self.config.table_name),
quote_ident(column)
);
sqlx::query(&query)
.bind(json_values)
.execute(&self.pool)
.await
.map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
Ok(records.len())
}
async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
if records.is_empty() {
return Ok(0);
}
let columns: Vec<String> = sqlx::query(
"SELECT column_name FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position"
)
.bind(&self.config.table_name)
.fetch_all(&self.pool)
.await
.map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
.iter()
.map(|row| row.get::<String, _>("column_name"))
.collect();
if columns.is_empty() {
return Err(FaucetError::Sink(format!(
"table '{}' has no columns or does not exist",
self.config.table_name
)));
}
let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
let mut insert_columns: Option<Vec<String>> = None;
for record in records {
let obj = record
.as_object()
.ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
let matching: Vec<(&String, &Value)> = columns
.iter()
.filter_map(|col| obj.get(col).map(|v| (col, v)))
.collect();
if matching.is_empty() {
tracing::warn!(
record_keys = ?obj.keys().collect::<Vec<_>>(),
table_columns = ?columns,
"record has no keys matching table columns, skipping"
);
continue;
}
if insert_columns.is_none() {
insert_columns = Some(matching.iter().map(|(c, _)| (*c).clone()).collect());
}
matched_rows.push(matching);
}
let insert_columns = match insert_columns {
Some(cols) => cols,
None => return Ok(0),
};
if matched_rows.is_empty() {
return Ok(0);
}
let num_cols = insert_columns.len();
let num_rows = matched_rows.len();
let col_names: Vec<String> = insert_columns.iter().map(|c| quote_ident(c)).collect();
let mut value_tuples: Vec<String> = Vec::with_capacity(num_rows);
for row_idx in 0..num_rows {
let start = row_idx * num_cols + 1;
let placeholders: Vec<String> =
(start..start + num_cols).map(|i| format!("${i}")).collect();
value_tuples.push(format!("({})", placeholders.join(", ")));
}
let query = format!(
"INSERT INTO {} ({}) VALUES {}",
quote_ident(&self.config.table_name),
col_names.join(", "),
value_tuples.join(", ")
);
let mut q = sqlx::query(&query);
for matched in &matched_rows {
for col in &insert_columns {
let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
q = q.bind(val.cloned());
}
}
q.execute(&self.pool)
.await
.map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
Ok(num_rows)
}
}
#[async_trait]
impl faucet_core::Sink for PostgresSink {
    /// Returns the JSON Schema describing [`PostgresSinkConfig`].
    fn config_schema(&self) -> serde_json::Value {
        // Serializing a generated schema into a `Value` is not expected to fail;
        // a failure here would indicate a bug in the schema generator.
        serde_json::to_value(faucet_core::schema_for!(PostgresSinkConfig))
            .expect("schema serialization")
    }

    /// Writes `records` in chunks of `config.batch_size`, using the configured
    /// column mapping for every chunk.
    ///
    /// A `batch_size` of 0 is treated as 1, because `slice::chunks` panics on a
    /// zero chunk size. Each chunk is written as a separate statement. If one
    /// chunk fails, its error is returned and chunks already written are not
    /// rolled back.
    ///
    /// Returns the total number of rows written across all chunks.
    ///
    /// # Errors
    ///
    /// Propagates the first [`FaucetError`] returned by a chunk insert.
    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
        let chunk_size = self.config.batch_size.max(1);
        let mut total = 0;
        for chunk in records.chunks(chunk_size) {
            total += match &self.config.column_mapping {
                PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
                PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
            };
        }
        tracing::info!(
            table = %self.config.table_name,
            rows = total,
            "PostgreSQL write complete"
        );
        Ok(total)
    }
}