faucet_source_postgres/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8#[derive(Clone, Serialize, Deserialize, JsonSchema)]
10pub struct PostgresSourceConfig {
11 pub connection_url: String,
13 pub query: String,
15 #[serde(default)]
17 pub params: Vec<Value>,
18 #[serde(default = "default_max_connections")]
20 pub max_connections: u32,
21 #[serde(default = "default_batch_size")]
30 pub batch_size: usize,
31}
32
33fn default_max_connections() -> u32 {
34 10
35}
36
37fn default_batch_size() -> usize {
38 DEFAULT_BATCH_SIZE
39}
40
41impl std::fmt::Debug for PostgresSourceConfig {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("PostgresSourceConfig")
44 .field("connection_url", &"***")
45 .field("query", &self.query)
46 .field("params", &self.params)
47 .field("max_connections", &self.max_connections)
48 .field("batch_size", &self.batch_size)
49 .finish()
50 }
51}
52
53impl PostgresSourceConfig {
54 pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
56 Self {
57 connection_url: connection_url.into(),
58 query: query.into(),
59 params: Vec::new(),
60 max_connections: 10,
61 batch_size: DEFAULT_BATCH_SIZE,
62 }
63 }
64
65 pub fn params(mut self, params: Vec<Value>) -> Self {
67 self.params = params;
68 self
69 }
70
71 pub fn with_max_connections(mut self, max_connections: u32) -> Self {
73 self.max_connections = max_connections;
74 self
75 }
76
77 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
82 self.batch_size = batch_size;
83 self
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90 use serde_json::json;
91
92 #[test]
93 fn default_config() {
94 let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT * FROM events");
95 assert_eq!(config.query, "SELECT * FROM events");
96 assert!(config.params.is_empty());
97 }
98
99 #[test]
100 fn builder_with_params() {
101 let config = PostgresSourceConfig::new(
102 "postgres://localhost/test",
103 "SELECT * FROM events WHERE id = $1",
104 )
105 .params(vec![json!(42)]);
106 assert_eq!(config.params.len(), 1);
107 assert_eq!(config.params[0], json!(42));
108 }
109
110 #[test]
111 fn debug_masks_connection_url() {
112 let config = PostgresSourceConfig::new("postgres://secret:pass@host/db", "SELECT 1");
113 let debug = format!("{config:?}");
114 assert!(debug.contains("***"));
115 assert!(!debug.contains("secret"));
116 assert!(!debug.contains("pass"));
117 }
118
119 #[test]
120 fn batch_size_defaults_to_default_batch_size() {
121 let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1");
122 assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
123 }
124
125 #[test]
126 fn with_batch_size_overrides_default() {
127 let config =
128 PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1").with_batch_size(500);
129 assert_eq!(config.batch_size, 500);
130 }
131
132 #[test]
133 fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
134 let config =
135 PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1").with_batch_size(0);
136 assert_eq!(config.batch_size, 0);
137 assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
138 }
139
140 #[test]
141 fn batch_size_above_max_is_rejected_by_validate_batch_size() {
142 let config = PostgresSourceConfig::new("postgres://localhost/test", "SELECT 1")
143 .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
144 assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
145 }
146
147 #[test]
148 fn batch_size_deserializes_from_json() {
149 let json = r#"{
150 "connection_url": "postgres://localhost/test",
151 "query": "SELECT 1",
152 "batch_size": 250
153 }"#;
154 let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
155 assert_eq!(config.batch_size, 250);
156 }
157}