faucet_source_postgres/
stream.rs1use crate::config::PostgresSourceConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use sqlx::postgres::PgPoolOptions;
8use sqlx::{Column, PgPool, Row};
9
10pub struct PostgresSource {
12 config: PostgresSourceConfig,
13 pool: PgPool,
14}
15
16impl PostgresSource {
17 pub async fn new(config: PostgresSourceConfig) -> Result<Self, FaucetError> {
19 let pool = PgPoolOptions::new()
20 .max_connections(config.max_connections)
21 .connect(&config.connection_url)
22 .await
23 .map_err(|e| FaucetError::Config(format!("PostgreSQL connection failed: {e}")))?;
24
25 Ok(Self { config, pool })
26 }
27}
28
29fn pg_value_to_json(row: &sqlx::postgres::PgRow, col_name: &str) -> Value {
34 if let Ok(v) = row.try_get::<Value, _>(col_name) {
36 return v;
37 }
38
39 if let Ok(v) = row.try_get::<String, _>(col_name) {
41 return Value::String(v);
42 }
43 if let Ok(v) = row.try_get::<i64, _>(col_name) {
44 return Value::Number(v.into());
45 }
46 if let Ok(v) = row.try_get::<i32, _>(col_name) {
47 return Value::Number(v.into());
48 }
49 if let Ok(v) = row.try_get::<i16, _>(col_name) {
50 return Value::Number(v.into());
51 }
52 if let Ok(v) = row.try_get::<f64, _>(col_name) {
53 return serde_json::Number::from_f64(v)
54 .map(Value::Number)
55 .unwrap_or(Value::Null);
56 }
57 if let Ok(v) = row.try_get::<f32, _>(col_name) {
58 return serde_json::Number::from_f64(v as f64)
59 .map(Value::Number)
60 .unwrap_or(Value::Null);
61 }
62 if let Ok(v) = row.try_get::<bool, _>(col_name) {
63 return Value::Bool(v);
64 }
65
66 Value::Null
67}
68
69#[async_trait]
70impl faucet_core::Source for PostgresSource {
71 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
72 let mut query = sqlx::query(&self.config.query);
73
74 for param in &self.config.params {
75 query = query.bind(param);
76 }
77
78 let rows = query
79 .fetch_all(&self.pool)
80 .await
81 .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?;
82
83 let mut records = Vec::with_capacity(rows.len());
84 for row in &rows {
85 let mut map = serde_json::Map::new();
86 for col in row.columns() {
87 let name = col.name().to_string();
88 let value = pg_value_to_json(row, &name);
89 map.insert(name, value);
90 }
91 records.push(Value::Object(map));
92 }
93
94 tracing::info!(rows = records.len(), query = %self.config.query, "PostgreSQL source fetch complete");
95 Ok(records)
96 }
97
98 fn config_schema(&self) -> serde_json::Value {
99 serde_json::to_value(faucet_core::schema_for!(PostgresSourceConfig))
100 .expect("schema serialization")
101 }
102}