Skip to main content

faucet_source_postgres/
stream.rs

1//! PostgreSQL source implementation.
2
3use crate::config::PostgresSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use futures::TryStreamExt;
7use serde_json::Value;
8use sqlx::postgres::PgPoolOptions;
9use sqlx::{Column, PgPool, Row};
10use std::pin::Pin;
11
12/// A source that executes a SQL query against PostgreSQL and returns rows as JSON.
13pub struct PostgresSource {
14    config: PostgresSourceConfig,
15    pool: PgPool,
16}
17
18impl PostgresSource {
19    /// Create a new PostgreSQL source. Establishes a connection pool.
20    pub async fn new(config: PostgresSourceConfig) -> Result<Self, FaucetError> {
21        faucet_core::validate_batch_size(config.batch_size)?;
22
23        let pool = PgPoolOptions::new()
24            .max_connections(config.max_connections)
25            .connect(&config.connection_url)
26            .await
27            .map_err(|e| FaucetError::Config(format!("PostgreSQL connection failed: {e}")))?;
28
29        Ok(Self { config, pool })
30    }
31}
32
33/// Convert a raw sqlx column value to a `serde_json::Value`.
34///
35/// Uses `try_get_raw` to inspect the type info and convert accordingly.
36/// Falls back to `Value::Null` for unsupported or null columns.
37fn pg_value_to_json(row: &sqlx::postgres::PgRow, col_name: &str) -> Value {
38    // Try JSON/JSONB first — this is the most flexible
39    if let Ok(v) = row.try_get::<Value, _>(col_name) {
40        return v;
41    }
42
43    // Try common scalar types
44    if let Ok(v) = row.try_get::<String, _>(col_name) {
45        return Value::String(v);
46    }
47    if let Ok(v) = row.try_get::<i64, _>(col_name) {
48        return Value::Number(v.into());
49    }
50    if let Ok(v) = row.try_get::<i32, _>(col_name) {
51        return Value::Number(v.into());
52    }
53    if let Ok(v) = row.try_get::<i16, _>(col_name) {
54        return Value::Number(v.into());
55    }
56    if let Ok(v) = row.try_get::<f64, _>(col_name) {
57        return serde_json::Number::from_f64(v)
58            .map(Value::Number)
59            .unwrap_or(Value::Null);
60    }
61    if let Ok(v) = row.try_get::<f32, _>(col_name) {
62        return serde_json::Number::from_f64(v as f64)
63            .map(Value::Number)
64            .unwrap_or(Value::Null);
65    }
66    if let Ok(v) = row.try_get::<bool, _>(col_name) {
67        return Value::Bool(v);
68    }
69
70    // Richer types that would otherwise silently decode to Null (#78/#43).
71    // Timestamps → RFC3339 / ISO-8601 strings.
72    if let Ok(v) =
73        row.try_get::<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>, _>(col_name)
74    {
75        return Value::String(v.to_rfc3339());
76    }
77    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDateTime, _>(col_name) {
78        return Value::String(v.to_string());
79    }
80    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDate, _>(col_name) {
81        return Value::String(v.to_string());
82    }
83    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveTime, _>(col_name) {
84        return Value::String(v.to_string());
85    }
86    // UUID → canonical hyphenated string.
87    if let Ok(v) = row.try_get::<sqlx::types::Uuid, _>(col_name) {
88        return Value::String(v.to_string());
89    }
90    // NUMERIC / DECIMAL → string, preserving exact precision.
91    if let Ok(v) = row.try_get::<sqlx::types::BigDecimal, _>(col_name) {
92        return Value::String(v.to_string());
93    }
94    // BYTEA → base64 (so binary survives the JSON round-trip).
95    if let Ok(v) = row.try_get::<Vec<u8>, _>(col_name) {
96        use base64::Engine as _;
97        return Value::String(base64::engine::general_purpose::STANDARD.encode(v));
98    }
99
100    Value::Null
101}
102
103/// Build the effective SQL query and ordered context-bind values for a given
104/// parent context. Returns the literal query when there is no context.
105fn resolve_query(
106    config: &PostgresSourceConfig,
107    context: &std::collections::HashMap<String, Value>,
108) -> (String, Vec<Value>) {
109    if context.is_empty() {
110        (config.query.clone(), Vec::new())
111    } else {
112        faucet_core::util::substitute_context_bind_params(
113            &config.query,
114            context,
115            config.params.len() + 1,
116            |i| format!("${i}"),
117        )
118    }
119}
120
121/// Apply configured params followed by context-derived bind values onto a
122/// sqlx query.
123fn bind_params<'q>(
124    mut query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments>,
125    config_params: &'q [Value],
126    bind_values: &'q [Value],
127) -> sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments> {
128    // Bind the static config params and the per-context values as native
129    // scalar types, in positional order ($1, $2, …). Binding a raw
130    // `serde_json::Value` encodes it as `jsonb` (sqlx), which breaks comparisons
131    // against typed columns — e.g. `WHERE id = $1` against an integer column
132    // fails with "operator does not exist: integer = jsonb". config_params
133    // previously bound the raw Value and hit exactly this (audit #146 H12).
134    for value in config_params.iter().chain(bind_values) {
135        query = match value {
136            Value::String(s) => query.bind(s.clone()),
137            Value::Number(n) if n.is_i64() => query.bind(n.as_i64().unwrap()),
138            Value::Number(n) => query.bind(n.as_f64().unwrap_or(0.0)),
139            Value::Bool(b) => query.bind(*b),
140            Value::Null => query.bind(None::<String>),
141            _ => query.bind(value.to_string()),
142        };
143    }
144    query
145}
146
147/// Convert a single `PgRow` into a JSON object whose keys are the row's
148/// column names.
149fn row_to_json(row: &sqlx::postgres::PgRow) -> Value {
150    let mut map = serde_json::Map::new();
151    for col in row.columns() {
152        let name = col.name().to_string();
153        let value = pg_value_to_json(row, &name);
154        map.insert(name, value);
155    }
156    Value::Object(map)
157}
158
159#[async_trait]
160impl faucet_core::Source for PostgresSource {
161    async fn fetch_with_context(
162        &self,
163        context: &std::collections::HashMap<String, serde_json::Value>,
164    ) -> Result<Vec<Value>, FaucetError> {
165        let (query_str, bind_values) = resolve_query(&self.config, context);
166        let query = bind_params(sqlx::query(&query_str), &self.config.params, &bind_values);
167
168        let rows = query
169            .fetch_all(&self.pool)
170            .await
171            .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?;
172
173        let records: Vec<Value> = rows.iter().map(row_to_json).collect();
174        tracing::info!(rows = records.len(), query = %self.config.query, "PostgreSQL source fetch complete");
175        Ok(records)
176    }
177
178    /// Stream rows from the underlying sqlx cursor without buffering the full
179    /// result set. Each emitted [`StreamPage`] holds up to
180    /// [`PostgresSourceConfig::batch_size`] rows.
181    ///
182    /// The trait-level `batch_size` argument is ignored in favour of the
183    /// config field — the config is the user-facing knob the README
184    /// documents, and routing the pipeline-supplied hint through it would
185    /// silently override an explicit config value.
186    ///
187    /// `batch_size = 0` drains the entire cursor into a single page. The
188    /// postgres query source has no incremental-replication mode today, so
189    /// every emitted page carries `bookmark: None`.
190    fn stream_pages<'a>(
191        &'a self,
192        context: &'a std::collections::HashMap<String, Value>,
193        _batch_size: usize,
194    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
195        let batch_size = self.config.batch_size;
196
197        Box::pin(async_stream::try_stream! {
198            let (query_str, bind_values) = resolve_query(&self.config, context);
199            let query = bind_params(
200                sqlx::query(&query_str),
201                &self.config.params,
202                &bind_values,
203            );
204
205            let mut rows = query.fetch(&self.pool);
206            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
207            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
208            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
209            let mut total = 0usize;
210
211            while let Some(row) = rows
212                .try_next()
213                .await
214                .map_err(|e| FaucetError::Config(format!("PostgreSQL query failed: {e}")))?
215            {
216                buffer.push(row_to_json(&row));
217                if buffer.len() >= chunk {
218                    let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
219                    total += page.len();
220                    yield StreamPage { records: page, bookmark: None };
221                }
222            }
223            if !buffer.is_empty() {
224                total += buffer.len();
225                yield StreamPage { records: buffer, bookmark: None };
226            }
227
228            tracing::info!(
229                rows = total,
230                batch_size,
231                query = %self.config.query,
232                "PostgreSQL source stream complete",
233            );
234        })
235    }
236
237    fn config_schema(&self) -> serde_json::Value {
238        serde_json::to_value(faucet_core::schema_for!(PostgresSourceConfig))
239            .expect("schema serialization")
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// A `batch_size` one above `faucet_core::MAX_BATCH_SIZE` must be
    /// rejected by `PostgresSource::new` with a `Config` error that names
    /// `batch_size`.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let config = {
            let mut cfg = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1");
            cfg.batch_size = faucet_core::MAX_BATCH_SIZE + 1;
            cfg
        };

        let outcome = PostgresSource::new(config).await;

        if let Err(faucet_core::FaucetError::Config(m)) = outcome {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}