Skip to main content

faucet_source_postgres/
config.rs

//! PostgreSQL source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8/// Configuration for the PostgreSQL query source.
9#[derive(Clone, Serialize, Deserialize, JsonSchema)]
10pub struct PostgresSourceConfig {
11    /// PostgreSQL connection URL (e.g. `postgres://user:pass@host/db`).
12    pub connection_url: String,
13    /// SQL query to execute.
14    pub query: String,
15    /// Bind parameters for the query. Defaults to empty.
16    #[serde(default)]
17    pub params: Vec<Value>,
18    /// Maximum number of connections in the pool. Defaults to 10.
19    #[serde(default = "default_max_connections")]
20    pub max_connections: u32,
21    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Rows are
22    /// drained from the sqlx cursor and yielded whenever the buffer reaches
23    /// this size. Defaults to [`DEFAULT_BATCH_SIZE`].
24    ///
25    /// `batch_size = 0` is the "no batching" sentinel: the cursor is fully
26    /// drained and the entire result set is emitted in a single page. Useful
27    /// for small lookup tables or for sinks (e.g. SQL `COPY`, BigQuery load
28    /// jobs) that prefer one large request to many small ones.
29    #[serde(default = "default_batch_size")]
30    pub batch_size: usize,
31}
32
/// Pool size used when `max_connections` is not specified.
const DEFAULT_MAX_CONNECTIONS: u32 = 10;

/// Serde default for `PostgresSourceConfig::max_connections`.
///
/// Exists as a function because `#[serde(default = "...")]` expects the path
/// of a function rather than a constant.
fn default_max_connections() -> u32 {
    DEFAULT_MAX_CONNECTIONS
}
36
37fn default_batch_size() -> usize {
38    DEFAULT_BATCH_SIZE
39}
40
41impl std::fmt::Debug for PostgresSourceConfig {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("PostgresSourceConfig")
44            .field("connection_url", &"***")
45            .field("query", &self.query)
46            .field("params", &self.params)
47            .field("max_connections", &self.max_connections)
48            .field("batch_size", &self.batch_size)
49            .finish()
50    }
51}
52
53impl PostgresSourceConfig {
54    /// Create a new config with the required connection URL and query.
55    pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
56        Self {
57            connection_url: connection_url.into(),
58            query: query.into(),
59            params: Vec::new(),
60            max_connections: 10,
61            batch_size: DEFAULT_BATCH_SIZE,
62        }
63    }
64
65    /// Set bind parameters for the query.
66    pub fn params(mut self, params: Vec<Value>) -> Self {
67        self.params = params;
68        self
69    }
70
71    /// Set the maximum number of connections in the pool.
72    pub fn with_max_connections(mut self, max_connections: u32) -> Self {
73        self.max_connections = max_connections;
74        self
75    }
76
77    /// Set the per-page row count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
78    ///
79    /// Pass `0` to opt out of batching — the entire result set is emitted in
80    /// a single [`StreamPage`](faucet_core::StreamPage).
81    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
82        self.batch_size = batch_size;
83        self
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `new` stores the required fields and leaves the optional ones at
    /// their defaults.
    #[test]
    fn default_config() {
        let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT * FROM events");
        assert_eq!(config.connection_url, "postgres://localhost/test");
        assert_eq!(config.query, "SELECT * FROM events");
        assert!(config.params.is_empty());
        assert_eq!(config.max_connections, 10);
    }

    #[test]
    fn builder_with_params() {
        let config = PostgresSourceConfig::new(
            "postgres://localhost/test",
            "SELECT * FROM events WHERE id = $1",
        )
        .params(vec![json!(42)]);
        assert_eq!(config.params.len(), 1);
        assert_eq!(config.params[0], json!(42));
    }

    #[test]
    fn with_max_connections_overrides_default() {
        let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1")
            .with_max_connections(3);
        assert_eq!(config.max_connections, 3);
    }

    /// The hand-written `Debug` impl must never leak the connection URL.
    #[test]
    fn debug_masks_connection_url() {
        let config = PostgresSourceConfig::new("postgres://secret:pass@host/db", "SELECT 1");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        assert!(!debug.contains("pass"));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "query": "SELECT 1",
            "batch_size": 250
        }"#;
        let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// Fields marked `#[serde(default ...)]` may be left out of the input and
    /// must then match what `new` produces.
    #[test]
    fn omitted_optional_fields_fall_back_to_defaults() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "query": "SELECT 1"
        }"#;
        let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
        assert!(config.params.is_empty());
        assert_eq!(config.max_connections, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}