faucet_sink_postgres/
sink.rs1use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{PgPool, Row};
10
11pub struct PostgresSink {
13 config: PostgresSinkConfig,
14 pool: PgPool,
15}
16
17impl PostgresSink {
18 pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
20 let pool = PgPoolOptions::new()
21 .max_connections(config.max_connections)
22 .connect(&config.connection_url)
23 .await
24 .map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
25
26 Ok(Self { config, pool })
27 }
28
29 async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
31 if records.is_empty() {
32 return Ok(0);
33 }
34
35 let json_values: Vec<serde_json::Value> = records.to_vec();
37 let query = format!(
38 "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
39 quote_ident(&self.config.table_name),
40 quote_ident(column)
41 );
42
43 sqlx::query(&query)
44 .bind(json_values)
45 .execute(&self.pool)
46 .await
47 .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
48
49 Ok(records.len())
50 }
51
52 async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
58 if records.is_empty() {
59 return Ok(0);
60 }
61
62 let columns: Vec<String> = sqlx::query(
64 "SELECT column_name FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position"
65 )
66 .bind(&self.config.table_name)
67 .fetch_all(&self.pool)
68 .await
69 .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
70 .iter()
71 .map(|row| row.get::<String, _>("column_name"))
72 .collect();
73
74 if columns.is_empty() {
75 return Err(FaucetError::Sink(format!(
76 "table '{}' has no columns or does not exist",
77 self.config.table_name
78 )));
79 }
80
81 let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
84
85 let mut insert_columns: Option<Vec<String>> = None;
89
90 for record in records {
91 let obj = record
92 .as_object()
93 .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
94
95 let matching: Vec<(&String, &Value)> = columns
96 .iter()
97 .filter_map(|col| obj.get(col).map(|v| (col, v)))
98 .collect();
99
100 if matching.is_empty() {
101 tracing::warn!(
102 record_keys = ?obj.keys().collect::<Vec<_>>(),
103 table_columns = ?columns,
104 "record has no keys matching table columns, skipping"
105 );
106 continue;
107 }
108
109 if insert_columns.is_none() {
111 insert_columns = Some(matching.iter().map(|(c, _)| (*c).clone()).collect());
112 }
113
114 matched_rows.push(matching);
115 }
116
117 let insert_columns = match insert_columns {
118 Some(cols) => cols,
119 None => return Ok(0),
120 };
121
122 if matched_rows.is_empty() {
123 return Ok(0);
124 }
125
126 let num_cols = insert_columns.len();
127 let num_rows = matched_rows.len();
128 let col_names: Vec<String> = insert_columns.iter().map(|c| quote_ident(c)).collect();
129
130 let mut value_tuples: Vec<String> = Vec::with_capacity(num_rows);
132 for row_idx in 0..num_rows {
133 let start = row_idx * num_cols + 1;
134 let placeholders: Vec<String> =
135 (start..start + num_cols).map(|i| format!("${i}")).collect();
136 value_tuples.push(format!("({})", placeholders.join(", ")));
137 }
138
139 let query = format!(
140 "INSERT INTO {} ({}) VALUES {}",
141 quote_ident(&self.config.table_name),
142 col_names.join(", "),
143 value_tuples.join(", ")
144 );
145
146 let mut q = sqlx::query(&query);
147 for matched in &matched_rows {
148 for col in &insert_columns {
151 let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
152 q = q.bind(val.cloned());
153 }
154 }
155
156 q.execute(&self.pool)
157 .await
158 .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
159
160 Ok(num_rows)
161 }
162}
163
164#[async_trait]
165impl faucet_core::Sink for PostgresSink {
166 fn config_schema(&self) -> serde_json::Value {
167 serde_json::to_value(faucet_core::schema_for!(PostgresSinkConfig))
168 .expect("schema serialization")
169 }
170
171 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
172 let mut total = 0;
173 for chunk in records.chunks(self.config.batch_size) {
174 total += match &self.config.column_mapping {
175 PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
176 PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
177 };
178 }
179
180 tracing::info!(
181 table = %self.config.table_name,
182 rows = total,
183 "PostgreSQL write complete"
184 );
185 Ok(total)
186 }
187}