faucet_sink_postgres/
config.rs1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
8#[serde(rename_all = "snake_case")]
9pub enum PostgresColumnMapping {
10 Jsonb { column: String },
13 AutoMap,
16}
17
18impl Default for PostgresColumnMapping {
19 fn default() -> Self {
20 Self::Jsonb {
21 column: "data".into(),
22 }
23 }
24}
25
26#[derive(Clone, Serialize, Deserialize, JsonSchema)]
28pub struct PostgresSinkConfig {
29 pub connection_url: String,
31 pub table_name: String,
33 pub column_mapping: PostgresColumnMapping,
35 pub batch_size: usize,
37 pub max_connections: u32,
39}
40
41impl std::fmt::Debug for PostgresSinkConfig {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("PostgresSinkConfig")
44 .field("connection_url", &"***")
45 .field("table_name", &self.table_name)
46 .field("column_mapping", &self.column_mapping)
47 .field("batch_size", &self.batch_size)
48 .field("max_connections", &self.max_connections)
49 .finish()
50 }
51}
52
53impl PostgresSinkConfig {
54 pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
56 Self {
57 connection_url: connection_url.into(),
58 table_name: table_name.into(),
59 column_mapping: PostgresColumnMapping::default(),
60 batch_size: 500,
61 max_connections: 5,
62 }
63 }
64
65 pub fn column_mapping(mut self, mapping: PostgresColumnMapping) -> Self {
67 self.column_mapping = mapping;
68 self
69 }
70
71 pub fn batch_size(mut self, n: usize) -> Self {
73 self.batch_size = n;
74 self
75 }
76
77 pub fn max_connections(mut self, n: u32) -> Self {
79 self.max_connections = n;
80 self
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87
88 #[test]
89 fn default_config() {
90 let config = PostgresSinkConfig::new("postgres://localhost/test", "events");
91 assert_eq!(config.table_name, "events");
92 assert_eq!(config.batch_size, 500);
93 assert!(matches!(
94 config.column_mapping,
95 PostgresColumnMapping::Jsonb { ref column } if column == "data"
96 ));
97 }
98
99 #[test]
100 fn builder_methods() {
101 let config = PostgresSinkConfig::new("postgres://localhost/test", "events")
102 .column_mapping(PostgresColumnMapping::AutoMap)
103 .batch_size(100);
104 assert_eq!(config.batch_size, 100);
105 assert!(matches!(
106 config.column_mapping,
107 PostgresColumnMapping::AutoMap
108 ));
109 }
110
111 #[test]
112 fn jsonb_custom_column() {
113 let config = PostgresSinkConfig::new("postgres://localhost/test", "events").column_mapping(
114 PostgresColumnMapping::Jsonb {
115 column: "payload".into(),
116 },
117 );
118 assert!(matches!(
119 config.column_mapping,
120 PostgresColumnMapping::Jsonb { ref column } if column == "payload"
121 ));
122 }
123}