faucet_sink_postgres/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum PostgresColumnMapping {
11 Jsonb { column: String },
14 AutoMap,
17}
18
19impl Default for PostgresColumnMapping {
20 fn default() -> Self {
21 Self::Jsonb {
22 column: "data".into(),
23 }
24 }
25}
26
27#[derive(Clone, Serialize, Deserialize, JsonSchema)]
29pub struct PostgresSinkConfig {
30 pub connection_url: String,
32 pub table_name: String,
34 #[serde(default)]
43 pub schema: Option<String>,
44 pub column_mapping: PostgresColumnMapping,
46 #[serde(default = "default_batch_size")]
67 pub batch_size: usize,
68 pub max_connections: u32,
70}
71
72fn default_batch_size() -> usize {
73 DEFAULT_BATCH_SIZE
74}
75
76impl std::fmt::Debug for PostgresSinkConfig {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("PostgresSinkConfig")
79 .field("connection_url", &"***")
80 .field("table_name", &self.table_name)
81 .field("schema", &self.schema)
82 .field("column_mapping", &self.column_mapping)
83 .field("batch_size", &self.batch_size)
84 .field("max_connections", &self.max_connections)
85 .finish()
86 }
87}
88
89impl PostgresSinkConfig {
90 pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
92 Self {
93 connection_url: connection_url.into(),
94 table_name: table_name.into(),
95 schema: None,
96 column_mapping: PostgresColumnMapping::default(),
97 batch_size: DEFAULT_BATCH_SIZE,
98 max_connections: 5,
99 }
100 }
101
102 pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
105 self.schema = Some(schema.into());
106 self
107 }
108
109 pub fn column_mapping(mut self, mapping: PostgresColumnMapping) -> Self {
111 self.column_mapping = mapping;
112 self
113 }
114
115 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
121 self.batch_size = batch_size;
122 self
123 }
124
125 pub fn max_connections(mut self, n: u32) -> Self {
127 self.max_connections = n;
128 self
129 }
130}
131
/// Unit tests for the configuration defaults, builder methods, serde
/// behaviour, and `Debug` redaction.
#[cfg(test)]
mod tests {
    use super::*;

    const URL: &str = "postgres://localhost/test";

    #[test]
    fn default_config() {
        let config = PostgresSinkConfig::new(URL, "events");
        assert_eq!(config.table_name, "events");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::Jsonb { ref column } if column == "data"
        ));
    }

    #[test]
    fn new_fills_remaining_defaults() {
        let config = PostgresSinkConfig::new(URL, "events");
        assert_eq!(config.connection_url, URL);
        assert!(config.schema.is_none());
        assert_eq!(config.max_connections, 5);
    }

    #[test]
    fn builder_methods() {
        let config = PostgresSinkConfig::new(URL, "events")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(100);
        assert_eq!(config.batch_size, 100);
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::AutoMap
        ));
    }

    #[test]
    fn with_schema_sets_schema() {
        let config = PostgresSinkConfig::new(URL, "events").with_schema("analytics");
        assert_eq!(config.schema.as_deref(), Some("analytics"));
    }

    #[test]
    fn max_connections_overrides_default() {
        let config = PostgresSinkConfig::new(URL, "events").max_connections(20);
        assert_eq!(config.max_connections, 20);
    }

    #[test]
    fn jsonb_custom_column() {
        let config =
            PostgresSinkConfig::new(URL, "events").column_mapping(PostgresColumnMapping::Jsonb {
                column: "payload".into(),
            });
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::Jsonb { ref column } if column == "payload"
        ));
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = PostgresSinkConfig::new(URL, "events").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = PostgresSinkConfig::new(URL, "events").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        // The builder stores the value unchecked; rejection comes from the
        // shared validator.
        let config = PostgresSinkConfig::new(URL, "events")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": {"jsonb": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": {"jsonb": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn schema_defaults_to_none_when_absent_in_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": {"jsonb": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert!(config.schema.is_none());
    }

    #[test]
    fn auto_map_deserializes_from_snake_case_string() {
        // Unit variants of an externally tagged enum are plain strings on the
        // wire; `rename_all = "snake_case"` turns `AutoMap` into `auto_map`.
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": "auto_map",
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::AutoMap
        ));
    }

    #[test]
    fn debug_output_redacts_connection_url() {
        let config = PostgresSinkConfig::new("postgres://user:hunter2@localhost/test", "events");
        let rendered = format!("{:?}", config);
        assert!(!rendered.contains("hunter2"));
        assert!(!rendered.contains("postgres://"));
        assert!(rendered.contains("***"));
        // Non-secret fields are still visible.
        assert!(rendered.contains("events"));
    }

    #[test]
    fn config_builder_chaining() {
        // The last call wins when the same setter is applied twice.
        let config = PostgresSinkConfig::new(URL, "events")
            .with_batch_size(100)
            .with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }
}