Skip to main content

faucet_sink_postgres/
sink.rs

1//! PostgreSQL sink implementation.
2
3use crate::config::{PostgresColumnMapping, PostgresSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::quote_ident;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{PgPool, Row};
10
11/// A sink that writes JSON records to a PostgreSQL table.
12pub struct PostgresSink {
13    config: PostgresSinkConfig,
14    pool: PgPool,
15}
16
17impl PostgresSink {
18    /// Create a new PostgreSQL sink. Establishes a connection pool.
19    pub async fn new(config: PostgresSinkConfig) -> Result<Self, FaucetError> {
20        let pool = PgPoolOptions::new()
21            .max_connections(config.max_connections)
22            .connect(&config.connection_url)
23            .await
24            .map_err(|e| FaucetError::Sink(format!("PostgreSQL connection failed: {e}")))?;
25
26        Ok(Self { config, pool })
27    }
28
29    /// Insert a batch of records using JSONB column mode.
30    async fn insert_jsonb(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
31        if records.is_empty() {
32            return Ok(0);
33        }
34
35        // Use a single INSERT with unnest for efficiency.
36        let json_values: Vec<serde_json::Value> = records.to_vec();
37        let query = format!(
38            "INSERT INTO {} ({}) SELECT * FROM unnest($1::jsonb[])",
39            quote_ident(&self.config.table_name),
40            quote_ident(column)
41        );
42
43        sqlx::query(&query)
44            .bind(json_values)
45            .execute(&self.pool)
46            .await
47            .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
48
49        Ok(records.len())
50    }
51
52    /// Insert a batch of records using auto-mapped columns.
53    ///
54    /// Discovers column names from the table schema and maps
55    /// top-level JSON fields to columns. Values are inserted as JSONB.
56    /// Uses a single multi-row INSERT for efficiency.
57    async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
58        if records.is_empty() {
59            return Ok(0);
60        }
61
62        // Get column names from the table.
63        let columns: Vec<String> = sqlx::query(
64            "SELECT column_name FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position"
65        )
66        .bind(&self.config.table_name)
67        .fetch_all(&self.pool)
68        .await
69        .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
70        .iter()
71        .map(|row| row.get::<String, _>("column_name"))
72        .collect();
73
74        if columns.is_empty() {
75            return Err(FaucetError::Sink(format!(
76                "table '{}' has no columns or does not exist",
77                self.config.table_name
78            )));
79        }
80
81        // Pre-validate all records and collect matched column values.
82        // Each entry is the list of (column_index, value) for one record.
83        let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
84
85        // Determine the set of columns used across all records by using
86        // the columns from the first valid record. All rows in a single
87        // multi-row INSERT must share the same column list.
88        let mut insert_columns: Option<Vec<String>> = None;
89
90        for record in records {
91            let obj = record
92                .as_object()
93                .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
94
95            let matching: Vec<(&String, &Value)> = columns
96                .iter()
97                .filter_map(|col| obj.get(col).map(|v| (col, v)))
98                .collect();
99
100            if matching.is_empty() {
101                tracing::warn!(
102                    record_keys = ?obj.keys().collect::<Vec<_>>(),
103                    table_columns = ?columns,
104                    "record has no keys matching table columns, skipping"
105                );
106                continue;
107            }
108
109            // Fix the column set from the first valid record.
110            if insert_columns.is_none() {
111                insert_columns = Some(matching.iter().map(|(c, _)| (*c).clone()).collect());
112            }
113
114            matched_rows.push(matching);
115        }
116
117        let insert_columns = match insert_columns {
118            Some(cols) => cols,
119            None => return Ok(0),
120        };
121
122        if matched_rows.is_empty() {
123            return Ok(0);
124        }
125
126        let num_cols = insert_columns.len();
127        let num_rows = matched_rows.len();
128        let col_names: Vec<String> = insert_columns.iter().map(|c| quote_ident(c)).collect();
129
130        // Build multi-row VALUES clause: ($1, $2), ($3, $4), ...
131        let mut value_tuples: Vec<String> = Vec::with_capacity(num_rows);
132        for row_idx in 0..num_rows {
133            let start = row_idx * num_cols + 1;
134            let placeholders: Vec<String> =
135                (start..start + num_cols).map(|i| format!("${i}")).collect();
136            value_tuples.push(format!("({})", placeholders.join(", ")));
137        }
138
139        let query = format!(
140            "INSERT INTO {} ({}) VALUES {}",
141            quote_ident(&self.config.table_name),
142            col_names.join(", "),
143            value_tuples.join(", ")
144        );
145
146        let mut q = sqlx::query(&query);
147        for matched in &matched_rows {
148            // Bind values in the fixed column order. If a record is missing
149            // a column that appeared in the first record, bind null.
150            for col in &insert_columns {
151                let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
152                q = q.bind(val.cloned());
153            }
154        }
155
156        q.execute(&self.pool)
157            .await
158            .map_err(|e| FaucetError::Sink(format!("PostgreSQL insert failed: {e}")))?;
159
160        Ok(num_rows)
161    }
162}
163
164#[async_trait]
165impl faucet_core::Sink for PostgresSink {
166    fn config_schema(&self) -> serde_json::Value {
167        serde_json::to_value(faucet_core::schema_for!(PostgresSinkConfig))
168            .expect("schema serialization")
169    }
170
171    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
172        let mut total = 0;
173        for chunk in records.chunks(self.config.batch_size) {
174            total += match &self.config.column_mapping {
175                PostgresColumnMapping::Jsonb { column } => self.insert_jsonb(chunk, column).await?,
176                PostgresColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
177            };
178        }
179
180        tracing::info!(
181            table = %self.config.table_name,
182            rows = total,
183            "PostgreSQL write complete"
184        );
185        Ok(total)
186    }
187}