faucet_source_postgres/
stream.rs1use crate::config::PostgresSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use futures::TryStreamExt;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{Column, PgPool, Row};
10use std::pin::Pin;
11
12pub struct PostgresSource {
14 config: PostgresSourceConfig,
15 pool: PgPool,
16}
17
18impl PostgresSource {
19 pub async fn new(config: PostgresSourceConfig) -> Result<Self, FaucetError> {
21 faucet_core::validate_batch_size(config.batch_size)?;
22
23 let pool = PgPoolOptions::new()
24 .max_connections(config.max_connections)
25 .connect(&config.connection_url)
26 .await
27 .map_err(|e| FaucetError::Config(format!("PostgreSQL connection failed: {e}")))?;
28
29 Ok(Self { config, pool })
30 }
31}
32
33fn pg_value_to_json(row: &sqlx::postgres::PgRow, col_name: &str) -> Value {
38 if let Ok(v) = row.try_get::<Value, _>(col_name) {
40 return v;
41 }
42
43 if let Ok(v) = row.try_get::<String, _>(col_name) {
45 return Value::String(v);
46 }
47 if let Ok(v) = row.try_get::<i64, _>(col_name) {
48 return Value::Number(v.into());
49 }
50 if let Ok(v) = row.try_get::<i32, _>(col_name) {
51 return Value::Number(v.into());
52 }
53 if let Ok(v) = row.try_get::<i16, _>(col_name) {
54 return Value::Number(v.into());
55 }
56 if let Ok(v) = row.try_get::<f64, _>(col_name) {
57 return serde_json::Number::from_f64(v)
58 .map(Value::Number)
59 .unwrap_or(Value::Null);
60 }
61 if let Ok(v) = row.try_get::<f32, _>(col_name) {
62 return serde_json::Number::from_f64(v as f64)
63 .map(Value::Number)
64 .unwrap_or(Value::Null);
65 }
66 if let Ok(v) = row.try_get::<bool, _>(col_name) {
67 return Value::Bool(v);
68 }
69
70 if let Ok(v) =
73 row.try_get::<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>, _>(col_name)
74 {
75 return Value::String(v.to_rfc3339());
76 }
77 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDateTime, _>(col_name) {
78 return Value::String(v.to_string());
79 }
80 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDate, _>(col_name) {
81 return Value::String(v.to_string());
82 }
83 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveTime, _>(col_name) {
84 return Value::String(v.to_string());
85 }
86 if let Ok(v) = row.try_get::<sqlx::types::Uuid, _>(col_name) {
88 return Value::String(v.to_string());
89 }
90 if let Ok(v) = row.try_get::<sqlx::types::BigDecimal, _>(col_name) {
92 return Value::String(v.to_string());
93 }
94 if let Ok(v) = row.try_get::<Vec<u8>, _>(col_name) {
96 use base64::Engine as _;
97 return Value::String(base64::engine::general_purpose::STANDARD.encode(v));
98 }
99
100 Value::Null
101}
102
103fn resolve_query(
106 config: &PostgresSourceConfig,
107 context: &std::collections::HashMap<String, Value>,
108) -> (String, Vec<Value>) {
109 if context.is_empty() {
110 (config.query.clone(), Vec::new())
111 } else {
112 faucet_core::util::substitute_context_bind_params(
113 &config.query,
114 context,
115 config.params.len() + 1,
116 |i| format!("${i}"),
117 )
118 }
119}
120
121fn bind_params<'q>(
124 mut query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments>,
125 config_params: &'q [Value],
126 bind_values: &'q [Value],
127) -> sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments> {
128 for value in config_params.iter().chain(bind_values) {
135 query = match value {
136 Value::String(s) => query.bind(s.clone()),
137 Value::Number(n) if n.is_i64() => query.bind(n.as_i64().unwrap()),
138 Value::Number(n) => query.bind(n.as_f64().unwrap_or(0.0)),
139 Value::Bool(b) => query.bind(*b),
140 Value::Null => query.bind(None::<String>),
141 _ => query.bind(value.to_string()),
142 };
143 }
144 query
145}
146
147fn row_to_json(row: &sqlx::postgres::PgRow) -> Value {
150 let mut map = serde_json::Map::new();
151 for col in row.columns() {
152 let name = col.name().to_string();
153 let value = pg_value_to_json(row, &name);
154 map.insert(name, value);
155 }
156 Value::Object(map)
157}
158
159#[async_trait]
160impl faucet_core::Source for PostgresSource {
161 async fn fetch_with_context(
162 &self,
163 context: &std::collections::HashMap<String, serde_json::Value>,
164 ) -> Result<Vec<Value>, FaucetError> {
165 let (query_str, bind_values) = resolve_query(&self.config, context);
166 let query = bind_params(sqlx::query(&query_str), &self.config.params, &bind_values);
167
168 let rows = query
169 .fetch_all(&self.pool)
170 .await
171 .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?;
172
173 let records: Vec<Value> = rows.iter().map(row_to_json).collect();
174 tracing::info!(rows = records.len(), query = %self.config.query, "PostgreSQL source fetch complete");
175 Ok(records)
176 }
177
178 fn stream_pages<'a>(
191 &'a self,
192 context: &'a std::collections::HashMap<String, Value>,
193 _batch_size: usize,
194 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
195 let batch_size = self.config.batch_size;
196
197 Box::pin(async_stream::try_stream! {
198 let (query_str, bind_values) = resolve_query(&self.config, context);
199 let query = bind_params(
200 sqlx::query(&query_str),
201 &self.config.params,
202 &bind_values,
203 );
204
205 let mut rows = query.fetch(&self.pool);
206 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
207 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
208 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
209 let mut total = 0usize;
210
211 while let Some(row) = rows
212 .try_next()
213 .await
214 .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?
215 {
216 buffer.push(row_to_json(&row));
217 if buffer.len() >= chunk {
218 let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
219 total += page.len();
220 yield StreamPage { records: page, bookmark: None };
221 }
222 }
223 if !buffer.is_empty() {
224 total += buffer.len();
225 yield StreamPage { records: buffer, bookmark: None };
226 }
227
228 tracing::info!(
229 rows = total,
230 batch_size,
231 query = %self.config.query,
232 "PostgreSQL source stream complete",
233 );
234 })
235 }
236
237 fn config_schema(&self) -> serde_json::Value {
238 serde_json::to_value(faucet_core::schema_for!(PostgresSourceConfig))
239 .expect("schema serialization")
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// A batch size above `faucet_core::MAX_BATCH_SIZE` must be rejected with
    /// a `Config` error that mentions `batch_size`. `new` checks the batch
    /// size before it tries to connect, so no database is needed here.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let m = match PostgresSource::new(config).await {
            Err(faucet_core::FaucetError::Config(m)) => m,
            _ => panic!("expected a batch_size Config error"),
        };
        assert!(m.contains("batch_size"), "got: {m}");
    }
}