Skip to main content

faucet_sink_postgres/
config.rs

1//! PostgreSQL sink configuration.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6/// How to map JSON records to table columns.
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
8#[serde(rename_all = "snake_case")]
9pub enum PostgresColumnMapping {
10    /// Insert each record as a single `jsonb` column. The column name
11    /// defaults to `"data"` but can be overridden.
12    Jsonb { column: String },
13    /// Map top-level JSON keys directly to table columns.
14    /// Only keys that match existing columns are inserted; extra keys are ignored.
15    AutoMap,
16}
17
18impl Default for PostgresColumnMapping {
19    fn default() -> Self {
20        Self::Jsonb {
21            column: "data".into(),
22        }
23    }
24}
25
26/// Configuration for the PostgreSQL sink.
27#[derive(Clone, Serialize, Deserialize, JsonSchema)]
28pub struct PostgresSinkConfig {
29    /// PostgreSQL connection URL (e.g. `postgres://user:pass@host/db`).
30    pub connection_url: String,
31    /// Target table name.
32    pub table_name: String,
33    /// How to map JSON records to columns.
34    pub column_mapping: PostgresColumnMapping,
35    /// Maximum number of rows per INSERT statement. Defaults to 500.
36    pub batch_size: usize,
37    /// Maximum number of connections in the pool. Defaults to 5.
38    pub max_connections: u32,
39}
40
41impl std::fmt::Debug for PostgresSinkConfig {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("PostgresSinkConfig")
44            .field("connection_url", &"***")
45            .field("table_name", &self.table_name)
46            .field("column_mapping", &self.column_mapping)
47            .field("batch_size", &self.batch_size)
48            .field("max_connections", &self.max_connections)
49            .finish()
50    }
51}
52
53impl PostgresSinkConfig {
54    /// Create a new config with required fields and sensible defaults.
55    pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
56        Self {
57            connection_url: connection_url.into(),
58            table_name: table_name.into(),
59            column_mapping: PostgresColumnMapping::default(),
60            batch_size: 500,
61            max_connections: 5,
62        }
63    }
64
65    /// Set the column mapping strategy.
66    pub fn column_mapping(mut self, mapping: PostgresColumnMapping) -> Self {
67        self.column_mapping = mapping;
68        self
69    }
70
71    /// Set the batch size for INSERT statements.
72    pub fn batch_size(mut self, n: usize) -> Self {
73        self.batch_size = n;
74        self
75    }
76
77    /// Set the maximum number of connections in the pool.
78    pub fn max_connections(mut self, n: u32) -> Self {
79        self.max_connections = n;
80        self
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87
88    #[test]
89    fn default_config() {
90        let config = PostgresSinkConfig::new("postgres://localhost/test", "events");
91        assert_eq!(config.table_name, "events");
92        assert_eq!(config.batch_size, 500);
93        assert!(matches!(
94            config.column_mapping,
95            PostgresColumnMapping::Jsonb { ref column } if column == "data"
96        ));
97    }
98
99    #[test]
100    fn builder_methods() {
101        let config = PostgresSinkConfig::new("postgres://localhost/test", "events")
102            .column_mapping(PostgresColumnMapping::AutoMap)
103            .batch_size(100);
104        assert_eq!(config.batch_size, 100);
105        assert!(matches!(
106            config.column_mapping,
107            PostgresColumnMapping::AutoMap
108        ));
109    }
110
111    #[test]
112    fn jsonb_custom_column() {
113        let config = PostgresSinkConfig::new("postgres://localhost/test", "events").column_mapping(
114            PostgresColumnMapping::Jsonb {
115                column: "payload".into(),
116            },
117        );
118        assert!(matches!(
119            config.column_mapping,
120            PostgresColumnMapping::Jsonb { ref column } if column == "payload"
121        ));
122    }
123}