Skip to main content

faucet_source_postgres/
stream.rs

1//! PostgreSQL source implementation.
2
3use crate::config::PostgresSourceConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use sqlx::postgres::PgPoolOptions;
8use sqlx::{Column, PgPool, Row};
9
10/// A source that executes a SQL query against PostgreSQL and returns rows as JSON.
11pub struct PostgresSource {
12    config: PostgresSourceConfig,
13    pool: PgPool,
14}
15
16impl PostgresSource {
17    /// Create a new PostgreSQL source. Establishes a connection pool.
18    pub async fn new(config: PostgresSourceConfig) -> Result<Self, FaucetError> {
19        let pool = PgPoolOptions::new()
20            .max_connections(config.max_connections)
21            .connect(&config.connection_url)
22            .await
23            .map_err(|e| FaucetError::Config(format!("PostgreSQL connection failed: {e}")))?;
24
25        Ok(Self { config, pool })
26    }
27}
28
29/// Convert a raw sqlx column value to a `serde_json::Value`.
30///
31/// Uses `try_get_raw` to inspect the type info and convert accordingly.
32/// Falls back to `Value::Null` for unsupported or null columns.
33fn pg_value_to_json(row: &sqlx::postgres::PgRow, col_name: &str) -> Value {
34    // Try JSON/JSONB first — this is the most flexible
35    if let Ok(v) = row.try_get::<Value, _>(col_name) {
36        return v;
37    }
38
39    // Try common scalar types
40    if let Ok(v) = row.try_get::<String, _>(col_name) {
41        return Value::String(v);
42    }
43    if let Ok(v) = row.try_get::<i64, _>(col_name) {
44        return Value::Number(v.into());
45    }
46    if let Ok(v) = row.try_get::<i32, _>(col_name) {
47        return Value::Number(v.into());
48    }
49    if let Ok(v) = row.try_get::<i16, _>(col_name) {
50        return Value::Number(v.into());
51    }
52    if let Ok(v) = row.try_get::<f64, _>(col_name) {
53        return serde_json::Number::from_f64(v)
54            .map(Value::Number)
55            .unwrap_or(Value::Null);
56    }
57    if let Ok(v) = row.try_get::<f32, _>(col_name) {
58        return serde_json::Number::from_f64(v as f64)
59            .map(Value::Number)
60            .unwrap_or(Value::Null);
61    }
62    if let Ok(v) = row.try_get::<bool, _>(col_name) {
63        return Value::Bool(v);
64    }
65
66    Value::Null
67}
68
69#[async_trait]
70impl faucet_core::Source for PostgresSource {
71    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
72        let mut query = sqlx::query(&self.config.query);
73
74        for param in &self.config.params {
75            query = query.bind(param);
76        }
77
78        let rows = query
79            .fetch_all(&self.pool)
80            .await
81            .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?;
82
83        let mut records = Vec::with_capacity(rows.len());
84        for row in &rows {
85            let mut map = serde_json::Map::new();
86            for col in row.columns() {
87                let name = col.name().to_string();
88                let value = pg_value_to_json(row, &name);
89                map.insert(name, value);
90            }
91            records.push(Value::Object(map));
92        }
93
94        tracing::info!(rows = records.len(), query = %self.config.query, "PostgreSQL source fetch complete");
95        Ok(records)
96    }
97
98    fn config_schema(&self) -> serde_json::Value {
99        serde_json::to_value(faucet_core::schema_for!(PostgresSourceConfig))
100            .expect("schema serialization")
101    }
102}